diff options
| author | Will Hawkins <[email protected]> | 2022-11-07 01:54:24 -0500 |
|---|---|---|
| committer | Will Hawkins <[email protected]> | 2022-12-11 16:53:59 -0500 |
| commit | 094eb99990f1cf734efb8104e0089dbc6920547e (patch) | |
| tree | 8b8bb26481690f852115ebc236be169669f1b4f0 | |
| parent | 096b9d30559f86e07117ff5459c720900a408a11 (diff) | |
[Feature] Rev 3 of Stability (Basic implementation)
| -rw-r--r-- | constants/constants.go | 4 | ||||
| -rw-r--r-- | networkQuality.go | 431 | ||||
| -rw-r--r-- | rpm/rpm.go | 120 |
3 files changed, 276 insertions, 279 deletions
diff --git a/constants/constants.go b/constants/constants.go index 6934459..99de22d 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -28,15 +28,13 @@ var ( InstantaneousThroughputMeasurementCount int = 4 InstantaneousProbeMeasurementCount int = 1 // The number of instantaneous moving averages to consider when determining stability. - InstantaneousMovingAverageCount int = 4 + InstantaneousMovingAverageStabilityCount int = 4 // The standard deviation cutoff used to determine stability among the K preceding moving averages // of a measurement (as a percentage of the mean). StabilityStandardDeviation float64 = 5.0 // The amount of time that the client will cooldown if it is in debug mode. CooldownPeriod time.Duration = 4 * time.Second - // The number of probes to send when calculating RTT. - MeasurementProbeCount int = 5 // The amount of time that we give ourselves to calculate the RPM. RPMCalculationTime int = 10 diff --git a/networkQuality.go b/networkQuality.go index 806f2f7..9235a90 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -29,7 +29,9 @@ import ( "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" + "github.com/network-quality/goresponsiveness/ms" "github.com/network-quality/goresponsiveness/rpm" + "github.com/network-quality/goresponsiveness/stabilizer" "github.com/network-quality/goresponsiveness/timeoutat" "github.com/network-quality/goresponsiveness/utilities" ) @@ -98,17 +100,24 @@ func main() { // This is the overall operating context of the program. All other // contexts descend from this one. Canceling this one cancels all // the others. - operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background()) + operatingCtx, operatingCtxCancel := context.WithCancel(context.Background()) - // - lgDataCollectionCtx, cancelLGDataCollectionCtx := context.WithCancel(operatingCtx) + // This context is used to control the load generators -- we cancel it when + // the system has completed its work. (i.e, rpm and saturation are stable). + // The *operator* contexts control stopping the goroutines that are running + // the process; the *throughput* contexts control whether the load generators + // continue to add new connections at every interval. + uploadLoadGeneratorOperatorCtx, uploadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx) + downloadLoadGeneratorOperatorCtx, downloadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx) - // This context is used to control the load-generating network activity (i.e., all - // the connections that are open to do load generation). - lgNetworkActivityCtx, cancelLgNetworkActivityCtx := context.WithCancel(operatingCtx) + // This context is used to control the load-generating network activity (i.e., it controls all + // the connections that are open to do load generation and probing). Cancelling this context will close + // all the network connections that are responsible for generating the load. + lgNetworkActivityCtx, lgNetworkActivityCtxCancel := context.WithCancel(operatingCtx) + + // This context is used to control the activity of the prober. + proberCtx, proberCtxCancel := context.WithCancel(operatingCtx) - // This context is used to control the activity of the foreign prober. - foreignProbertCtx, foreignProberCtxCancel := context.WithCancel(operatingCtx) config := &config.Config{} var debugLevel debug.DebugLevel = debug.Error @@ -191,10 +200,11 @@ func main() { } } - var selfDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil - var foreignDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil + var selfProbeDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil + var foreignProbeDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil var downloadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil var uploadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil + // User wants to log data from each probe! if *dataLoggerBaseFileName != "" { var err error = nil @@ -214,7 +224,7 @@ func main() { "-throughput-upload"+unique, ) - selfDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint]( + selfProbeDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint]( dataLoggerSelfFilename, ) if err != nil { @@ -222,10 +232,10 @@ func main() { "Warning: Could not create the file for storing self probe results (%s). Disabling functionality.\n", dataLoggerSelfFilename, ) - selfDataLogger = nil + selfProbeDataLogger = nil } - foreignDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint]( + foreignProbeDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint]( dataLoggerForeignFilename, ) if err != nil { @@ -233,7 +243,7 @@ func main() { "Warning: Could not create the file for storing foreign probe results (%s). Disabling functionality.\n", dataLoggerForeignFilename, ) - foreignDataLogger = nil + foreignProbeDataLogger = nil } downloadThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint]( @@ -258,6 +268,20 @@ func main() { uploadThroughputDataLogger = nil } } + // If, for some reason, the data loggers are nil, make them Null Data Loggers so that we don't have conditional + // code later. + if selfProbeDataLogger == nil { + selfProbeDataLogger = datalogger.CreateNullDataLogger[rpm.ProbeDataPoint]() + } + if foreignProbeDataLogger == nil { + foreignProbeDataLogger = datalogger.CreateNullDataLogger[rpm.ProbeDataPoint]() + } + if downloadThroughputDataLogger == nil { + downloadThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]() + } + if uploadThroughputDataLogger == nil { + uploadThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]() + } /* * Create (and then, ironically, name) two anonymous functions that, when invoked, @@ -278,194 +302,209 @@ func main() { generateSelfProbeConfiguration := func() rpm.ProbeConfiguration { return rpm.ProbeConfiguration{ - URL: config.Urls.SmallUrl, - DataLogger: selfDataLogger, - Interval: 100 * time.Millisecond, + URL: config.Urls.SmallUrl, } } generateForeignProbeConfiguration := func() rpm.ProbeConfiguration { return rpm.ProbeConfiguration{ - URL: config.Urls.SmallUrl, - DataLogger: foreignDataLogger, - Interval: 100 * time.Millisecond, + URL: config.Urls.SmallUrl, } } var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download") var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload") - var foreignDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "foreign probe") + var combinedProbeDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "combined probe") + + downloadLoadGeneratingConnectionCollection := lgc.NewLoadGeneratingConnectionCollection() + uploadLoadGeneratingConnectionCollection := lgc.NewLoadGeneratingConnectionCollection() // TODO: Separate contexts for load generation and data collection. If we do that, if either of the two // data collection go routines stops well before the other, they will continue to send probes and we can // generate additional information! - downloadSaturationComplete, downloadDataCollectionChannel := rpm.LGCollectData( - lgDataCollectionCtx, + selfProbeConnectionCommunicationChannel, downloadThroughputChannel := rpm.LoadGenerator( lgNetworkActivityCtx, - operatingCtx, + downloadLoadGeneratorOperatorCtx, + time.Second, generate_lgd, - generateSelfProbeConfiguration, - downloadThroughputDataLogger, + &downloadLoadGeneratingConnectionCollection, downloadDebugging, ) - uploadSaturationComplete, uploadDataCollectionChannel := rpm.LGCollectData( - lgDataCollectionCtx, + _, uploadThroughputChannel := rpm.LoadGenerator( lgNetworkActivityCtx, - operatingCtx, + uploadLoadGeneratorOperatorCtx, + time.Second, generate_lgu, - generateSelfProbeConfiguration, - uploadThroughputDataLogger, + &uploadLoadGeneratingConnectionCollection, uploadDebugging, ) - foreignProbeDataPointsChannel := rpm.ForeignProber( - foreignProbertCtx, + // start here. + + selfDownProbeConnection := <-selfDownProbeConnectionCommunicationChannel + selfUpProbeConnection := <-selfUpProbeConnectionCommunicationChannel + + probeDataPointsChannel := rpm.CombinedProber( + proberCtx, generateForeignProbeConfiguration, + generateSelfProbeConfiguration, + selfProbeConnection, + time.Millisecond*100, sslKeyFileConcurrentWriter, - foreignDebugging, + combinedProbeDebugging, ) - dataCollectionTimeout := false - uploadDataGenerationComplete := false - downloadDataGenerationComplete := false - downloadDataCollectionResult := rpm.SelfDataCollectionResult{} - uploadDataCollectionResult := rpm.SelfDataCollectionResult{} + responsivenessIsStable := false + downloadThroughputIsStable := false + uploadThroughputIsStable := false - for !(uploadDataGenerationComplete && downloadDataGenerationComplete) { - select { - case fullyComplete := <-downloadSaturationComplete: - { - downloadDataGenerationComplete = true - if *debugCliFlag { - fmt.Printf( - "################# download load-generating data generation is %s complete!\n", - utilities.Conditional(fullyComplete, "", "(provisionally)")) - } - } - case fullyComplete := <-uploadSaturationComplete: - { - uploadDataGenerationComplete = true - if *debugCliFlag { - fmt.Printf( - "################# upload load-generating data generation is %s complete!\n", - utilities.Conditional(fullyComplete, "", "(provisionally)")) - } - } - case <-timeoutChannel: - { - if dataCollectionTimeout { - // We already timedout on data collection. This signal means that - // we are timedout on getting the provisional data collection. We - // will exit! - fmt.Fprint( - os.Stderr, - "Error: Load-Generating data collection could not be completed in time and no provisional data could be gathered. Test failed.\n", - ) - cancelOperatingCtx() - if *debugCliFlag { - time.Sleep(constants.CooldownPeriod) - } - return // Ends program - } - dataCollectionTimeout = true + // Test parameters: + // 1. I: The number of previous instantaneous measurements to consider when generating + // the so-called instantaneous moving averages. + // 2. K: The number of instantaneous moving averages to consider when determining stability. + // 3: S: The standard deviation cutoff used to determine stability among the K preceding + // moving averages of a measurement. - // We timed out attempting to collect data about the link. So, we will - // shut down the generators - cancelLGDataCollectionCtx() - // and then we will give ourselves some additional time in order - // to see if we can get some provisional data. - timeoutAbsoluteTime = time.Now(). - Add(time.Second * time.Duration(*rpmtimeout)) - timeoutChannel = timeoutat.TimeoutAt( - operatingCtx, - timeoutAbsoluteTime, - debugLevel, - ) - if *debugCliFlag { - fmt.Printf( - "################# timeout collecting load-generating data!\n", - ) - } - } - } + throughputI := constants.InstantaneousThroughputMeasurementCount + probeI := constants.InstantaneousProbeMeasurementCount + K := constants.InstantaneousMovingAverageStabilityCount + S := constants.StabilityStandardDeviation + + downloadThroughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, "Download Throughput Stabilizer") + downloadThroughputStabilizerDebugLevel := debug.Error + if *debugCliFlag { + downloadThroughputStabilizerDebugLevel = debug.Debug } + downloadThroughputStabilizer := stabilizer.NewThroughputStabilizer(throughputI, K, S, downloadThroughputStabilizerDebugLevel, downloadThroughputStabilizerDebugConfig) + uploadThroughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, "Upload Throughput Stabilizer") + uploadThroughputStabilizerDebugLevel := debug.Error if *debugCliFlag { - fmt.Printf("Stopping all the load generating data generators.\n") + uploadThroughputStabilizerDebugLevel = debug.Debug } - // Just cancel the data collection -- do *not* yet stop the actual load-generating - // network activity. - cancelLGDataCollectionCtx() + uploadThroughputStabilizer := stabilizer.NewThroughputStabilizer(throughputI, K, S, uploadThroughputStabilizerDebugLevel, uploadThroughputStabilizerDebugConfig) - // Shutdown the foreign-connection prober! + probeStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, "Probe Stabilizer") + probeStabilizerDebugLevel := debug.Error if *debugCliFlag { - fmt.Printf("Stopping all foreign probers.\n") + probeStabilizerDebugLevel = debug.Debug } - foreignProberCtxCancel() + probeStabilizer := stabilizer.NewProbeStabilizer(probeI, K, S, probeStabilizerDebugLevel, probeStabilizerDebugConfig) - // Now that we stopped generation, let's give ourselves some time to collect - // all the data from our data generators. - timeoutAbsoluteTime = time.Now(). - Add(time.Second * time.Duration(*rpmtimeout)) - timeoutChannel = timeoutat.TimeoutAt( - operatingCtx, - timeoutAbsoluteTime, - debugLevel, - ) + selfRtts := ms.NewInfiniteMathematicalSeries[float64]() + foreignRtts := ms.NewInfiniteMathematicalSeries[float64]() + + // For later debugging output, record the last throughputs on load-generating connectings + // and the number of open connections. + lastUploadThroughputRate := float64(0) + lastUploadThroughputOpenConnectionCount := int(0) + lastDownloadThroughputRate := float64(0) + lastDownloadThroughputOpenConnectionCount := int(0) - // Now that we have generated the data, let's collect it. - downloadDataCollectionComplete := false - uploadDataCollectionComplete := false - for !(downloadDataCollectionComplete && uploadDataCollectionComplete) { + // Every time that there is a new measurement, the possibility exists that the measurements become unstable. + // This allows us to continue pushing until *everything* is stable at the same time. +timeout: + for !(responsivenessIsStable && downloadThroughputIsStable && uploadThroughputIsStable) { select { - case downloadDataCollectionResult = <-downloadDataCollectionChannel: + + case downloadThroughputMeasurement := <-downloadThroughputChannel: { - downloadDataCollectionComplete = true + downloadThroughputStabilizer.AddMeasurement(downloadThroughputMeasurement) + downloadThroughputIsStable = downloadThroughputStabilizer.IsStable() if *debugCliFlag { fmt.Printf( - "################# download load-generating data collection is complete (%fMBps, %d flows)!\n", - utilities.ToMBps(downloadDataCollectionResult.RateBps), - len(downloadDataCollectionResult.LGCs), - ) + "################# Download is instantaneously %s.\n", utilities.Conditional(downloadThroughputIsStable, "stable", "unstable")) } + downloadThroughputDataLogger.LogRecord(downloadThroughputMeasurement) + + lastDownloadThroughputRate = downloadThroughputMeasurement.Throughput + lastDownloadThroughputOpenConnectionCount = downloadThroughputMeasurement.Connections } - case uploadDataCollectionResult = <-uploadDataCollectionChannel: + + case uploadThroughputMeasurement := <-uploadThroughputChannel: { - uploadDataCollectionComplete = true + uploadThroughputStabilizer.AddMeasurement(uploadThroughputMeasurement) + uploadThroughputIsStable = uploadThroughputStabilizer.IsStable() if *debugCliFlag { fmt.Printf( - "################# upload load-generating data collection is complete (%fMBps, %d flows)!\n", - utilities.ToMBps(uploadDataCollectionResult.RateBps), - len(uploadDataCollectionResult.LGCs), - ) + "################# Upload is instantaneously %s.\n", utilities.Conditional(uploadThroughputIsStable, "stable", "unstable")) } + uploadThroughputDataLogger.LogRecord(uploadThroughputMeasurement) + + lastUploadThroughputRate = uploadThroughputMeasurement.Throughput + lastUploadThroughputOpenConnectionCount = uploadThroughputMeasurement.Connections } - case <-timeoutChannel: + case probeMeasurement := <-probeDataPointsChannel: { - // This is just bad news -- we generated data but could not collect it. Let's just fail. + probeStabilizer.AddMeasurement(probeMeasurement) + + // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy + // is *actually* important, but it can't hurt? + responsivenessIsStable = probeStabilizer.IsStable() - fmt.Fprint( - os.Stderr, - "Error: Load-Generating data collection could not be completed in time and no provisional data could be gathered. Test failed.\n", - ) - return // Ends program + if *debugCliFlag { + fmt.Printf( + "################# Responsiveness is instantaneously %s.\n", utilities.Conditional(responsivenessIsStable, "stable", "unstable")) + } + if probeMeasurement.Type == rpm.Foreign { + for range utilities.Iota(0, int(probeMeasurement.RoundTripCount)) { + foreignRtts.AddElement(probeMeasurement.Duration.Seconds() / float64(probeMeasurement.RoundTripCount)) + + } + } else if probeMeasurement.Type == rpm.Self { + selfRtts.AddElement(probeMeasurement.Duration.Seconds()) + } + + // There may be more than one round trip accumulated together. If that is the case, + // we will blow them apart in to three separate measurements and each one will just + // be 1 / measurement.RoundTripCount of the total length. + + if probeMeasurement.Type == rpm.Foreign { + foreignProbeDataLogger.LogRecord(probeMeasurement) + } else if probeMeasurement.Type == rpm.Self { + selfProbeDataLogger.LogRecord(probeMeasurement) + } + } + case <-timeoutChannel: + { + break timeout } } } - // In the new version we are no longer going to wait to send probes until after - // saturation. When we get here we are now only going to compute the results - // and/or extended statistics! + // Did the test run to stability? + testRanToStability := (downloadThroughputIsStable && uploadThroughputIsStable && responsivenessIsStable) - extendedStats := extendedstats.AggregateExtendedStats{} + if *debugCliFlag { + fmt.Printf("Stopping all the load generating data generators (stability: %s).\n", utilities.Conditional(testRanToStability, "success", "failure")) + } + + /* At this point there are + 1. Load generators running + -- uploadLoadGeneratorOperatorCtx + -- downloadLoadGeneratorOperatorCtx + 2. Network connections opened by those load generators: + -- lgNetworkActivityCtx + 3. Probes + -- proberCtx + */ + + // First, stop the load generators and the probes + proberCtxCancel() + downloadLoadGeneratorOperatorCtxCancel() + uploadLoadGeneratorOperatorCtxCancel() + // Second, calculate the extended stats (if the user requested) + + extendedStats := extendedstats.AggregateExtendedStats{} if *calculateExtendedStats { if extendedstats.ExtendedStatsAvailable() { - for i := 0; i < len(downloadDataCollectionResult.LGCs); i++ { + downloadLoadGeneratingConnectionCollection.Lock.Lock() + for i := 0; i < len(*downloadLoadGeneratingConnectionCollection.LGCs); i++ { // Assume that extended statistics are available -- the check was done explicitly at // program startup if the calculateExtendedStats flag was set by the user on the command line. - if err := extendedStats.IncorporateConnectionStats(downloadDataCollectionResult.LGCs[i].Stats().ConnInfo.Conn); err != nil { + if err := extendedStats.IncorporateConnectionStats((*downloadLoadGeneratingConnectionCollection.LGCs)[i].Stats().ConnInfo.Conn); err != nil { fmt.Fprintf( os.Stderr, "Warning: Could not add extended stats for the connection: %v\n", @@ -473,66 +512,39 @@ func main() { ) } } + downloadLoadGeneratingConnectionCollection.Lock.Unlock() + + // We do not trace upload connections! } else { // TODO: Should we just log here? panic("Extended stats are not available but the user requested their calculation.") } } - // And only now, when we are done getting the extended stats from the connections, can - // we actually shut down the load-generating network activity! - cancelLgNetworkActivityCtx() + // Third, stop the network connections opened by the load generators. + lgNetworkActivityCtxCancel() - fmt.Printf( - "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", - utilities.ToMbps(downloadDataCollectionResult.RateBps), - utilities.ToMBps(downloadDataCollectionResult.RateBps), - len(downloadDataCollectionResult.LGCs), - ) - fmt.Printf( - "Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", - utilities.ToMbps(uploadDataCollectionResult.RateBps), - utilities.ToMBps(uploadDataCollectionResult.RateBps), - len(uploadDataCollectionResult.LGCs), - ) + // Finally, stop the world. + operatingCtxCancel() - foreignProbeDataPoints := utilities.ChannelToSlice(foreignProbeDataPointsChannel) - totalForeignRoundTrips := len(foreignProbeDataPoints) + // Calculate the RPM + + selfProbeRoundTripTimeP90 := selfRtts.Percentile(90) // The specification indicates that we want to calculate the foreign probes as such: // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign // where tcp_foreign, tls_foreign, http_foreign are the P90 RTTs for the connection // of the tcp, tls and http connections, respectively. However, we cannot break out - // the individual RTTs so we assume that they are roughly equal. Call that _foreign: - // 1/3*_foreign + 1/3*_foreign + 1/3*_foreign = - // 1/3*(3*_foreign) = - // _foreign - // So, there's no need to divide by the number of RTTs defined in the ProbeDataPoints - // in the individual results. - foreignProbeRoundTripTimes := utilities.Fmap( - foreignProbeDataPoints, - func(dp rpm.ProbeDataPoint) float64 { return dp.Duration.Seconds() }, - ) - foreignProbeRoundTripTimeP90 := utilities.CalculatePercentile(foreignProbeRoundTripTimes, 90) - - downloadRoundTripTimes := utilities.Fmap( - downloadDataCollectionResult.ProbeDataPoints, - func(dcr rpm.ProbeDataPoint) float64 { return dcr.Duration.Seconds() }, - ) - uploadRoundTripTimes := utilities.Fmap( - uploadDataCollectionResult.ProbeDataPoints, - func(dcr rpm.ProbeDataPoint) float64 { return dcr.Duration.Seconds() }, - ) - selfProbeRoundTripTimes := append(downloadRoundTripTimes, uploadRoundTripTimes...) - totalSelfRoundTrips := len(selfProbeRoundTripTimes) - selfProbeRoundTripTimeP90 := utilities.CalculatePercentile(selfProbeRoundTripTimes, 90) + // the individual RTTs so we assume that they are roughly equal. The good news is that + // we already did that roughly-equal split up when we added them to the foreignRtts IMS. + foreignProbeRoundTripTimeP90 := foreignRtts.Percentile(90) rpm := 60.0 / (float64(selfProbeRoundTripTimeP90+foreignProbeRoundTripTimeP90) / 2.0) if *debugCliFlag { fmt.Printf( "Total Load-Generating Round Trips: %d, Total New-Connection Round Trips: %d, P90 LG RTT: %f, P90 NC RTT: %f\n", - totalSelfRoundTrips, - totalForeignRoundTrips, + selfRtts.Size(), + foreignRtts.Size(), selfProbeRoundTripTimeP90, foreignProbeRoundTripTimeP90, ) @@ -544,42 +556,47 @@ func main() { fmt.Println(extendedStats.Repr()) } - if !utilities.IsInterfaceNil(selfDataLogger) { - selfDataLogger.Export() - if *debugCliFlag { - fmt.Printf("Closing the self data logger.\n") - } - selfDataLogger.Close() + selfProbeDataLogger.Export() + if *debugCliFlag { + fmt.Printf("Closing the self data logger.\n") } + selfProbeDataLogger.Close() - if !utilities.IsInterfaceNil(foreignDataLogger) { - foreignDataLogger.Export() - if *debugCliFlag { - fmt.Printf("Closing the foreign data logger.\n") - } - foreignDataLogger.Close() + foreignProbeDataLogger.Export() + if *debugCliFlag { + fmt.Printf("Closing the foreign data logger.\n") } + foreignProbeDataLogger.Close() - if !utilities.IsInterfaceNil(downloadThroughputDataLogger) { - downloadThroughputDataLogger.Export() - if *debugCliFlag { - fmt.Printf("Closing the download throughput data logger.\n") - } - downloadThroughputDataLogger.Close() + downloadThroughputDataLogger.Export() + if *debugCliFlag { + fmt.Printf("Closing the download throughput data logger.\n") } + downloadThroughputDataLogger.Close() - if !utilities.IsInterfaceNil(uploadThroughputDataLogger) { - uploadThroughputDataLogger.Export() - if *debugCliFlag { - fmt.Printf("Closing the upload throughput data logger.\n") - } - uploadThroughputDataLogger.Close() + uploadThroughputDataLogger.Export() + if *debugCliFlag { + fmt.Printf("Closing the upload throughput data logger.\n") } + uploadThroughputDataLogger.Close() + + fmt.Printf( + "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", + utilities.ToMbps(lastDownloadThroughputRate), + utilities.ToMBps(lastDownloadThroughputRate), + lastDownloadThroughputOpenConnectionCount, + ) + fmt.Printf( + "Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", + utilities.ToMbps(lastUploadThroughputRate), + utilities.ToMBps(lastUploadThroughputRate), + lastUploadThroughputOpenConnectionCount, + ) - cancelOperatingCtx() if *debugCliFlag { fmt.Printf("In debugging mode, we will cool down.\n") time.Sleep(constants.CooldownPeriod) fmt.Printf("Done cooling down.\n") } + } @@ -25,7 +25,6 @@ import ( "time" "github.com/network-quality/goresponsiveness/constants" - "github.com/network-quality/goresponsiveness/datalogger" "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" @@ -38,16 +37,18 @@ import ( func addFlows( ctx context.Context, toAdd uint64, - lgcs *[]lgc.LoadGeneratingConnection, + lgcc *lgc.LoadGeneratingConnectionCollection, lgcGenerator func() lgc.LoadGeneratingConnection, debug debug.DebugLevel, ) { + lgcc.Lock.Lock() + defer lgcc.Lock.Unlock() for i := uint64(0); i < toAdd; i++ { - *lgcs = append(*lgcs, lgcGenerator()) - if !(*lgcs)[len(*lgcs)-1].Start(ctx, debug) { + *lgcc.LGCs = append(*lgcc.LGCs, lgcGenerator()) + if !(*lgcc.LGCs)[len(*lgcc.LGCs)-1].Start(ctx, debug) { fmt.Printf( "Error starting lgc with id %d!\n", - (*lgcs)[len(*lgcs)-1].ClientId(), + (*lgcc.LGCs)[len(*lgcc.LGCs)-1].ClientId(), ) return } @@ -55,8 +56,7 @@ func addFlows( } type ProbeConfiguration struct { - URL string - DataLogger datalogger.DataLogger[ProbeDataPoint] + URL string } type ProbeDataPoint struct { @@ -65,6 +65,7 @@ type ProbeDataPoint struct { Duration time.Duration `Description:"The duration for this measurement." Formatter:"Seconds"` TCPRtt time.Duration `Description:"The underlying connection's RTT at probe time." Formatter:"Seconds"` TCPCwnd uint32 `Description:"The underlying connection's congestion window at probe time."` + Type ProbeType `Description:"The type of the probe." Formatter:"Value"` } type ThroughputDataPoint struct { @@ -97,7 +98,6 @@ func (pt ProbeType) Value() string { func Probe( parentProbeCtx context.Context, waitGroup *sync.WaitGroup, - logger datalogger.DataLogger[ProbeDataPoint], client *http.Client, probeUrl string, probeType ProbeType, @@ -111,7 +111,7 @@ func Probe( } if client == nil { - return fmt.Errorf("Cannot start a probe with a nil client") + return fmt.Errorf("cannot start a probe with a nil client") } probeId := utilities.GenerateUniqueId() @@ -182,7 +182,7 @@ func Probe( if probeType == Foreign { roundTripCount = 3 } - // TODO: Careful!!! It's possible that this channel has been closed because the Prober that + // Careful!!! It's possible that this channel has been closed because the Prober that // started it has been stopped. Writing to a closed channel will cause a panic. It might not // matter because a panic just stops the go thread containing the paniced code and we are in // a go thread that executes only this function. @@ -200,6 +200,7 @@ func Probe( }() tcpRtt := time.Duration(0 * time.Second) tcpCwnd := uint32(0) + // TODO: Only get the extended stats for a connection if the user has requested them overall. if extendedstats.ExtendedStatsAvailable() { tcpInfo, err := extendedstats.GetTCPInfo(probeTracer.stats.ConnInfo.Conn) if err == nil { @@ -215,9 +216,7 @@ func Probe( Duration: totalDelay, TCPRtt: tcpRtt, TCPCwnd: tcpCwnd, - } - if !utilities.IsInterfaceNil(logger) { - logger.LogRecord(dataPoint) + Type: probeType, } *result <- dataPoint return nil @@ -231,13 +230,17 @@ func CombinedProber( probeInterval time.Duration, keyLogger io.Writer, debugging *debug.DebugWithPrefix, -) (points chan ProbeDataPoint) { - points = make(chan ProbeDataPoint) +) (dataPoints chan ProbeDataPoint) { + + // Make a channel to send back all the generated data points + // when we are probing. + dataPoints = make(chan ProbeDataPoint) go func() { wg := sync.WaitGroup{} probeCount := 0 + // As long as our context says that we can continue to probe! for proberCtx.Err() == nil { time.Sleep(probeInterval) @@ -247,9 +250,9 @@ func CombinedProber( if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) About to send of the %d round of probes!\n", + "(%s) About to send round %d of probes!\n", debugging.Prefix, - probeCount, + probeCount+1, ) } transport := http2.Transport{} @@ -273,28 +276,37 @@ func CombinedProber( } transport.TLSClientConfig.InsecureSkipVerify = true - client := &http.Client{Transport: &transport} + foreignProbeClient := &http.Client{Transport: &transport} probeCount++ go Probe( proberCtx, &wg, - foreignProbeConfiguration.DataLogger, - client, + foreignProbeClient, foreignProbeConfiguration.URL, Foreign, - &points, + &dataPoints, debugging, ) go Probe( proberCtx, &wg, - selfProbeConfiguration.DataLogger, selfProbeConnection.Client(), selfProbeConfiguration.URL, - Self, - &points, + SelfDown, + &dataPoints, + debugging, + ) + + // Start Upload Connection Prober + go Probe( + proberCtx, + &wg, + selfUpProbeConnection.Client(), + selfProbeConfiguration.URL, + SelfUp, + &dataPoints, debugging, ) } @@ -311,22 +323,20 @@ func CombinedProber( debugging.Prefix, ) } - close(points) + close(dataPoints) }() return } func LoadGenerator( networkActivityCtx context.Context, // Create all network connections in this context. - saturationCtx context.Context, // Continue logging, but stop adding flows when this context is canceled! loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load. rampupInterval time.Duration, lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection. loadGeneratingConnections *lgc.LoadGeneratingConnectionCollection, debugging *debug.DebugWithPrefix, // How can we forget debugging? -) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to use for self probes. +) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes. throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate. - lgcs []lgc.LoadGeneratingConnection, // The caller will want to look at this if they are interested in doing extended stats. ) { throughputCalculations = make(chan ThroughputDataPoint) @@ -334,25 +344,18 @@ func LoadGenerator( // be read by the caller. We don't want to wait around until they are ready before we start doing our work. // So, we'll make it buffered. probeConnectionCommunicationChannel = make(chan lgc.LoadGeneratingConnection, 1) - lgcs = make([]lgc.LoadGeneratingConnection, 0) go func() { - isSaturated := false - - lgcs := make([]lgc.LoadGeneratingConnection, 0) - flowsCreated := uint64(0) - loadGeneratingConnections.Lock.Lock() addFlows( networkActivityCtx, constants.StartingNumberOfLoadGeneratingConnections, - loadGeneratingConnections.LGCs, + loadGeneratingConnections, lgcGenerator, debugging.Level, ) - loadGeneratingConnections.Lock.Unlock() flowsCreated += constants.StartingNumberOfLoadGeneratingConnections // We have at least a single load-generating channel. This channel will be the one that @@ -363,20 +366,11 @@ func LoadGenerator( for currentInterval := uint64(0); true; currentInterval++ { - // If the operationalCtx is canceled, then that means our work here is done ... + // If the loadGeneratorCtx is canceled, then that means our work here is done ... if loadGeneratorCtx.Err() != nil { break } - if saturationCtx.Err() != nil { - isSaturated = true - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Received the saturated signal; continuing only to log from now on.\n", - debugging, - ) - } - } now := time.Now() // At each 1-second interval if nextSampleStartTime.Sub(now) > 0 { @@ -435,26 +429,14 @@ func LoadGenerator( throughputDataPoint := ThroughputDataPoint{time.Now(), instantaneousTotalThroughput, len(*loadGeneratingConnections.LGCs)} throughputCalculations <- throughputDataPoint - // Log that, if we are configured for logging. - if !utilities.IsInterfaceNil(throughputDataLogger) { - throughputDataLogger.LogRecord(throughputDataPoint) - } - - // We don't actually want to create a new connection if we are saturated! - if isSaturated { - continue - } - // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now! - loadGeneratingConnections.Lock.Lock() addFlows( networkActivityCtx, constants.AdditiveNumberOfLoadGeneratingConnections, - loadGeneratingConnections.LGCs, + loadGeneratingConnections, lgcGenerator, debugging.Level, ) - loadGeneratingConnections.Lock.Unlock() flowsCreated += constants.AdditiveNumberOfLoadGeneratingConnections } @@ -670,17 +652,17 @@ func (probe *ProbeTracer) SetGotConnTimeInfo( os.Stderr, "A self probe sent used a new connection!\n", ) - } else if debug.IsDebug(probe.debug) { - fmt.Printf("Properly reused a connection when doing a self probe!\n") } - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) Got a reused connection for Probe %v at %v with info %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.GetConnectionDoneTime, - probe.stats.ConnInfo, - ) + if gotConnInfo.Reused { + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(%s Probe) Got a reused connection for Probe %v at %v with info %v\n", + probe.probeType.Value(), + probe.ProbeId(), + probe.stats.GetConnectionDoneTime, + probe.stats.ConnInfo, + ) + } } } |
