diff options
Diffstat (limited to 'rpm/rpm.go')
| -rw-r--r-- | rpm/rpm.go | 80 |
1 files changed, 59 insertions, 21 deletions
@@ -75,9 +75,10 @@ type ThroughputDataPoint struct { } type SelfDataCollectionResult struct { - RateBps float64 - LGCs []lgc.LoadGeneratingConnection - DataPoints []ProbeDataPoint + RateBps float64 + LGCs []lgc.LoadGeneratingConnection + ProbeDataPoints []ProbeDataPoint + LoggingContinuation func() } type ProbeType int64 @@ -312,6 +313,7 @@ func SelfProber( debugging = debug.NewDebugWithPrefix(debugging.Level, debugging.Prefix+" self probe") go func() { + wg := sync.WaitGroup{} probeCount := 0 for proberCtx.Err() == nil { time.Sleep(selfProbeConfiguration.Interval) @@ -329,7 +331,7 @@ func SelfProber( // yet. go Probe( proberCtx, - nil, + &wg, selfProbeConfiguration.DataLogger, defaultConnection.Client(), selfProbeConfiguration.URL, @@ -340,7 +342,14 @@ func SelfProber( } if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) self probing driver is stopping after sending %d probes.\n", + "(%s) Self probe driver is going to start waiting for its probes to finish.\n", + debugging.Prefix, + ) + } + utilities.OrTimeout(func() { wg.Wait() }, 2*time.Second) + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) Self probe driver is stopping after sending %d probes.\n", debugging.Prefix, probeCount, ) @@ -351,27 +360,31 @@ func SelfProber( } func LGCollectData( - lgDataCollectionCtx context.Context, - operatingCtx context.Context, + saturationCtx context.Context, + networkActivityCtx context.Context, + controlCtx context.Context, lgcGenerator func() lgc.LoadGeneratingConnection, selfProbeConfigurationGenerator func() ProbeConfiguration, throughputDataLogger datalogger.DataLogger[ThroughputDataPoint], debugging *debug.DebugWithPrefix, -) (resulted chan SelfDataCollectionResult) { +) (saturated chan bool, resulted chan SelfDataCollectionResult) { resulted = make(chan SelfDataCollectionResult) + saturated = make(chan bool) go func() { + isSaturated := false + lgcs := make([]lgc.LoadGeneratingConnection, 0) addFlows( - lgDataCollectionCtx, + networkActivityCtx, constants.StartingNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level, ) - selfProbeCtx, selfProbeCtxCancel := context.WithCancel(lgDataCollectionCtx) + selfProbeCtx, selfProbeCtxCancel := context.WithCancel(saturationCtx) probeDataPointsChannel := SelfProber(selfProbeCtx, lgcs[0], &lgcs, @@ -400,11 +413,21 @@ func LGCollectData( for currentInterval := uint64(0); true; currentInterval++ { - // When the program stops operating, then stop. When our invoker tells - // us to stop, then stop. - if operatingCtx.Err() != nil || lgDataCollectionCtx.Err() != nil { - selfProbeCtxCancel() - return + // Stop if the client has reached saturation on both sides (up and down) + if saturationCtx.Err() != nil { + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: Stopping data-collection/saturation loop at %v because both sides are saturated.", debugging, time.Now()) + } + break + } + + // Stop if we timed out! Send back false to indicate that we are returning under duress. + if controlCtx.Err() != nil { + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: Stopping data-collection/saturation loop at %v because our controller told us to do so.", debugging, time.Now()) + } + saturated <- false + break } now := time.Now() @@ -507,7 +530,11 @@ func LGCollectData( // Special case: We won't make any adjustments on the first // iteration. - if currentInterval == 0 { + // Special case: If we are already saturated, let's move on. + // We would already be saturated and want to continue + // to do this loop because we are still generating good + // data! + if currentInterval == 0 || isSaturated { continue } @@ -523,7 +550,7 @@ func LGCollectData( ) } addFlows( - lgDataCollectionCtx, + networkActivityCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, @@ -545,19 +572,30 @@ func LGCollectData( if debug.IsDebug(debugging.Level) { fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging) } - break + // Do not break -- we want to continue looping so that we can continue to log. + // See comment at the beginning of the loop for its terminating condition. + isSaturated = true + + // But, we do send back a flare that says we are saturated (and happily so)! + saturated <- true } else { // Else, add four more flows if debug.IsDebug(debugging.Level) { fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging) } - addFlows(lgDataCollectionCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level) + addFlows(networkActivityCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level) previousFlowIncreaseInterval = currentInterval } } - } + // For whatever reason, we are done. Let's report our results. + + // In the case that we ended happily, there should be no reason to do this (because + // the self-probe context is a descendant of the saturation context). However, if we + // were cancelled because of a timeout, we will need to explicitly cancel it. Multiple + // calls to a cancel function are a-okay. selfProbeCtxCancel() + selfProbeDataPoints := make([]ProbeDataPoint, 0) for dataPoint := range probeDataPointsChannel { selfProbeDataPoints = append(selfProbeDataPoints, dataPoint) @@ -569,7 +607,7 @@ func LGCollectData( len(selfProbeDataPoints), ) } - resulted <- SelfDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, DataPoints: selfProbeDataPoints} + resulted <- SelfDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, ProbeDataPoints: selfProbeDataPoints} }() return } |
