summaryrefslogtreecommitdiff
path: root/rpm/rpm.go
diff options
context:
space:
mode:
Diffstat (limited to 'rpm/rpm.go')
-rw-r--r--rpm/rpm.go80
1 files changed, 59 insertions, 21 deletions
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 61c6658..eab7b90 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -75,9 +75,10 @@ type ThroughputDataPoint struct {
}
type SelfDataCollectionResult struct {
- RateBps float64
- LGCs []lgc.LoadGeneratingConnection
- DataPoints []ProbeDataPoint
+ RateBps float64
+ LGCs []lgc.LoadGeneratingConnection
+ ProbeDataPoints []ProbeDataPoint
+ LoggingContinuation func()
}
type ProbeType int64
@@ -312,6 +313,7 @@ func SelfProber(
debugging = debug.NewDebugWithPrefix(debugging.Level, debugging.Prefix+" self probe")
go func() {
+ wg := sync.WaitGroup{}
probeCount := 0
for proberCtx.Err() == nil {
time.Sleep(selfProbeConfiguration.Interval)
@@ -329,7 +331,7 @@ func SelfProber(
// yet.
go Probe(
proberCtx,
- nil,
+ &wg,
selfProbeConfiguration.DataLogger,
defaultConnection.Client(),
selfProbeConfiguration.URL,
@@ -340,7 +342,14 @@ func SelfProber(
}
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) self probing driver is stopping after sending %d probes.\n",
+ "(%s) Self probe driver is going to start waiting for its probes to finish.\n",
+ debugging.Prefix,
+ )
+ }
+ utilities.OrTimeout(func() { wg.Wait() }, 2*time.Second)
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) Self probe driver is stopping after sending %d probes.\n",
debugging.Prefix,
probeCount,
)
@@ -351,27 +360,31 @@ func SelfProber(
}
func LGCollectData(
- lgDataCollectionCtx context.Context,
- operatingCtx context.Context,
+ saturationCtx context.Context,
+ networkActivityCtx context.Context,
+ controlCtx context.Context,
lgcGenerator func() lgc.LoadGeneratingConnection,
selfProbeConfigurationGenerator func() ProbeConfiguration,
throughputDataLogger datalogger.DataLogger[ThroughputDataPoint],
debugging *debug.DebugWithPrefix,
-) (resulted chan SelfDataCollectionResult) {
+) (saturated chan bool, resulted chan SelfDataCollectionResult) {
resulted = make(chan SelfDataCollectionResult)
+ saturated = make(chan bool)
go func() {
+ isSaturated := false
+
lgcs := make([]lgc.LoadGeneratingConnection, 0)
addFlows(
- lgDataCollectionCtx,
+ networkActivityCtx,
constants.StartingNumberOfLoadGeneratingConnections,
&lgcs,
lgcGenerator,
debugging.Level,
)
- selfProbeCtx, selfProbeCtxCancel := context.WithCancel(lgDataCollectionCtx)
+ selfProbeCtx, selfProbeCtxCancel := context.WithCancel(saturationCtx)
probeDataPointsChannel := SelfProber(selfProbeCtx,
lgcs[0],
&lgcs,
@@ -400,11 +413,21 @@ func LGCollectData(
for currentInterval := uint64(0); true; currentInterval++ {
- // When the program stops operating, then stop. When our invoker tells
- // us to stop, then stop.
- if operatingCtx.Err() != nil || lgDataCollectionCtx.Err() != nil {
- selfProbeCtxCancel()
- return
+ // Stop if the client has reached saturation on both sides (up and down)
+ if saturationCtx.Err() != nil {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("%v: Stopping data-collection/saturation loop at %v because both sides are saturated.", debugging, time.Now())
+ }
+ break
+ }
+
+ // Stop if we timed out! Send back false to indicate that we are returning under duress.
+ if controlCtx.Err() != nil {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("%v: Stopping data-collection/saturation loop at %v because our controller told us to do so.", debugging, time.Now())
+ }
+ saturated <- false
+ break
}
now := time.Now()
@@ -507,7 +530,11 @@ func LGCollectData(
// Special case: We won't make any adjustments on the first
// iteration.
- if currentInterval == 0 {
+ // Special case: If we are already saturated, let's move on.
+ // We would already be saturated and want to continue
+ // to do this loop because we are still generating good
+ // data!
+ if currentInterval == 0 || isSaturated {
continue
}
@@ -523,7 +550,7 @@ func LGCollectData(
)
}
addFlows(
- lgDataCollectionCtx,
+ networkActivityCtx,
constants.AdditiveNumberOfLoadGeneratingConnections,
&lgcs,
lgcGenerator,
@@ -545,19 +572,30 @@ func LGCollectData(
if debug.IsDebug(debugging.Level) {
fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging)
}
- break
+ // Do not break -- we want to continue looping so that we can continue to log.
+ // See comment at the beginning of the loop for its terminating condition.
+ isSaturated = true
+
+ // But, we do send back a flare that says we are saturated (and happily so)!
+ saturated <- true
} else {
// Else, add four more flows
if debug.IsDebug(debugging.Level) {
fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging)
}
- addFlows(lgDataCollectionCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level)
+ addFlows(networkActivityCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level)
previousFlowIncreaseInterval = currentInterval
}
}
-
}
+ // For whatever reason, we are done. Let's report our results.
+
+ // In the case that we ended happily, there should be no reason to do this (because
+ // the self-probe context is a descendant of the saturation context). However, if we
+ // were cancelled because of a timeout, we will need to explicitly cancel it. Multiple
+ // calls to a cancel function are a-okay.
selfProbeCtxCancel()
+
selfProbeDataPoints := make([]ProbeDataPoint, 0)
for dataPoint := range probeDataPointsChannel {
selfProbeDataPoints = append(selfProbeDataPoints, dataPoint)
@@ -569,7 +607,7 @@ func LGCollectData(
len(selfProbeDataPoints),
)
}
- resulted <- SelfDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, DataPoints: selfProbeDataPoints}
+ resulted <- SelfDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, ProbeDataPoints: selfProbeDataPoints}
}()
return
}