diff options
| -rw-r--r-- | rpm/rpm.go | 282 |
1 files changed, 65 insertions, 217 deletions
@@ -29,7 +29,6 @@ import ( "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" - "github.com/network-quality/goresponsiveness/ma" "github.com/network-quality/goresponsiveness/stats" "github.com/network-quality/goresponsiveness/traceable" "github.com/network-quality/goresponsiveness/utilities" @@ -58,7 +57,6 @@ func addFlows( type ProbeConfiguration struct { URL string DataLogger datalogger.DataLogger[ProbeDataPoint] - Interval time.Duration } type ProbeDataPoint struct { @@ -224,26 +222,31 @@ func Probe( return nil } -func ForeignProber( +func CombinedProber( proberCtx context.Context, foreignProbeConfigurationGenerator func() ProbeConfiguration, + selfProbeConfigurationGenerator func() ProbeConfiguration, + selfProbeConnection lgc.LoadGeneratingConnection, + probeInterval time.Duration, keyLogger io.Writer, debugging *debug.DebugWithPrefix, ) (points chan ProbeDataPoint) { points = make(chan ProbeDataPoint) - foreignProbeConfiguration := foreignProbeConfigurationGenerator() - go func() { wg := sync.WaitGroup{} probeCount := 0 for proberCtx.Err() == nil { - time.Sleep(foreignProbeConfiguration.Interval) + + time.Sleep(probeInterval) + + foreignProbeConfiguration := foreignProbeConfigurationGenerator() + selfProbeConfiguration := selfProbeConfigurationGenerator() if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) About to start foreign probe number %d!\n", + "(%s) About to send of the %d round of probes!\n", debugging.Prefix, probeCount, ) @@ -282,58 +285,12 @@ func ForeignProber( &points, debugging, ) - } - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "(%s) Foreign probe driver is going to start waiting for its probes to finish.\n", - debugging.Prefix, - ) - } - utilities.OrTimeout(func() { wg.Wait() }, 2*time.Second) - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "(%s) Foreign probe driver is done waiting for its probes to finish.\n", - debugging.Prefix, - ) - } - close(points) - }() - return -} - -func SelfProber( - proberCtx context.Context, - defaultConnection lgc.LoadGeneratingConnection, - altConnections *[]lgc.LoadGeneratingConnection, - selfProbeConfiguration ProbeConfiguration, - debugging *debug.DebugWithPrefix, -) (points chan ProbeDataPoint) { - points = make(chan ProbeDataPoint) - debugging = debug.NewDebugWithPrefix(debugging.Level, debugging.Prefix+" self probe") - - go func() { - wg := sync.WaitGroup{} - probeCount := 0 - for proberCtx.Err() == nil { - time.Sleep(selfProbeConfiguration.Interval) - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "(%s) About to start self probe number %d!\n", - debugging.Prefix, - probeCount, - ) - } - probeCount++ - // TODO: We do not yet take in to account that the load-generating connection that we were given - // on which to perform measurements might go away during testing. We have access to all the open - // load-generating connections (altConnections) to handle this case, but we just aren't using them - // yet. go Probe( proberCtx, &wg, selfProbeConfiguration.DataLogger, - defaultConnection.Client(), + selfProbeConnection.Client(), selfProbeConfiguration.URL, Self, &points, @@ -342,16 +299,15 @@ func SelfProber( } if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) Self probe driver is going to start waiting for its probes to finish.\n", + "(%s) Combined probe driver is going to start waiting for its probes to finish.\n", debugging.Prefix, ) } utilities.OrTimeout(func() { wg.Wait() }, 2*time.Second) if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) Self probe driver is stopping after sending %d probes.\n", + "(%s) Combined probe driver is done waiting for its probes to finish.\n", debugging.Prefix, - probeCount, ) } close(points) @@ -359,23 +315,31 @@ func SelfProber( return } -func LGCollectData( - saturationCtx context.Context, - networkActivityCtx context.Context, - controlCtx context.Context, - lgcGenerator func() lgc.LoadGeneratingConnection, - selfProbeConfigurationGenerator func() ProbeConfiguration, - throughputDataLogger datalogger.DataLogger[ThroughputDataPoint], - debugging *debug.DebugWithPrefix, -) (saturated chan bool, resulted chan SelfDataCollectionResult) { - resulted = make(chan SelfDataCollectionResult) - saturated = make(chan bool) +func LoadGenerator( + networkActivityCtx context.Context, // Create all network connections in this context. + saturationCtx context.Context, // Continue logging, but stop adding flows when this context is canceled! + loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load. + lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection. + throughputDataLogger datalogger.DataLogger[ThroughputDataPoint], // For logging data! + debugging *debug.DebugWithPrefix, // How can we forget debugging? +) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to use for self probes. + throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate. +) { + + throughputCalculations = make(chan ThroughputDataPoint) + // The channel that we are going to use to send back the connection to use for probing may not immediately + // be read by the caller. We don't want to wait around until they are ready before we start doing our work. + // So, we'll make it buffered. + probeConnectionCommunicationChannel = make(chan lgc.LoadGeneratingConnection, 1) + go func() { isSaturated := false lgcs := make([]lgc.LoadGeneratingConnection, 0) + flowsCreated := uint64(0) + addFlows( networkActivityCtx, constants.StartingNumberOfLoadGeneratingConnections, @@ -383,53 +347,30 @@ func LGCollectData( lgcGenerator, debugging.Level, ) + flowsCreated += constants.StartingNumberOfLoadGeneratingConnections - selfProbeCtx, selfProbeCtxCancel := context.WithCancel(saturationCtx) - probeDataPointsChannel := SelfProber(selfProbeCtx, - lgcs[0], - &lgcs, - selfProbeConfigurationGenerator(), - debugging, - ) - - previousFlowIncreaseInterval := uint64(0) - previousMovingAverage := float64(0) - - // The moving average will contain the average for the last - // constants.MovingAverageIntervalCount throughputs. - // ie, ma[i] = (throughput[i-3] + throughput[i-2] + throughput[i-1] + throughput[i])/4 - movingAverage := ma.NewMovingAverage( - constants.MovingAverageIntervalCount, - ) - - // The moving average average will be the average of the last - // constants.MovingAverageIntervalCount moving averages. - // ie, maa[i] = (ma[i-3] + ma[i-2] + ma[i-1] + ma[i])/4 - movingAverageAverage := ma.NewMovingAverage( - constants.MovingAverageIntervalCount, - ) + // We have at least a single load-generating channel. This channel will be the one that + // the self probes use. Let's send it back to the caller so that they can pass it on if they need to. + probeConnectionCommunicationChannel <- lgcs[0] nextSampleStartTime := time.Now().Add(time.Second) for currentInterval := uint64(0); true; currentInterval++ { - // Stop if the client has reached saturation on both sides (up and down) - if saturationCtx.Err() != nil { - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: Stopping data-collection/saturation loop at %v because both sides are saturated.", debugging, time.Now()) - } + // If the operationalCtx is canceled, then that means our work here is done ... + if loadGeneratorCtx.Err() != nil { break } - // Stop if we timed out! Send back false to indicate that we are returning under duress. - if controlCtx.Err() != nil { + if saturationCtx.Err() != nil { + isSaturated = true if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: Stopping data-collection/saturation loop at %v because our controller told us to do so.", debugging, time.Now()) + fmt.Printf( + "%v: Received the saturated signal; continuing only to log from now on.\n", + debugging, + ) } - saturated <- false - break } - now := time.Now() // At each 1-second interval if nextSampleStartTime.Sub(now) > 0 { @@ -448,7 +389,7 @@ func LGCollectData( // Compute "instantaneous aggregate" goodput which is the number of // bytes transferred within the last second. - var totalTransfer float64 = 0 + var instantaneousTotalThroughput float64 = 0 allInvalid := true for i := range lgcs { if !lgcs[i].IsValid() { @@ -464,12 +405,12 @@ func LGCollectData( allInvalid = false currentTransferred, currentInterval := lgcs[i].TransferredInInterval() // normalize to a second-long interval! - instantaneousTransferred := float64( + instantaneousConnectionThroughput := float64( currentTransferred, ) / float64( currentInterval.Seconds(), ) - totalTransfer += instantaneousTransferred + instantaneousTotalThroughput += instantaneousConnectionThroughput } // For some reason, all the lgcs are invalid. This likely means that @@ -484,130 +425,37 @@ func LGCollectData( break } - // Compute a moving average of the last - // constants.MovingAverageIntervalCount "instantaneous aggregate - // goodput" measurements - movingAverage.AddMeasurement(float64(totalTransfer)) - currentMovingAverage := movingAverage.CalculateAverage() - movingAverageAverage.AddMeasurement(currentMovingAverage) - movingAverageDelta := utilities.SignedPercentDifference( - currentMovingAverage, - previousMovingAverage, - ) + // We have generated a throughput calculation -- let's send it back to the coordinator + throughputDataPoint := ThroughputDataPoint{time.Now(), instantaneousTotalThroughput} + throughputCalculations <- throughputDataPoint + // Log that, if we are configured for logging. if !utilities.IsInterfaceNil(throughputDataLogger) { - throughputDataLogger.LogRecord( - ThroughputDataPoint{time.Now(), currentMovingAverage}, - ) - } - - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Instantaneous goodput: %f MB.\n", - debugging, - utilities.ToMBps(float64(totalTransfer)), - ) - fmt.Printf( - "%v: Previous moving average: %f MB.\n", - debugging, - utilities.ToMBps(previousMovingAverage), - ) - fmt.Printf( - "%v: Current moving average: %f MB.\n", - debugging, - utilities.ToMBps(currentMovingAverage), - ) - fmt.Printf( - "%v: Moving average delta: %f.\n", - debugging, - movingAverageDelta, - ) + throughputDataLogger.LogRecord(throughputDataPoint) } - previousMovingAverage = currentMovingAverage - - intervalsSinceLastFlowIncrease := currentInterval - previousFlowIncreaseInterval - - // Special case: We won't make any adjustments on the first - // iteration. - // Special case: If we are already saturated, let's move on. - // We would already be saturated and want to continue - // to do this loop because we are still generating good - // data! - if currentInterval == 0 || isSaturated { + // We don't actually want to create a new connection if we are saturated! + if isSaturated { continue } - // If moving average > "previous" moving average + InstabilityDelta: - if movingAverageDelta > constants.InstabilityDelta { - // Network did not yet reach saturation. If no flows added - // within the last 4 seconds, add 4 more flows - if intervalsSinceLastFlowIncrease > constants.MovingAverageStabilitySpan { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Adding flows because we are unsaturated and waited a while.\n", - debugging, - ) - } - addFlows( - networkActivityCtx, - constants.AdditiveNumberOfLoadGeneratingConnections, - &lgcs, - lgcGenerator, - debugging.Level, - ) - previousFlowIncreaseInterval = currentInterval - } else { - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debugging) - } - } - } else { // Else, network reached saturation for the current flow count. - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: Network reached saturation with current flow count.\n", debugging) - } - // If new flows added and for 4 seconds the moving average - // throughput did not change: network reached stable saturation - if intervalsSinceLastFlowIncrease < constants.MovingAverageStabilitySpan && movingAverageAverage.AllSequentialIncreasesLessThan(constants.InstabilityDelta) { - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging) - } - // Do not break -- we want to continue looping so that we can continue to log. - // See comment at the beginning of the loop for its terminating condition. - isSaturated = true + // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now! + addFlows( + networkActivityCtx, + constants.AdditiveNumberOfLoadGeneratingConnections, + &lgcs, + lgcGenerator, + debugging.Level, + ) - // But, we do send back a flare that says we are saturated (and happily so)! - saturated <- true - } else { - // Else, add four more flows - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging) - } - addFlows(networkActivityCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level) - previousFlowIncreaseInterval = currentInterval - } - } + flowsCreated += constants.AdditiveNumberOfLoadGeneratingConnections } - // For whatever reason, we are done. Let's report our results. - - // In the case that we ended happily, there should be no reason to do this (because - // the self-probe context is a descendant of the saturation context). However, if we - // were cancelled because of a timeout, we will need to explicitly cancel it. Multiple - // calls to a cancel function are a-okay. - selfProbeCtxCancel() - selfProbeDataPoints := make([]ProbeDataPoint, 0) - for dataPoint := range probeDataPointsChannel { - selfProbeDataPoints = append(selfProbeDataPoints, dataPoint) - } if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) Collected %d self data points\n", - debugging.Prefix, - len(selfProbeDataPoints), - ) + "(%s) Stopping a load generator after creating %d flows.\n", + debugging.Prefix, flowsCreated) } - resulted <- SelfDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, ProbeDataPoints: selfProbeDataPoints} }() return } |
