diff options
| author | Jeroen Schickendantz <[email protected]> | 2022-08-22 11:40:59 -0400 |
|---|---|---|
| committer | Will Hawkins <[email protected]> | 2022-09-30 23:43:23 -0400 |
| commit | cc59a89fc3ac1fbdb16e417de01beecbd49616f0 (patch) | |
| tree | b4456e927c12a42c986e825cd265155d9e6bab8b | |
| parent | a4cabcf65b099b67747569b33fe43c345ea317ad (diff) | |
[Feature] Extend Throughput Logging
Extends throughput logging so that it continues even after the algorithm has reached saturation.
| -rw-r--r-- | extendedstats/darwin.go | 6 | ||||
| -rw-r--r-- | extendedstats/unix.go | 6 | ||||
| -rw-r--r-- | extendedstats/windows.go | 6 | ||||
| -rw-r--r-- | networkQuality.go | 181 | ||||
| -rw-r--r-- | rpm/rpm.go | 80 |
5 files changed, 195 insertions, 84 deletions
diff --git a/extendedstats/darwin.go b/extendedstats/darwin.go index 0f52be5..17298c9 100644 --- a/extendedstats/darwin.go +++ b/extendedstats/darwin.go @@ -12,7 +12,7 @@ import ( "golang.org/x/sys/unix" ) -type ExtendedStats struct { +type AggregateExtendedStats struct { Maxseg uint64 MaxSendMss uint64 MaxRecvMss uint64 @@ -38,7 +38,7 @@ type TCPInfo struct { Snd_cwnd uint32 } -func (es *ExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error { +func (es *AggregateExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error { if info, err := GetTCPInfo(basicConn); err != nil { return fmt.Errorf("OOPS: Could not get the TCP info for the connection: %v", err) } else { @@ -54,7 +54,7 @@ func (es *ExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error { return nil } -func (es *ExtendedStats) Repr() string { +func (es *AggregateExtendedStats) Repr() string { return fmt.Sprintf(`Extended Statistics: Maximum Segment Size: %v Total Bytes Retransmitted: %v diff --git a/extendedstats/unix.go b/extendedstats/unix.go index 3db94fc..270a956 100644 --- a/extendedstats/unix.go +++ b/extendedstats/unix.go @@ -12,7 +12,7 @@ import ( "golang.org/x/sys/unix" ) -type ExtendedStats struct { +type AggregateExtendedStats struct { MaxPathMtu uint64 MaxSendMss uint64 MaxRecvMss uint64 @@ -27,7 +27,7 @@ func ExtendedStatsAvailable() bool { return true } -func (es *ExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error { +func (es *AggregateExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error { if info, err := GetTCPInfo(basicConn); err != nil { return fmt.Errorf("OOPS: Could not get the TCP info for the connection: %v", err) } else { @@ -44,7 +44,7 @@ func (es *ExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error { return nil } -func (es *ExtendedStats) Repr() string { +func (es *AggregateExtendedStats) Repr() string { return fmt.Sprintf(`Extended Statistics: Maximum Path MTU: %v Maximum Send MSS: %v diff --git 
a/extendedstats/windows.go b/extendedstats/windows.go index 5c0fbd9..f38ffbd 100644 --- a/extendedstats/windows.go +++ b/extendedstats/windows.go @@ -13,7 +13,7 @@ import ( "golang.org/x/sys/windows" ) -type ExtendedStats struct { +type AggregateExtendedStats struct { MaxMss uint64 TotalBytesSent uint64 TotalBytesReceived uint64 @@ -106,7 +106,7 @@ func ExtendedStatsAvailable() bool { return true } -func (es *ExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error { +func (es *AggregateExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error { if info, err := getTCPInfoRaw(basicConn); err != nil { return fmt.Errorf("OOPS: Could not get the TCP info for the connection: %v", err) } else { @@ -124,7 +124,7 @@ func (es *ExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error { return nil } -func (es *ExtendedStats) Repr() string { +func (es *AggregateExtendedStats) Repr() string { return fmt.Sprintf(`Extended Statistics: Maximum Segment Size: %v Total Bytes Retransmitted: %v diff --git a/networkQuality.go b/networkQuality.go index 8cbf148..de10d54 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -94,13 +94,21 @@ func main() { timeoutDuration := time.Second * time.Duration(*sattimeout) timeoutAbsoluteTime := time.Now().Add(timeoutDuration) configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort) + + // This is the overall operating context of the program. All other + // contexts descend from this one. Canceling this one cancels all + // the others. 
operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background()) - lgDataCollectionCtx, cancelLGDataCollectionCtx := context.WithCancel( - context.Background(), - ) - foreignProbertCtx, foreignProberCtxCancel := context.WithCancel( - context.Background(), - ) + + // + lgDataCollectionCtx, cancelLGDataCollectionCtx := context.WithCancel(operatingCtx) + + // This context is used to control the load-generating network activity (i.e., all + // the connections that are open to do load generation). + lgNetworkActivityCtx, cancelLgNetworkActivityCtx := context.WithCancel(operatingCtx) + + // This context is used to control the activity of the foreign prober. + foreignProbertCtx, foreignProberCtxCancel := context.WithCancel(operatingCtx) config := &config.Config{} var debugLevel debug.DebugLevel = debug.Error @@ -253,7 +261,7 @@ func main() { /* * Create (and then, ironically, name) two anonymous functions that, when invoked, - * will create load-generating connections for upload/download/ + * will create load-generating connections for upload/download */ generate_lgd := func() lgc.LoadGeneratingConnection { return &lgc.LoadGeneratingConnectionDownload{ @@ -292,16 +300,18 @@ func main() { // data collection go routines stops well before the other, they will continue to send probes and we can // generate additional information! 
- downloadDataCollectionChannel := rpm.LGCollectData( + downloadSaturationComplete, downloadDataCollectionChannel := rpm.LGCollectData( lgDataCollectionCtx, + lgNetworkActivityCtx, operatingCtx, generate_lgd, generateSelfProbeConfiguration, downloadThroughputDataLogger, downloadDebugging, ) - uploadDataCollectionChannel := rpm.LGCollectData( + uploadSaturationComplete, uploadDataCollectionChannel := rpm.LGCollectData( lgDataCollectionCtx, + lgNetworkActivityCtx, operatingCtx, generate_lgu, generateSelfProbeConfiguration, @@ -317,43 +327,29 @@ func main() { ) dataCollectionTimeout := false - uploadDataCollectionComplete := false - downloadDataCollectionComplete := false + uploadDataGenerationComplete := false + downloadDataGenerationComplete := false downloadDataCollectionResult := rpm.SelfDataCollectionResult{} uploadDataCollectionResult := rpm.SelfDataCollectionResult{} - for !(uploadDataCollectionComplete && downloadDataCollectionComplete) { + for !(uploadDataGenerationComplete && downloadDataGenerationComplete) { select { - case downloadDataCollectionResult = <-downloadDataCollectionChannel: + case fullyComplete := <-downloadSaturationComplete: { - downloadDataCollectionComplete = true + downloadDataGenerationComplete = true if *debugCliFlag { fmt.Printf( - "################# download load-generating data collection is %s complete (%fMBps, %d flows)!\n", - utilities.Conditional( - dataCollectionTimeout, - "(provisionally)", - "", - ), - utilities.ToMBps(downloadDataCollectionResult.RateBps), - len(downloadDataCollectionResult.LGCs), - ) + "################# download load-generating data generation is %s complete!\n", + utilities.Conditional(fullyComplete, "", "(provisionally)")) } } - case uploadDataCollectionResult = <-uploadDataCollectionChannel: + case fullyComplete := <-uploadSaturationComplete: { - uploadDataCollectionComplete = true + uploadDataGenerationComplete = true if *debugCliFlag { fmt.Printf( - "################# upload load-generating data 
collection is %s complete (%fMBps, %d flows)!\n", - utilities.Conditional( - dataCollectionTimeout, - "(provisionally)", - "", - ), - utilities.ToMBps(uploadDataCollectionResult.RateBps), - len(uploadDataCollectionResult.LGCs), - ) + "################# upload load-generating data generation is %s complete!\n", + utilities.Conditional(fullyComplete, "", "(provisionally)")) } } case <-timeoutChannel: @@ -370,15 +366,15 @@ func main() { if *debugCliFlag { time.Sleep(constants.CooldownPeriod) } - return + return // Ends program } dataCollectionTimeout = true // We timed out attempting to collect data about the link. So, we will - // shut down all the collection xfers + // shut down the generators cancelLGDataCollectionCtx() // and then we will give ourselves some additional time in order - // to complete provisional data collection. + // to see if we can get some provisional data. timeoutAbsoluteTime = time.Now(). Add(time.Second * time.Duration(*rpmtimeout)) timeoutChannel = timeoutat.TimeoutAt( @@ -395,31 +391,98 @@ func main() { } } - // Shutdown the new-connection prober! + if *debugCliFlag { + fmt.Printf("Stopping all the load generating data generators.\n") + } + // Just cancel the data collection -- do *not* yet stop the actual load-generating + // network activity. + cancelLGDataCollectionCtx() + + // Shutdown the foreign-connection prober! + if *debugCliFlag { + fmt.Printf("Stopping all foreign probers.\n") + } foreignProberCtxCancel() + // Now that we stopped generation, let's give ourselves some time to collect + // all the data from our data generators. + timeoutAbsoluteTime = time.Now(). + Add(time.Second * time.Duration(*rpmtimeout)) + timeoutChannel = timeoutat.TimeoutAt( + operatingCtx, + timeoutAbsoluteTime, + debugLevel, + ) + + // Now that we have generated the data, let's collect it. 
+ downloadDataCollectionComplete := false + uploadDataCollectionComplete := false + for !(downloadDataCollectionComplete && uploadDataCollectionComplete) { + select { + case downloadDataCollectionResult = <-downloadDataCollectionChannel: + { + downloadDataCollectionComplete = true + if *debugCliFlag { + fmt.Printf( + "################# download load-generating data collection is complete (%fMBps, %d flows)!\n", + utilities.ToMBps(downloadDataCollectionResult.RateBps), + len(downloadDataCollectionResult.LGCs), + ) + } + } + case uploadDataCollectionResult = <-uploadDataCollectionChannel: + { + uploadDataCollectionComplete = true + if *debugCliFlag { + fmt.Printf( + "################# upload load-generating data collection is complete (%fMBps, %d flows)!\n", + utilities.ToMBps(uploadDataCollectionResult.RateBps), + len(uploadDataCollectionResult.LGCs), + ) + } + } + case <-timeoutChannel: + { + // This is just bad news -- we generated data but could not collect it. Let's just fail. + + fmt.Fprint( + os.Stderr, + "Error: Load-Generating data collection could not be completed in time and no provisional data could be gathered. Test failed.\n", + ) + return // Ends program + } + } + } + // In the new version we are no longer going to wait to send probes until after // saturation. When we get here we are now only going to compute the results // and/or extended statistics! - extendedStats := extendedstats.ExtendedStats{} + extendedStats := extendedstats.AggregateExtendedStats{} - for i := 0; i < len(downloadDataCollectionResult.LGCs); i++ { - // Assume that extended statistics are available -- the check was done explicitly at - // program startup if the calculateExtendedStats flag was set by the user on the command line. 
- if *calculateExtendedStats { - if !extendedstats.ExtendedStatsAvailable() { - panic("Extended stats are not available but the user requested their calculation.") - } - if err := extendedStats.IncorporateConnectionStats(downloadDataCollectionResult.LGCs[i].Stats().ConnInfo.Conn); err != nil { - fmt.Fprintf( - os.Stderr, - "Warning: Could not add extended stats for the connection: %v\n", - err, - ) + if *calculateExtendedStats { + if extendedstats.ExtendedStatsAvailable() { + for i := 0; i < len(downloadDataCollectionResult.LGCs); i++ { + // Assume that extended statistics are available -- the check was done explicitly at + // program startup if the calculateExtendedStats flag was set by the user on the command line. + if err := extendedStats.IncorporateConnectionStats(downloadDataCollectionResult.LGCs[i].Stats().ConnInfo.Conn); err != nil { + fmt.Fprintf( + os.Stderr, + "Warning: Could not add extended stats for the connection: %v\n", + err, + ) + } } + } else { + // TODO: Should we just log here? + panic("Extended stats are not available but the user requested their calculation.") } } + + // And only now, when we are done getting the extended stats from the connections, can + // we actually shut down the load-generating network activity! + cancelLgNetworkActivityCtx() + fmt.Printf( "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", utilities.ToMbps(downloadDataCollectionResult.RateBps), @@ -435,6 +498,16 @@ func main() { foreignProbeDataPoints := utilities.ChannelToSlice(foreignProbeDataPointsChannel) totalForeignRoundTrips := len(foreignProbeDataPoints) + // The specification indicates that we want to calculate the foreign probes as such: + // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign + // where tcp_foreign, tls_foreign, http_foreign are the P90 RTTs for the connection + // of the tcp, tls and http connections, respectively. However, we cannot break out + // the individual RTTs so we assume that they are roughly equal. 
Call that _foreign: + // 1/3*_foreign + 1/3*_foreign + 1/3*_foreign = + // 1/3*(3*_foreign) = + // _foreign + // So, there's no need to divide by the number of RTTs defined in the ProbeDataPoints + // in the individual results. foreignProbeRoundTripTimes := utilities.Fmap( foreignProbeDataPoints, func(dp rpm.ProbeDataPoint) float64 { return dp.Duration.Seconds() }, @@ -442,11 +515,11 @@ func main() { foreignProbeRoundTripTimeP90 := utilities.CalculatePercentile(foreignProbeRoundTripTimes, 90) downloadRoundTripTimes := utilities.Fmap( - downloadDataCollectionResult.DataPoints, + downloadDataCollectionResult.ProbeDataPoints, func(dcr rpm.ProbeDataPoint) float64 { return dcr.Duration.Seconds() }, ) uploadRoundTripTimes := utilities.Fmap( - uploadDataCollectionResult.DataPoints, + uploadDataCollectionResult.ProbeDataPoints, func(dcr rpm.ProbeDataPoint) float64 { return dcr.Duration.Seconds() }, ) selfProbeRoundTripTimes := append(downloadRoundTripTimes, uploadRoundTripTimes...) @@ -75,9 +75,10 @@ type ThroughputDataPoint struct { } type SelfDataCollectionResult struct { - RateBps float64 - LGCs []lgc.LoadGeneratingConnection - DataPoints []ProbeDataPoint + RateBps float64 + LGCs []lgc.LoadGeneratingConnection + ProbeDataPoints []ProbeDataPoint + LoggingContinuation func() } type ProbeType int64 @@ -312,6 +313,7 @@ func SelfProber( debugging = debug.NewDebugWithPrefix(debugging.Level, debugging.Prefix+" self probe") go func() { + wg := sync.WaitGroup{} probeCount := 0 for proberCtx.Err() == nil { time.Sleep(selfProbeConfiguration.Interval) @@ -329,7 +331,7 @@ func SelfProber( // yet. 
go Probe( proberCtx, - nil, + &wg, selfProbeConfiguration.DataLogger, defaultConnection.Client(), selfProbeConfiguration.URL, @@ -340,7 +342,14 @@ func SelfProber( } if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) self probing driver is stopping after sending %d probes.\n", + "(%s) Self probe driver is going to start waiting for its probes to finish.\n", + debugging.Prefix, + ) + } + utilities.OrTimeout(func() { wg.Wait() }, 2*time.Second) + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) Self probe driver is stopping after sending %d probes.\n", debugging.Prefix, probeCount, ) @@ -351,27 +360,31 @@ func SelfProber( } func LGCollectData( - lgDataCollectionCtx context.Context, - operatingCtx context.Context, + saturationCtx context.Context, + networkActivityCtx context.Context, + controlCtx context.Context, lgcGenerator func() lgc.LoadGeneratingConnection, selfProbeConfigurationGenerator func() ProbeConfiguration, throughputDataLogger datalogger.DataLogger[ThroughputDataPoint], debugging *debug.DebugWithPrefix, -) (resulted chan SelfDataCollectionResult) { +) (saturated chan bool, resulted chan SelfDataCollectionResult) { resulted = make(chan SelfDataCollectionResult) + saturated = make(chan bool) go func() { + isSaturated := false + lgcs := make([]lgc.LoadGeneratingConnection, 0) addFlows( - lgDataCollectionCtx, + networkActivityCtx, constants.StartingNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level, ) - selfProbeCtx, selfProbeCtxCancel := context.WithCancel(lgDataCollectionCtx) + selfProbeCtx, selfProbeCtxCancel := context.WithCancel(saturationCtx) probeDataPointsChannel := SelfProber(selfProbeCtx, lgcs[0], &lgcs, @@ -400,11 +413,21 @@ func LGCollectData( for currentInterval := uint64(0); true; currentInterval++ { - // When the program stops operating, then stop. When our invoker tells - // us to stop, then stop. 
- if operatingCtx.Err() != nil || lgDataCollectionCtx.Err() != nil { - selfProbeCtxCancel() - return + // Stop if the client has reached saturation on both sides (up and down) + if saturationCtx.Err() != nil { + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: Stopping data-collection/saturation loop at %v because both sides are saturated.", debugging, time.Now()) + } + break + } + + // Stop if we timed out! Send back false to indicate that we are returning under duress. + if controlCtx.Err() != nil { + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: Stopping data-collection/saturation loop at %v because our controller told us to do so.", debugging, time.Now()) + } + saturated <- false + break } now := time.Now() @@ -507,7 +530,11 @@ func LGCollectData( // Special case: We won't make any adjustments on the first // iteration. - if currentInterval == 0 { + // Special case: If we are already saturated, let's move on. + // We would already be saturated and want to continue + // to do this loop because we are still generating good + // data! + if currentInterval == 0 || isSaturated { continue } @@ -523,7 +550,7 @@ func LGCollectData( ) } addFlows( - lgDataCollectionCtx, + networkActivityCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, @@ -545,19 +572,30 @@ func LGCollectData( if debug.IsDebug(debugging.Level) { fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging) } - break + // Do not break -- we want to continue looping so that we can continue to log. + // See comment at the beginning of the loop for its terminating condition. + isSaturated = true + + // But, we do send back a flare that says we are saturated (and happily so)! 
+ saturated <- true } else { // Else, add four more flows if debug.IsDebug(debugging.Level) { fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging) } - addFlows(lgDataCollectionCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level) + addFlows(networkActivityCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level) previousFlowIncreaseInterval = currentInterval } } - } + // For whatever reason, we are done. Let's report our results. + + // In the case that we ended happily, there should be no reason to do this (because + // the self-probe context is a descendant of the saturation context). However, if we + // were cancelled because of a timeout, we will need to explicitly cancel it. Multiple + // calls to a cancel function are a-okay. selfProbeCtxCancel() + selfProbeDataPoints := make([]ProbeDataPoint, 0) for dataPoint := range probeDataPointsChannel { selfProbeDataPoints = append(selfProbeDataPoints, dataPoint) @@ -569,7 +607,7 @@ func LGCollectData( len(selfProbeDataPoints), ) } - resulted <- SelfDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, DataPoints: selfProbeDataPoints} + resulted <- SelfDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, ProbeDataPoints: selfProbeDataPoints} }() return } |
