diff options
Diffstat (limited to 'networkQuality.go')
| -rw-r--r-- | networkQuality.go | 181 |
1 files changed, 127 insertions, 54 deletions
diff --git a/networkQuality.go b/networkQuality.go index 8cbf148..de10d54 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -94,13 +94,21 @@ func main() { timeoutDuration := time.Second * time.Duration(*sattimeout) timeoutAbsoluteTime := time.Now().Add(timeoutDuration) configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort) + + // This is the overall operating context of the program. All other + // contexts descend from this one. Canceling this one cancels all + // the others. operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background()) - lgDataCollectionCtx, cancelLGDataCollectionCtx := context.WithCancel( - context.Background(), - ) - foreignProbertCtx, foreignProberCtxCancel := context.WithCancel( - context.Background(), - ) + + // + lgDataCollectionCtx, cancelLGDataCollectionCtx := context.WithCancel(operatingCtx) + + // This context is used to control the load-generating network activity (i.e., all + // the connections that are open to do load generation). + lgNetworkActivityCtx, cancelLgNetworkActivityCtx := context.WithCancel(operatingCtx) + + // This context is used to control the activity of the foreign prober. + foreignProbertCtx, foreignProberCtxCancel := context.WithCancel(operatingCtx) config := &config.Config{} var debugLevel debug.DebugLevel = debug.Error @@ -253,7 +261,7 @@ func main() { /* * Create (and then, ironically, name) two anonymous functions that, when invoked, - * will create load-generating connections for upload/download/ + * will create load-generating connections for upload/download */ generate_lgd := func() lgc.LoadGeneratingConnection { return &lgc.LoadGeneratingConnectionDownload{ @@ -292,16 +300,18 @@ func main() { // data collection go routines stops well before the other, they will continue to send probes and we can // generate additional information! - downloadDataCollectionChannel := rpm.LGCollectData( + downloadSaturationComplete, downloadDataCollectionChannel := rpm.LGCollectData( lgDataCollectionCtx, + lgNetworkActivityCtx, operatingCtx, generate_lgd, generateSelfProbeConfiguration, downloadThroughputDataLogger, downloadDebugging, ) - uploadDataCollectionChannel := rpm.LGCollectData( + uploadSaturationComplete, uploadDataCollectionChannel := rpm.LGCollectData( lgDataCollectionCtx, + lgNetworkActivityCtx, operatingCtx, generate_lgu, generateSelfProbeConfiguration, @@ -317,43 +327,29 @@ func main() { ) dataCollectionTimeout := false - uploadDataCollectionComplete := false - downloadDataCollectionComplete := false + uploadDataGenerationComplete := false + downloadDataGenerationComplete := false downloadDataCollectionResult := rpm.SelfDataCollectionResult{} uploadDataCollectionResult := rpm.SelfDataCollectionResult{} - for !(uploadDataCollectionComplete && downloadDataCollectionComplete) { + for !(uploadDataGenerationComplete && downloadDataGenerationComplete) { select { - case downloadDataCollectionResult = <-downloadDataCollectionChannel: + case fullyComplete := <-downloadSaturationComplete: { - downloadDataCollectionComplete = true + downloadDataGenerationComplete = true if *debugCliFlag { fmt.Printf( - "################# download load-generating data collection is %s complete (%fMBps, %d flows)!\n", - utilities.Conditional( - dataCollectionTimeout, - "(provisionally)", - "", - ), - utilities.ToMBps(downloadDataCollectionResult.RateBps), - len(downloadDataCollectionResult.LGCs), - ) + "################# download load-generating data generation is %s complete!\n", + utilities.Conditional(fullyComplete, "", "(provisionally)")) } } - case uploadDataCollectionResult = <-uploadDataCollectionChannel: + case fullyComplete := <-uploadSaturationComplete: { - uploadDataCollectionComplete = true + uploadDataGenerationComplete = true if *debugCliFlag { fmt.Printf( - "################# upload load-generating data collection is %s complete (%fMBps, %d flows)!\n", - utilities.Conditional( - dataCollectionTimeout, - "(provisionally)", - "", - ), - utilities.ToMBps(uploadDataCollectionResult.RateBps), - len(uploadDataCollectionResult.LGCs), - ) + "################# upload load-generating data generation is %s complete!\n", + utilities.Conditional(fullyComplete, "", "(provisionally)")) } } case <-timeoutChannel: @@ -370,15 +366,15 @@ func main() { if *debugCliFlag { time.Sleep(constants.CooldownPeriod) } - return + return // Ends program } dataCollectionTimeout = true // We timed out attempting to collect data about the link. So, we will - // shut down all the collection xfers + // shut down the generators cancelLGDataCollectionCtx() // and then we will give ourselves some additional time in order - // to complete provisional data collection. + // to see if we can get some provisional data. timeoutAbsoluteTime = time.Now(). Add(time.Second * time.Duration(*rpmtimeout)) timeoutChannel = timeoutat.TimeoutAt( @@ -395,31 +391,98 @@ func main() { } } - // Shutdown the new-connection prober! + if *debugCliFlag { + fmt.Printf("Stopping all the load generating data generators.\n") + } + // Just cancel the data collection -- do *not* yet stop the actual load-generating + // network activity. + cancelLGDataCollectionCtx() + + // Shutdown the foreign-connection prober! + if *debugCliFlag { + fmt.Printf("Stopping all foreign probers.\n") + } foreignProberCtxCancel() + // Now that we stopped generation, let's give ourselves some time to collect + // all the data from our data generators. + timeoutAbsoluteTime = time.Now(). + Add(time.Second * time.Duration(*rpmtimeout)) + timeoutChannel = timeoutat.TimeoutAt( + operatingCtx, + timeoutAbsoluteTime, + debugLevel, + ) + + // Now that we have generated the data, let's collect it. + downloadDataCollectionComplete := false + uploadDataCollectionComplete := false + for !(downloadDataCollectionComplete && uploadDataCollectionComplete) { + select { + case downloadDataCollectionResult = <-downloadDataCollectionChannel: + { + downloadDataCollectionComplete = true + if *debugCliFlag { + fmt.Printf( + "################# download load-generating data collection is complete (%fMBps, %d flows)!\n", + utilities.ToMBps(downloadDataCollectionResult.RateBps), + len(downloadDataCollectionResult.LGCs), + ) + } + } + case uploadDataCollectionResult = <-uploadDataCollectionChannel: + { + uploadDataCollectionComplete = true + if *debugCliFlag { + fmt.Printf( + "################# upload load-generating data collection is complete (%fMBps, %d flows)!\n", + utilities.ToMBps(uploadDataCollectionResult.RateBps), + len(uploadDataCollectionResult.LGCs), + ) + } + } + case <-timeoutChannel: + { + // This is just bad news -- we generated data but could not collect it. Let's just fail. + + fmt.Fprint( + os.Stderr, + "Error: Load-Generating data collection could not be completed in time and no provisional data could be gathered. Test failed.\n", + ) + return // Ends program + } + } + } + // In the new version we are no longer going to wait to send probes until after // saturation. When we get here we are now only going to compute the results // and/or extended statistics! - extendedStats := extendedstats.ExtendedStats{} + extendedStats := extendedstats.AggregateExtendedStats{} - for i := 0; i < len(downloadDataCollectionResult.LGCs); i++ { - // Assume that extended statistics are available -- the check was done explicitly at - // program startup if the calculateExtendedStats flag was set by the user on the command line. - if *calculateExtendedStats { - if !extendedstats.ExtendedStatsAvailable() { - panic("Extended stats are not available but the user requested their calculation.") - } - if err := extendedStats.IncorporateConnectionStats(downloadDataCollectionResult.LGCs[i].Stats().ConnInfo.Conn); err != nil { - fmt.Fprintf( - os.Stderr, - "Warning: Could not add extended stats for the connection: %v\n", - err, - ) + if *calculateExtendedStats { + if extendedstats.ExtendedStatsAvailable() { + for i := 0; i < len(downloadDataCollectionResult.LGCs); i++ { + // Assume that extended statistics are available -- the check was done explicitly at + // program startup if the calculateExtendedStats flag was set by the user on the command line. + if err := extendedStats.IncorporateConnectionStats(downloadDataCollectionResult.LGCs[i].Stats().ConnInfo.Conn); err != nil { + fmt.Fprintf( + os.Stderr, + "Warning: Could not add extended stats for the connection: %v\n", + err, + ) + } } + } else { + // TODO: Should we just log here? + panic("Extended stats are not available but the user requested their calculation.") } } + + // And only now, when we are done getting the extended stats from the connections, can + // we actually shut down the load-generating network activity! + cancelLgNetworkActivityCtx() + fmt.Printf( "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", utilities.ToMbps(downloadDataCollectionResult.RateBps), @@ -435,6 +498,16 @@ func main() { foreignProbeDataPoints := utilities.ChannelToSlice(foreignProbeDataPointsChannel) totalForeignRoundTrips := len(foreignProbeDataPoints) + // The specification indicates that we want to calculate the foreign probes as such: + // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign + // where tcp_foreign, tls_foreign, http_foreign are the P90 RTTs for the connection + // of the tcp, tls and http connections, respectively. However, we cannot break out + // the individual RTTs so we assume that they are roughly equal. Call that _foreign: + // 1/3*_foreign + 1/3*_foreign + 1/3*_foreign = + // 1/3*(3*_foreign) = + // _foreign + // So, there's no need to divide by the number of RTTs defined in the ProbeDataPoints + // in the individual results. foreignProbeRoundTripTimes := utilities.Fmap( foreignProbeDataPoints, func(dp rpm.ProbeDataPoint) float64 { return dp.Duration.Seconds() }, @@ -442,11 +515,11 @@ func main() { foreignProbeRoundTripTimeP90 := utilities.CalculatePercentile(foreignProbeRoundTripTimes, 90) downloadRoundTripTimes := utilities.Fmap( - downloadDataCollectionResult.DataPoints, + downloadDataCollectionResult.ProbeDataPoints, func(dcr rpm.ProbeDataPoint) float64 { return dcr.Duration.Seconds() }, ) uploadRoundTripTimes := utilities.Fmap( - uploadDataCollectionResult.DataPoints, + uploadDataCollectionResult.ProbeDataPoints, func(dcr rpm.ProbeDataPoint) float64 { return dcr.Duration.Seconds() }, ) selfProbeRoundTripTimes := append(downloadRoundTripTimes, uploadRoundTripTimes...) |
