summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rpm/rpm.go282
1 files changed, 65 insertions, 217 deletions
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 1158204..c5fcb8d 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -29,7 +29,6 @@ import (
"github.com/network-quality/goresponsiveness/debug"
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
- "github.com/network-quality/goresponsiveness/ma"
"github.com/network-quality/goresponsiveness/stats"
"github.com/network-quality/goresponsiveness/traceable"
"github.com/network-quality/goresponsiveness/utilities"
@@ -58,7 +57,6 @@ func addFlows(
type ProbeConfiguration struct {
URL string
DataLogger datalogger.DataLogger[ProbeDataPoint]
- Interval time.Duration
}
type ProbeDataPoint struct {
@@ -224,26 +222,31 @@ func Probe(
return nil
}
-func ForeignProber(
+func CombinedProber(
proberCtx context.Context,
foreignProbeConfigurationGenerator func() ProbeConfiguration,
+ selfProbeConfigurationGenerator func() ProbeConfiguration,
+ selfProbeConnection lgc.LoadGeneratingConnection,
+ probeInterval time.Duration,
keyLogger io.Writer,
debugging *debug.DebugWithPrefix,
) (points chan ProbeDataPoint) {
points = make(chan ProbeDataPoint)
- foreignProbeConfiguration := foreignProbeConfigurationGenerator()
-
go func() {
wg := sync.WaitGroup{}
probeCount := 0
for proberCtx.Err() == nil {
- time.Sleep(foreignProbeConfiguration.Interval)
+
+ time.Sleep(probeInterval)
+
+ foreignProbeConfiguration := foreignProbeConfigurationGenerator()
+ selfProbeConfiguration := selfProbeConfigurationGenerator()
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) About to start foreign probe number %d!\n",
+ "(%s) About to send of the %d round of probes!\n",
debugging.Prefix,
probeCount,
)
@@ -282,58 +285,12 @@ func ForeignProber(
&points,
debugging,
)
- }
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "(%s) Foreign probe driver is going to start waiting for its probes to finish.\n",
- debugging.Prefix,
- )
- }
- utilities.OrTimeout(func() { wg.Wait() }, 2*time.Second)
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "(%s) Foreign probe driver is done waiting for its probes to finish.\n",
- debugging.Prefix,
- )
- }
- close(points)
- }()
- return
-}
-
-func SelfProber(
- proberCtx context.Context,
- defaultConnection lgc.LoadGeneratingConnection,
- altConnections *[]lgc.LoadGeneratingConnection,
- selfProbeConfiguration ProbeConfiguration,
- debugging *debug.DebugWithPrefix,
-) (points chan ProbeDataPoint) {
- points = make(chan ProbeDataPoint)
- debugging = debug.NewDebugWithPrefix(debugging.Level, debugging.Prefix+" self probe")
-
- go func() {
- wg := sync.WaitGroup{}
- probeCount := 0
- for proberCtx.Err() == nil {
- time.Sleep(selfProbeConfiguration.Interval)
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "(%s) About to start self probe number %d!\n",
- debugging.Prefix,
- probeCount,
- )
- }
- probeCount++
- // TODO: We do not yet take in to account that the load-generating connection that we were given
- // on which to perform measurements might go away during testing. We have access to all the open
- // load-generating connections (altConnections) to handle this case, but we just aren't using them
- // yet.
go Probe(
proberCtx,
&wg,
selfProbeConfiguration.DataLogger,
- defaultConnection.Client(),
+ selfProbeConnection.Client(),
selfProbeConfiguration.URL,
Self,
&points,
@@ -342,16 +299,15 @@ func SelfProber(
}
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) Self probe driver is going to start waiting for its probes to finish.\n",
+ "(%s) Combined probe driver is going to start waiting for its probes to finish.\n",
debugging.Prefix,
)
}
utilities.OrTimeout(func() { wg.Wait() }, 2*time.Second)
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) Self probe driver is stopping after sending %d probes.\n",
+ "(%s) Combined probe driver is done waiting for its probes to finish.\n",
debugging.Prefix,
- probeCount,
)
}
close(points)
@@ -359,23 +315,31 @@ func SelfProber(
return
}
-func LGCollectData(
- saturationCtx context.Context,
- networkActivityCtx context.Context,
- controlCtx context.Context,
- lgcGenerator func() lgc.LoadGeneratingConnection,
- selfProbeConfigurationGenerator func() ProbeConfiguration,
- throughputDataLogger datalogger.DataLogger[ThroughputDataPoint],
- debugging *debug.DebugWithPrefix,
-) (saturated chan bool, resulted chan SelfDataCollectionResult) {
- resulted = make(chan SelfDataCollectionResult)
- saturated = make(chan bool)
+func LoadGenerator(
+ networkActivityCtx context.Context, // Create all network connections in this context.
+ saturationCtx context.Context, // Continue logging, but stop adding flows when this context is canceled!
+ loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load.
+ lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection.
+ throughputDataLogger datalogger.DataLogger[ThroughputDataPoint], // For logging data!
+ debugging *debug.DebugWithPrefix, // How can we forget debugging?
+) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to use for self probes.
+ throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate.
+) {
+
+ throughputCalculations = make(chan ThroughputDataPoint)
+ // The channel that we are going to use to send back the connection to use for probing may not immediately
+ // be read by the caller. We don't want to wait around until they are ready before we start doing our work.
+ // So, we'll make it buffered.
+ probeConnectionCommunicationChannel = make(chan lgc.LoadGeneratingConnection, 1)
+
go func() {
isSaturated := false
lgcs := make([]lgc.LoadGeneratingConnection, 0)
+ flowsCreated := uint64(0)
+
addFlows(
networkActivityCtx,
constants.StartingNumberOfLoadGeneratingConnections,
@@ -383,53 +347,30 @@ func LGCollectData(
lgcGenerator,
debugging.Level,
)
+ flowsCreated += constants.StartingNumberOfLoadGeneratingConnections
- selfProbeCtx, selfProbeCtxCancel := context.WithCancel(saturationCtx)
- probeDataPointsChannel := SelfProber(selfProbeCtx,
- lgcs[0],
- &lgcs,
- selfProbeConfigurationGenerator(),
- debugging,
- )
-
- previousFlowIncreaseInterval := uint64(0)
- previousMovingAverage := float64(0)
-
- // The moving average will contain the average for the last
- // constants.MovingAverageIntervalCount throughputs.
- // ie, ma[i] = (throughput[i-3] + throughput[i-2] + throughput[i-1] + throughput[i])/4
- movingAverage := ma.NewMovingAverage(
- constants.MovingAverageIntervalCount,
- )
-
- // The moving average average will be the average of the last
- // constants.MovingAverageIntervalCount moving averages.
- // ie, maa[i] = (ma[i-3] + ma[i-2] + ma[i-1] + ma[i])/4
- movingAverageAverage := ma.NewMovingAverage(
- constants.MovingAverageIntervalCount,
- )
+ // We have at least a single load-generating channel. This channel will be the one that
+ // the self probes use. Let's send it back to the caller so that they can pass it on if they need to.
+ probeConnectionCommunicationChannel <- lgcs[0]
nextSampleStartTime := time.Now().Add(time.Second)
for currentInterval := uint64(0); true; currentInterval++ {
- // Stop if the client has reached saturation on both sides (up and down)
- if saturationCtx.Err() != nil {
- if debug.IsDebug(debugging.Level) {
- fmt.Printf("%v: Stopping data-collection/saturation loop at %v because both sides are saturated.", debugging, time.Now())
- }
+ // If the operationalCtx is canceled, then that means our work here is done ...
+ if loadGeneratorCtx.Err() != nil {
break
}
- // Stop if we timed out! Send back false to indicate that we are returning under duress.
- if controlCtx.Err() != nil {
+ if saturationCtx.Err() != nil {
+ isSaturated = true
if debug.IsDebug(debugging.Level) {
- fmt.Printf("%v: Stopping data-collection/saturation loop at %v because our controller told us to do so.", debugging, time.Now())
+ fmt.Printf(
+ "%v: Received the saturated signal; continuing only to log from now on.\n",
+ debugging,
+ )
}
- saturated <- false
- break
}
-
now := time.Now()
// At each 1-second interval
if nextSampleStartTime.Sub(now) > 0 {
@@ -448,7 +389,7 @@ func LGCollectData(
// Compute "instantaneous aggregate" goodput which is the number of
// bytes transferred within the last second.
- var totalTransfer float64 = 0
+ var instantaneousTotalThroughput float64 = 0
allInvalid := true
for i := range lgcs {
if !lgcs[i].IsValid() {
@@ -464,12 +405,12 @@ func LGCollectData(
allInvalid = false
currentTransferred, currentInterval := lgcs[i].TransferredInInterval()
// normalize to a second-long interval!
- instantaneousTransferred := float64(
+ instantaneousConnectionThroughput := float64(
currentTransferred,
) / float64(
currentInterval.Seconds(),
)
- totalTransfer += instantaneousTransferred
+ instantaneousTotalThroughput += instantaneousConnectionThroughput
}
// For some reason, all the lgcs are invalid. This likely means that
@@ -484,130 +425,37 @@ func LGCollectData(
break
}
- // Compute a moving average of the last
- // constants.MovingAverageIntervalCount "instantaneous aggregate
- // goodput" measurements
- movingAverage.AddMeasurement(float64(totalTransfer))
- currentMovingAverage := movingAverage.CalculateAverage()
- movingAverageAverage.AddMeasurement(currentMovingAverage)
- movingAverageDelta := utilities.SignedPercentDifference(
- currentMovingAverage,
- previousMovingAverage,
- )
+ // We have generated a throughput calculation -- let's send it back to the coordinator
+ throughputDataPoint := ThroughputDataPoint{time.Now(), instantaneousTotalThroughput}
+ throughputCalculations <- throughputDataPoint
+ // Log that, if we are configured for logging.
if !utilities.IsInterfaceNil(throughputDataLogger) {
- throughputDataLogger.LogRecord(
- ThroughputDataPoint{time.Now(), currentMovingAverage},
- )
- }
-
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "%v: Instantaneous goodput: %f MB.\n",
- debugging,
- utilities.ToMBps(float64(totalTransfer)),
- )
- fmt.Printf(
- "%v: Previous moving average: %f MB.\n",
- debugging,
- utilities.ToMBps(previousMovingAverage),
- )
- fmt.Printf(
- "%v: Current moving average: %f MB.\n",
- debugging,
- utilities.ToMBps(currentMovingAverage),
- )
- fmt.Printf(
- "%v: Moving average delta: %f.\n",
- debugging,
- movingAverageDelta,
- )
+ throughputDataLogger.LogRecord(throughputDataPoint)
}
- previousMovingAverage = currentMovingAverage
-
- intervalsSinceLastFlowIncrease := currentInterval - previousFlowIncreaseInterval
-
- // Special case: We won't make any adjustments on the first
- // iteration.
- // Special case: If we are already saturated, let's move on.
- // We would already be saturated and want to continue
- // to do this loop because we are still generating good
- // data!
- if currentInterval == 0 || isSaturated {
+ // We don't actually want to create a new connection if we are saturated!
+ if isSaturated {
continue
}
- // If moving average > "previous" moving average + InstabilityDelta:
- if movingAverageDelta > constants.InstabilityDelta {
- // Network did not yet reach saturation. If no flows added
- // within the last 4 seconds, add 4 more flows
- if intervalsSinceLastFlowIncrease > constants.MovingAverageStabilitySpan {
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "%v: Adding flows because we are unsaturated and waited a while.\n",
- debugging,
- )
- }
- addFlows(
- networkActivityCtx,
- constants.AdditiveNumberOfLoadGeneratingConnections,
- &lgcs,
- lgcGenerator,
- debugging.Level,
- )
- previousFlowIncreaseInterval = currentInterval
- } else {
- if debug.IsDebug(debugging.Level) {
- fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debugging)
- }
- }
- } else { // Else, network reached saturation for the current flow count.
- if debug.IsDebug(debugging.Level) {
- fmt.Printf("%v: Network reached saturation with current flow count.\n", debugging)
- }
- // If new flows added and for 4 seconds the moving average
- // throughput did not change: network reached stable saturation
- if intervalsSinceLastFlowIncrease < constants.MovingAverageStabilitySpan && movingAverageAverage.AllSequentialIncreasesLessThan(constants.InstabilityDelta) {
- if debug.IsDebug(debugging.Level) {
- fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging)
- }
- // Do not break -- we want to continue looping so that we can continue to log.
- // See comment at the beginning of the loop for its terminating condition.
- isSaturated = true
+ // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now!
+ addFlows(
+ networkActivityCtx,
+ constants.AdditiveNumberOfLoadGeneratingConnections,
+ &lgcs,
+ lgcGenerator,
+ debugging.Level,
+ )
- // But, we do send back a flare that says we are saturated (and happily so)!
- saturated <- true
- } else {
- // Else, add four more flows
- if debug.IsDebug(debugging.Level) {
- fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging)
- }
- addFlows(networkActivityCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level)
- previousFlowIncreaseInterval = currentInterval
- }
- }
+ flowsCreated += constants.AdditiveNumberOfLoadGeneratingConnections
}
- // For whatever reason, we are done. Let's report our results.
-
- // In the case that we ended happily, there should be no reason to do this (because
- // the self-probe context is a descendant of the saturation context). However, if we
- // were cancelled because of a timeout, we will need to explicitly cancel it. Multiple
- // calls to a cancel function are a-okay.
- selfProbeCtxCancel()
- selfProbeDataPoints := make([]ProbeDataPoint, 0)
- for dataPoint := range probeDataPointsChannel {
- selfProbeDataPoints = append(selfProbeDataPoints, dataPoint)
- }
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) Collected %d self data points\n",
- debugging.Prefix,
- len(selfProbeDataPoints),
- )
+ "(%s) Stopping a load generator after creating %d flows.\n",
+ debugging.Prefix, flowsCreated)
}
- resulted <- SelfDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, ProbeDataPoints: selfProbeDataPoints}
}()
return
}