summaryrefslogtreecommitdiff
path: root/networkQuality.go
diff options
context:
space:
mode:
Diffstat (limited to 'networkQuality.go')
-rw-r--r--networkQuality.go844
1 files changed, 455 insertions, 389 deletions
diff --git a/networkQuality.go b/networkQuality.go
index 340a316..2e13893 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -31,6 +31,7 @@ import (
"github.com/network-quality/goresponsiveness/datalogger"
"github.com/network-quality/goresponsiveness/debug"
"github.com/network-quality/goresponsiveness/direction"
+ "github.com/network-quality/goresponsiveness/executor"
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
"github.com/network-quality/goresponsiveness/probe"
@@ -167,9 +168,20 @@ var (
)
withL4S = flag.Bool("with-l4s", false, "Use L4S (with default TCP prague congestion control algorithm.)")
withL4SAlgorithm = flag.String("with-l4s-algorithm", "", "Use L4S (with specified congestion control algorithm.)")
+
+ parallelTestExecutionPolicy = constants.DefaultTestExecutionPolicy
)
func main() {
+ // Add one final command-line argument
+ flag.BoolFunc("rpm.parallel", "Parallel test execution policy.", func(value string) error {
+ if value != "true" {
+ return fmt.Errorf("-parallel can only be used to enable parallel test execution policy")
+ }
+ parallelTestExecutionPolicy = executor.Parallel
+ return nil
+ })
+
flag.Parse()
if *showVersion {
@@ -184,7 +196,7 @@ func main() {
}
specParameters, err := rpm.SpecParametersFromArguments(*rpmtimeout, *rpmmad, *rpmid,
- *rpmtmp, *rpmsdt, *rpmmnp, *rpmmps, *rpmptc, *rpmp)
+ *rpmtmp, *rpmsdt, *rpmmnp, *rpmmps, *rpmptc, *rpmp, parallelTestExecutionPolicy)
if err != nil {
fmt.Fprintf(
os.Stderr,
@@ -639,479 +651,533 @@ func main() {
}
- // All tests will accumulate data to these series because it will all matter for RPM calculation!
- selfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
- foreignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
-
var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil
if *printQualityAttenuation {
selfRttsQualityAttenuation = qualityattenuation.NewSimpleQualityAttenuation()
}
+ directionExecutionUnits := make([]executor.ExecutionUnit, 0)
+
for _, direction := range directions {
+ // Make a copy here to make sure that we do not get go-wierdness in our closure (see https://github.com/golang/go/discussions/56010).
+ direction := direction
- timeoutDuration := specParameters.TestTimeout
- timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
+ directionExecutionUnit := func() {
+ timeoutDuration := specParameters.TestTimeout
+ timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
- timeoutChannel := timeoutat.TimeoutAt(
- operatingCtx,
- timeoutAbsoluteTime,
- debugLevel,
- )
- if debug.IsDebug(debugLevel) {
- fmt.Printf("%s Test will end no later than %v\n", direction.DirectionLabel, timeoutAbsoluteTime)
- }
+ timeoutChannel := timeoutat.TimeoutAt(
+ operatingCtx,
+ timeoutAbsoluteTime,
+ debugLevel,
+ )
+ if debug.IsDebug(debugLevel) {
+ fmt.Printf("%s Test will end no later than %v\n",
+ direction.DirectionLabel, timeoutAbsoluteTime)
+ }
- throughputCtx, throughputCtxCancel := context.WithCancel(operatingCtx)
- proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx)
+ throughputOperatorCtx, throughputOperatorCtxCancel := context.WithCancel(operatingCtx)
+ proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx)
- // This context is used to control the network activity (i.e., it controls all
- // the connections that are open to do load generation and probing). Cancelling this context will close
- // all the network connections that are responsible for generating the load.
- networkActivityCtx, networkActivityCtxCancel := context.WithCancel(operatingCtx)
+ // This context is used to control the network activity (i.e., it controls all
+ // the connections that are open to do load generation and probing). Cancelling this context will close
+ // all the network connections that are responsible for generating the load.
+ probeNetworkActivityCtx, probeNetworkActivityCtxCancel := context.WithCancel(operatingCtx)
+ throughputCtx, throughputCtxCancel := context.WithCancel(operatingCtx)
+ direction.ThroughputActivityCtx, direction.ThroughputActivityCtxCancel = &throughputCtx, &throughputCtxCancel
- throughputGeneratorCtx, throughputGeneratorCtxCancel := context.WithCancel(throughputCtx)
+ reachWorkingConditionsCtx, reachWorkingConditionsCtxCancel :=
+ context.WithCancel(throughputOperatorCtx)
- lgStabilizationCommunicationChannel := rpm.LoadGenerator(
- throughputCtx,
- networkActivityCtx,
- throughputGeneratorCtx,
- specParameters.EvalInterval,
- direction.CreateLgdc,
- &direction.Lgcc,
- &globalNumericBucketGenerator,
- specParameters.MaxParallelConns,
- *calculateExtendedStats,
- direction.DirectionDebugging,
- )
+ lgStabilizationCommunicationChannel := rpm.LoadGenerator(
+ throughputOperatorCtx,
+ *direction.ThroughputActivityCtx,
+ reachWorkingConditionsCtx,
+ specParameters.EvalInterval,
+ direction.CreateLgdc,
+ &direction.Lgcc,
+ &globalNumericBucketGenerator,
+ specParameters.MaxParallelConns,
+ *calculateExtendedStats,
+ direction.DirectionDebugging,
+ )
- throughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug,
- fmt.Sprintf("%v Throughput Stabilizer", direction.DirectionLabel))
- downloadThroughputStabilizerDebugLevel := debug.Error
- if *debugCliFlag {
- downloadThroughputStabilizerDebugLevel = debug.Debug
- }
- throughputStabilizer := stabilizer.NewStabilizer[float64, uint64](
- specParameters.MovingAvgDist, specParameters.StdDevTolerance, 0, "bytes",
- downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig)
+ throughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug,
+ fmt.Sprintf("%v Throughput Stabilizer", direction.DirectionLabel))
+ downloadThroughputStabilizerDebugLevel := debug.Error
+ if *debugCliFlag {
+ downloadThroughputStabilizerDebugLevel = debug.Debug
+ }
+ throughputStabilizer := stabilizer.NewStabilizer[float64, uint64](
+ specParameters.MovingAvgDist, specParameters.StdDevTolerance, 0, "bytes",
+ downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig)
- responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug,
- fmt.Sprintf("%v Responsiveness Stabilizer", direction.DirectionLabel))
- responsivenessStabilizerDebugLevel := debug.Error
- if *debugCliFlag {
- responsivenessStabilizerDebugLevel = debug.Debug
- }
- responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint64](
- specParameters.MovingAvgDist, specParameters.StdDevTolerance,
- specParameters.TrimmedMeanPct, "milliseconds",
- responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig)
+ responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug,
+ fmt.Sprintf("%v Responsiveness Stabilizer", direction.DirectionLabel))
+ responsivenessStabilizerDebugLevel := debug.Error
+ if *debugCliFlag {
+ responsivenessStabilizerDebugLevel = debug.Debug
+ }
+ responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint64](
+ specParameters.MovingAvgDist, specParameters.StdDevTolerance,
+ specParameters.TrimmedMeanPct, "milliseconds",
+ responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig)
- // For later debugging output, record the last throughputs on load-generating connectings
- // and the number of open connections.
- lastThroughputRate := float64(0)
- lastThroughputOpenConnectionCount := int(0)
+ // For later debugging output, record the last throughputs on load-generating connectings
+ // and the number of open connections.
+ lastThroughputRate := float64(0)
+ lastThroughputOpenConnectionCount := int(0)
- stabilityCheckTime := time.Now().Add(specParameters.EvalInterval)
- stabilityCheckTimeChannel := timeoutat.TimeoutAt(
- operatingCtx,
- stabilityCheckTime,
- debugLevel,
- )
+ stabilityCheckTime := time.Now().Add(specParameters.EvalInterval)
+ stabilityCheckTimeChannel := timeoutat.TimeoutAt(
+ operatingCtx,
+ stabilityCheckTime,
+ debugLevel,
+ )
- lg_timeout:
- for !direction.StableThroughput {
- select {
- case throughputMeasurement := <-lgStabilizationCommunicationChannel:
- {
- switch throughputMeasurement.Type {
- case series.SeriesMessageReserve:
- {
- throughputStabilizer.Reserve(throughputMeasurement.Bucket)
- if *debugCliFlag {
- fmt.Printf(
- "%s: Reserving a throughput bucket with id %v.\n",
- direction.DirectionLabel, throughputMeasurement.Bucket)
+ lg_timeout:
+ for !direction.StableThroughput {
+ select {
+ case throughputMeasurement := <-lgStabilizationCommunicationChannel:
+ {
+ switch throughputMeasurement.Type {
+ case series.SeriesMessageReserve:
+ {
+ throughputStabilizer.Reserve(throughputMeasurement.Bucket)
+ if *debugCliFlag {
+ fmt.Printf(
+ "%s: Reserving a throughput bucket with id %v.\n",
+ direction.DirectionLabel, throughputMeasurement.Bucket)
+ }
}
- }
- case series.SeriesMessageMeasure:
- {
- bucket := throughputMeasurement.Bucket
- measurement := utilities.GetSome(throughputMeasurement.Measure)
+ case series.SeriesMessageMeasure:
+ {
+ bucket := throughputMeasurement.Bucket
+ measurement := utilities.GetSome(throughputMeasurement.Measure)
- throughputStabilizer.AddMeasurement(bucket, measurement.Throughput)
+ throughputStabilizer.AddMeasurement(bucket, measurement.Throughput)
- direction.ThroughputDataLogger.LogRecord(measurement)
- for _, v := range measurement.GranularThroughputDataPoints {
- v.Direction = "Download"
- direction.GranularThroughputDataLogger.LogRecord(v)
- }
+ direction.ThroughputDataLogger.LogRecord(measurement)
+ for _, v := range measurement.GranularThroughputDataPoints {
+ v.Direction = "Download"
+ direction.GranularThroughputDataLogger.LogRecord(v)
+ }
- lastThroughputRate = measurement.Throughput
- lastThroughputOpenConnectionCount = measurement.Connections
+ lastThroughputRate = measurement.Throughput
+ lastThroughputOpenConnectionCount = measurement.Connections
+ }
}
}
- }
- case <-stabilityCheckTimeChannel:
- {
- if *debugCliFlag {
- fmt.Printf(
- "%v throughput stability interval is complete.\n", direction.DirectionLabel)
- }
- stabilityCheckTime = time.Now().Add(specParameters.EvalInterval)
- stabilityCheckTimeChannel = timeoutat.TimeoutAt(
- operatingCtx,
- stabilityCheckTime,
- debugLevel,
- )
+ case <-stabilityCheckTimeChannel:
+ {
+ if *debugCliFlag {
+ fmt.Printf(
+ "%v throughput stability interval is complete.\n", direction.DirectionLabel)
+ }
+ stabilityCheckTime = time.Now().Add(specParameters.EvalInterval)
+ stabilityCheckTimeChannel = timeoutat.TimeoutAt(
+ operatingCtx,
+ stabilityCheckTime,
+ debugLevel,
+ )
- direction.StableThroughput = throughputStabilizer.IsStable()
- if *debugCliFlag {
- fmt.Printf(
- "%v is instantaneously %s.\n", direction.DirectionLabel,
- utilities.Conditional(direction.StableThroughput, "stable", "unstable"))
- }
+ direction.StableThroughput = throughputStabilizer.IsStable()
+ if *debugCliFlag {
+ fmt.Printf(
+ "%v is instantaneously %s.\n", direction.DirectionLabel,
+ utilities.Conditional(direction.StableThroughput, "stable", "unstable"))
+ }
- if direction.StableThroughput {
- throughputGeneratorCtxCancel()
+ throughputStabilizer.Interval()
+ }
+ case <-timeoutChannel:
+ {
+ break lg_timeout
}
- throughputStabilizer.Interval()
- }
- case <-timeoutChannel:
- {
- break lg_timeout
}
}
- }
- if direction.StableThroughput {
- if *debugCliFlag {
- fmt.Printf("Throughput is stable; beginning responsiveness testing.\n")
+ if direction.StableThroughput {
+ if *debugCliFlag {
+ fmt.Printf("Throughput is stable; beginning responsiveness testing.\n")
+ }
+ } else {
+ fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Making the test 15 seconds longer to calculate speculative RPM results.\n")
+ speculativeTimeoutDuration := time.Second * 15
+ speculativeAbsoluteTimeoutTime := time.Now().Add(speculativeTimeoutDuration)
+ timeoutChannel = timeoutat.TimeoutAt(
+ operatingCtx,
+ speculativeAbsoluteTimeoutTime,
+ debugLevel,
+ )
}
- } else {
- fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Adding 15 seconds to calculate speculative RPM results.\n")
- speculativeTimeoutDuration := time.Second * 15
- speculativeAbsoluteTimeoutTime := time.Now().Add(speculativeTimeoutDuration)
- timeoutChannel = timeoutat.TimeoutAt(
- operatingCtx,
- speculativeAbsoluteTimeoutTime,
- debugLevel,
- )
- }
- perDirectionSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
- perDirectionForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+ // No matter what, we will stop adding additional load-generating connections!
+ reachWorkingConditionsCtxCancel()
- responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber(
- proberOperatorCtx,
- networkActivityCtx,
- generateForeignProbeConfiguration,
- generateSelfProbeConfiguration,
- &direction.Lgcc,
- &globalNumericBucketGenerator,
- direction.CreateLgdc().Direction(), // TODO: This could be better!
- specParameters.ProbeInterval,
- sslKeyFileConcurrentWriter,
- *calculateExtendedStats,
- direction.ProbeDebugging,
- )
+ direction.SelfRtts = series.NewWindowSeries[float64, uint64](series.Forever, 0)
+ direction.ForeignRtts = series.NewWindowSeries[float64, uint64](series.Forever, 0)
- responsiveness_timeout:
- for !direction.StableResponsiveness {
- select {
- case probeMeasurement := <-responsivenessStabilizationCommunicationChannel:
- {
- switch probeMeasurement.Type {
- case series.SeriesMessageReserve:
- {
- bucket := probeMeasurement.Bucket
- if *debugCliFlag {
- fmt.Printf(
- "%s: Reserving a responsiveness bucket with id %v.\n", direction.DirectionLabel, bucket)
- }
- responsivenessStabilizer.Reserve(bucket)
- selfRtts.Reserve(bucket)
- foreignRtts.Reserve(bucket)
- perDirectionForeignRtts.Reserve(bucket)
- perDirectionSelfRtts.Reserve(bucket)
- }
- case series.SeriesMessageMeasure:
- {
- bucket := probeMeasurement.Bucket
- measurement := utilities.GetSome(probeMeasurement.Measure)
- foreignDataPoint := measurement.Foreign
- selfDataPoint := measurement.Self
+ responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber(
+ proberOperatorCtx,
+ probeNetworkActivityCtx,
+ generateForeignProbeConfiguration,
+ generateSelfProbeConfiguration,
+ &direction.Lgcc,
+ &globalNumericBucketGenerator,
+ direction.CreateLgdc().Direction(), // TODO: This could be better!
+ specParameters.ProbeInterval,
+ sslKeyFileConcurrentWriter,
+ *calculateExtendedStats,
+ direction.ProbeDebugging,
+ )
- if *debugCliFlag {
- fmt.Printf(
- "%s: Filling a responsiveness bucket with id %v with value %v.\n",
- direction.DirectionLabel, bucket, measurement)
+ responsiveness_timeout:
+ for !direction.StableResponsiveness {
+ select {
+ case probeMeasurement := <-responsivenessStabilizationCommunicationChannel:
+ {
+ switch probeMeasurement.Type {
+ case series.SeriesMessageReserve:
+ {
+ bucket := probeMeasurement.Bucket
+ if *debugCliFlag {
+ fmt.Printf(
+ "%s: Reserving a responsiveness bucket with id %v.\n", direction.DirectionLabel, bucket)
+ }
+ responsivenessStabilizer.Reserve(bucket)
+ direction.ForeignRtts.Reserve(bucket)
+ direction.SelfRtts.Reserve(bucket)
}
- responsivenessStabilizer.AddMeasurement(bucket,
- (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds())
+ case series.SeriesMessageMeasure:
+ {
+ bucket := probeMeasurement.Bucket
+ measurement := utilities.GetSome(probeMeasurement.Measure)
+ foreignDataPoint := measurement.Foreign
+ selfDataPoint := measurement.Self
- if err := selfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil {
- fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (selfRtts)\n", bucket)
- }
- if err := perDirectionSelfRtts.Fill(bucket,
- selfDataPoint.Duration.Seconds()); err != nil {
- fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionSelfRtts)\n", bucket)
- }
+ if *debugCliFlag {
+ fmt.Printf(
+ "%s: Filling a responsiveness bucket with id %v with value %v.\n",
+ direction.DirectionLabel, bucket, measurement)
+ }
+ responsivenessStabilizer.AddMeasurement(bucket,
+ (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds())
- if err := foreignRtts.Fill(bucket, foreignDataPoint.Duration.Seconds()); err != nil {
- fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (foreignRtts)\n", bucket)
- }
+ if err := direction.SelfRtts.Fill(bucket,
+ selfDataPoint.Duration.Seconds()); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionSelfRtts)\n", bucket)
+ }
- if err := perDirectionForeignRtts.Fill(bucket,
- foreignDataPoint.Duration.Seconds()); err != nil {
- fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionForeignRtts)\n", bucket)
- }
+ if err := direction.ForeignRtts.Fill(bucket,
+ foreignDataPoint.Duration.Seconds()); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionForeignRtts)\n", bucket)
+ }
- if selfRttsQualityAttenuation != nil {
- selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds())
- }
+ if selfRttsQualityAttenuation != nil {
+ selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds())
+ }
- direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint)
- direction.SelfProbeDataLogger.LogRecord(*selfDataPoint)
+ direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint)
+ direction.SelfProbeDataLogger.LogRecord(*selfDataPoint)
+ }
}
}
- }
- case throughputMeasurement := <-lgStabilizationCommunicationChannel:
- {
- switch throughputMeasurement.Type {
- case series.SeriesMessageReserve:
- {
- // We are no longer tracking stability, so reservation messages are useless!
- if *debugCliFlag {
- fmt.Printf(
- "%s: Discarding a throughput bucket with id %v when ascertaining responsiveness.\n",
- direction.DirectionLabel, throughputMeasurement.Bucket)
+ case throughputMeasurement := <-lgStabilizationCommunicationChannel:
+ {
+ switch throughputMeasurement.Type {
+ case series.SeriesMessageReserve:
+ {
+ // We are no longer tracking stability, so reservation messages are useless!
+ if *debugCliFlag {
+ fmt.Printf(
+ "%s: Discarding a throughput bucket with id %v when ascertaining responsiveness.\n",
+ direction.DirectionLabel, throughputMeasurement.Bucket)
+ }
}
- }
- case series.SeriesMessageMeasure:
- {
- measurement := utilities.GetSome(throughputMeasurement.Measure)
+ case series.SeriesMessageMeasure:
+ {
+ measurement := utilities.GetSome(throughputMeasurement.Measure)
- if *debugCliFlag {
- fmt.Printf("Adding a throughput measurement (while ascertaining responsiveness).\n")
- }
- // There may be more than one round trip accumulated together. If that is the case,
- direction.ThroughputDataLogger.LogRecord(measurement)
- for _, v := range measurement.GranularThroughputDataPoints {
- v.Direction = direction.DirectionLabel
- direction.GranularThroughputDataLogger.LogRecord(v)
- }
+ if *debugCliFlag {
+ fmt.Printf("Adding a throughput measurement (while ascertaining responsiveness).\n")
+ }
+ // There may be more than one round trip accumulated together. If that is the case,
+ direction.ThroughputDataLogger.LogRecord(measurement)
+ for _, v := range measurement.GranularThroughputDataPoints {
+ v.Direction = direction.DirectionLabel
+ direction.GranularThroughputDataLogger.LogRecord(v)
+ }
- lastThroughputRate = measurement.Throughput
- lastThroughputOpenConnectionCount = measurement.Connections
+ lastThroughputRate = measurement.Throughput
+ lastThroughputOpenConnectionCount = measurement.Connections
+ }
}
}
- }
- case <-timeoutChannel:
- {
- break responsiveness_timeout
- }
- case <-stabilityCheckTimeChannel:
- {
- if *debugCliFlag {
- fmt.Printf(
- "%v responsiveness stability interval is complete.\n", direction.DirectionLabel)
+ case <-timeoutChannel:
+ {
+ break responsiveness_timeout
}
+ case <-stabilityCheckTimeChannel:
+ {
+ if *debugCliFlag {
+ fmt.Printf(
+ "%v responsiveness stability interval is complete.\n", direction.DirectionLabel)
+ }
- stabilityCheckTime = time.Now().Add(specParameters.EvalInterval)
- stabilityCheckTimeChannel = timeoutat.TimeoutAt(
- operatingCtx,
- stabilityCheckTime,
- debugLevel,
- )
+ stabilityCheckTime = time.Now().Add(specParameters.EvalInterval)
+ stabilityCheckTimeChannel = timeoutat.TimeoutAt(
+ operatingCtx,
+ stabilityCheckTime,
+ debugLevel,
+ )
- // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy
- // is *actually* important, but it can't hurt?
- direction.StableResponsiveness = responsivenessStabilizer.IsStable()
+ // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy
+ // is *actually* important, but it can't hurt?
+ direction.StableResponsiveness = responsivenessStabilizer.IsStable()
- if *debugCliFlag {
- fmt.Printf(
- "Responsiveness is instantaneously %s.\n",
- utilities.Conditional(direction.StableResponsiveness, "stable", "unstable"))
- }
+ if *debugCliFlag {
+ fmt.Printf(
+ "Responsiveness is instantaneously %s.\n",
+ utilities.Conditional(direction.StableResponsiveness, "stable", "unstable"))
+ }
- responsivenessStabilizer.Interval()
+ responsivenessStabilizer.Interval()
+ }
}
}
- }
-
- // Did the test run to stability?
- testRanToStability := direction.StableThroughput && direction.StableResponsiveness
-
- if *debugCliFlag {
- fmt.Printf("Stopping all the load generating data generators (stability: %s).\n",
- utilities.Conditional(testRanToStability, "success", "failure"))
- }
- /* At this point there are
- 1. Load generators running
- -- uploadLoadGeneratorOperatorCtx
- -- downloadLoadGeneratorOperatorCtx
- 2. Network connections opened by those load generators:
- -- lgNetworkActivityCtx
- 3. Probes
- -- proberCtx
- */
+ // Did the test run to stability?
+ testRanToStability := direction.StableThroughput && direction.StableResponsiveness
- // First, stop the load generator and the probe operators (but *not* the network activity)
- proberOperatorCtxCancel()
- throughputCtxCancel()
+ if *debugCliFlag {
+ fmt.Printf("Stopping all the load generating data generators (stability: %s).\n",
+ utilities.Conditional(testRanToStability, "success", "failure"))
+ }
- // Second, calculate the extended stats (if the user requested and they are available for the direction)
- extendedStats := extendedstats.AggregateExtendedStats{}
- if *calculateExtendedStats && direction.ExtendedStatsEligible {
- if extendedstats.ExtendedStatsAvailable() {
- func() {
- // Put inside an IIFE so that we can use a defer!
- direction.Lgcc.Lock.Lock()
- defer direction.Lgcc.Lock.Unlock()
+ /* At this point there are
+ 1. Load generators running
+ -- uploadLoadGeneratorOperatorCtx
+ -- downloadLoadGeneratorOperatorCtx
+ 2. Network connections opened by those load generators:
+ -- lgNetworkActivityCtx
+ 3. Probes
+ -- proberCtx
+ */
- lgcCount, err := direction.Lgcc.Len()
- if err != nil {
- fmt.Fprintf(
- os.Stderr,
- "Warning: Could not calculate the number of %v load-generating connections; aborting extended stats preparation.\n", direction.DirectionLabel,
- )
- return
- }
+ // First, stop the load generator and the probe operators (but *not* the network activity)
+ proberOperatorCtxCancel()
+ throughputOperatorCtxCancel()
- for i := 0; i < lgcCount; i++ {
- // Assume that extended statistics are available -- the check was done explicitly at
- // program startup if the calculateExtendedStats flag was set by the user on the command line.
- currentLgc, _ := direction.Lgcc.Get(i)
+ // Second, calculate the extended stats (if the user requested and they are available for the direction)
+ extendedStats := extendedstats.AggregateExtendedStats{}
+ if *calculateExtendedStats && direction.ExtendedStatsEligible {
+ if extendedstats.ExtendedStatsAvailable() {
+ func() {
+ // Put inside an IIFE so that we can use a defer!
+ direction.Lgcc.Lock.Lock()
+ defer direction.Lgcc.Lock.Unlock()
- if currentLgc == nil || (*currentLgc).Stats() == nil {
+ lgcCount, err := direction.Lgcc.Len()
+ if err != nil {
fmt.Fprintf(
os.Stderr,
- "Warning: Could not add extended stats for the connection: The LGC was nil or there were no stats available.\n",
+ "Warning: Could not calculate the number of %v load-generating connections; aborting extended stats preparation.\n", direction.DirectionLabel,
)
- continue
+ return
}
- if err := extendedStats.IncorporateConnectionStats(
- (*currentLgc).Stats().ConnInfo.Conn); err != nil {
- fmt.Fprintf(
- os.Stderr,
- "Warning: Could not add extended stats for the connection: %v.\n",
- err,
- )
+
+ for i := 0; i < lgcCount; i++ {
+ // Assume that extended statistics are available -- the check was done explicitly at
+ // program startup if the calculateExtendedStats flag was set by the user on the command line.
+ currentLgc, _ := direction.Lgcc.Get(i)
+
+ if currentLgc == nil || (*currentLgc).Stats() == nil {
+ fmt.Fprintf(
+ os.Stderr,
+ "Warning: Could not add extended stats for the connection: The LGC was nil or there were no stats available.\n",
+ )
+ continue
+ }
+ if err := extendedStats.IncorporateConnectionStats(
+ (*currentLgc).Stats().ConnInfo.Conn); err != nil {
+ fmt.Fprintf(
+ os.Stderr,
+ "Warning: Could not add extended stats for the connection: %v.\n",
+ err,
+ )
+ }
}
- }
- }()
- } else {
- // TODO: Should we just log here?
- panic("Extended stats are not available but the user requested their calculation.")
+ }()
+ } else {
+ // TODO: Should we just log here?
+ panic("Extended stats are not available but the user requested their calculation.")
+ }
}
- }
- // Third, stop the network connections opened by the load generators and probers.
- networkActivityCtxCancel()
+ // *Always* stop the probers! But, conditionally stop the througput.
+ probeNetworkActivityCtxCancel()
+ if parallelTestExecutionPolicy != executor.Parallel {
+ if direction.ThroughputActivityCtxCancel == nil {
+ panic(fmt.Sprintf("The cancellation function for the %v direction's throughput is nil!", direction.DirectionLabel))
+ }
+ (*direction.ThroughputActivityCtxCancel)()
+ }
- fmt.Printf(
- "%v: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
- direction.DirectionLabel,
- utilities.ToMbps(lastThroughputRate),
- utilities.ToMBps(lastThroughputRate),
- lastThroughputOpenConnectionCount,
- )
+ direction.FormattedResults += fmt.Sprintf(
+ "%v: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
+ direction.DirectionLabel,
+ utilities.ToMbps(lastThroughputRate),
+ utilities.ToMBps(lastThroughputRate),
+ lastThroughputOpenConnectionCount,
+ )
- if *calculateExtendedStats {
- fmt.Println(extendedStats.Repr())
- }
- directionResult := rpm.CalculateRpm(perDirectionSelfRtts, perDirectionForeignRtts,
- specParameters.TrimmedMeanPct, specParameters.Percentile)
- if *debugCliFlag {
- fmt.Printf("(%s RPM Calculation stats): %v\n", direction.DirectionLabel, directionResult.ToString())
- }
- if *printQualityAttenuation {
- fmt.Println("Quality Attenuation Statistics:")
- fmt.Printf(
- `Number of losses: %d
-Number of samples: %d
-Min: %.6f s
-Max: %.6f s
-Mean: %.6f s
-Variance: %.6f s
-Standard Deviation: %.6f s
-PDV(90): %.6f s
-PDV(99): %.6f s
-P(90): %.6f s
-P(99): %.6f s
-RPM: %.0f
-Gaming QoO: %.0f
+ if *calculateExtendedStats {
+ direction.FormattedResults += fmt.Sprintf("%v\n", extendedStats.Repr())
+ }
+ directionResult := rpm.CalculateRpm(direction.SelfRtts, direction.ForeignRtts,
+ specParameters.TrimmedMeanPct, specParameters.Percentile)
+ if *debugCliFlag {
+ direction.FormattedResults += fmt.Sprintf("(%s RPM Calculation stats): %v\n",
+ direction.DirectionLabel, directionResult.ToString())
+ }
+ if *printQualityAttenuation {
+ direction.FormattedResults += "Quality Attenuation Statistics:\n"
+ direction.FormattedResults += fmt.Sprintf(
+ ` Number of losses: %d
+ Number of samples: %d
+ Min: %.6f s
+ Max: %.6f s
+ Mean: %.6f s
+ Variance: %.6f s
+ Standard Deviation: %.6f s
+ PDV(90): %.6f s
+ PDV(99): %.6f s
+ P(90): %.6f s
+ P(99): %.6f s
+ RPM: %.0f
+ Gaming QoO: %.0f
`, selfRttsQualityAttenuation.GetNumberOfLosses(),
- selfRttsQualityAttenuation.GetNumberOfSamples(),
- selfRttsQualityAttenuation.GetMinimum(),
- selfRttsQualityAttenuation.GetMaximum(),
- selfRttsQualityAttenuation.GetAverage(),
- selfRttsQualityAttenuation.GetVariance(),
- selfRttsQualityAttenuation.GetStandardDeviation(),
- selfRttsQualityAttenuation.GetPDV(90),
- selfRttsQualityAttenuation.GetPDV(99),
- selfRttsQualityAttenuation.GetPercentile(90),
- selfRttsQualityAttenuation.GetPercentile(99),
- selfRttsQualityAttenuation.GetRPM(),
- selfRttsQualityAttenuation.GetGamingQoO())
- }
+ selfRttsQualityAttenuation.GetNumberOfSamples(),
+ selfRttsQualityAttenuation.GetMinimum(),
+ selfRttsQualityAttenuation.GetMaximum(),
+ selfRttsQualityAttenuation.GetAverage(),
+ selfRttsQualityAttenuation.GetVariance(),
+ selfRttsQualityAttenuation.GetStandardDeviation(),
+ selfRttsQualityAttenuation.GetPDV(90),
+ selfRttsQualityAttenuation.GetPDV(99),
+ selfRttsQualityAttenuation.GetPercentile(90),
+ selfRttsQualityAttenuation.GetPercentile(99),
+ selfRttsQualityAttenuation.GetRPM(),
+ selfRttsQualityAttenuation.GetGamingQoO())
+ }
- if !testRanToStability {
- fmt.Printf("Test did not run to stability, these results are estimates:\n")
- }
+ if !testRanToStability {
+ direction.FormattedResults += "Test did not run to stability, these results are estimates:\n"
+ }
+
+ direction.FormattedResults += fmt.Sprintf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel,
+ directionResult.PNRpm, specParameters.Percentile)
+ direction.FormattedResults += fmt.Sprintf(
+ "%s RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel,
+ directionResult.MeanRpm, specParameters.TrimmedMeanPct)
+
+ if len(*prometheusStatsFilename) > 0 {
+ var testStable int
+ if testRanToStability {
+ testStable = 1
+ }
+ var buffer bytes.Buffer
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_test_stable %d\n",
+ strings.ToLower(direction.DirectionLabel), testStable))
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_p90_rpm_value %d\n",
+ strings.ToLower(direction.DirectionLabel), int64(directionResult.PNRpm)))
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_trimmed_rpm_value %d\n",
+ strings.ToLower(direction.DirectionLabel),
+ int64(directionResult.MeanRpm)))
- fmt.Printf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, directionResult.PNRpm, specParameters.Percentile)
- fmt.Printf("%s RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel,
- directionResult.MeanRpm, specParameters.TrimmedMeanPct)
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_bits_per_second %d\n",
+ strings.ToLower(direction.DirectionLabel), int64(lastThroughputRate)))
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_connections %d\n",
+ strings.ToLower(direction.DirectionLabel),
+ int64(lastThroughputOpenConnectionCount)))
- if len(*prometheusStatsFilename) > 0 {
- var testStable int
- if testRanToStability {
- testStable = 1
+ if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil {
+ fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err)
+ os.Exit(1)
+ }
}
- var buffer bytes.Buffer
- buffer.WriteString(fmt.Sprintf("networkquality_%v_test_stable %d\n",
- strings.ToLower(direction.DirectionLabel), testStable))
- buffer.WriteString(fmt.Sprintf("networkquality_%v_p90_rpm_value %d\n",
- strings.ToLower(direction.DirectionLabel), int64(directionResult.PNRpm)))
- buffer.WriteString(fmt.Sprintf("networkquality_%v_trimmed_rpm_value %d\n",
- strings.ToLower(direction.DirectionLabel),
- int64(directionResult.MeanRpm)))
- buffer.WriteString(fmt.Sprintf("networkquality_%v_bits_per_second %d\n",
- strings.ToLower(direction.DirectionLabel), int64(lastThroughputRate)))
- buffer.WriteString(fmt.Sprintf("networkquality_%v_connections %d\n",
- strings.ToLower(direction.DirectionLabel),
- int64(lastThroughputOpenConnectionCount)))
+ direction.ThroughputDataLogger.Export()
+ if *debugCliFlag {
+ fmt.Printf("Closing the %v throughput data logger.\n", direction.DirectionLabel)
+ }
+ direction.ThroughputDataLogger.Close()
- if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil {
- fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err)
- os.Exit(1)
+ direction.GranularThroughputDataLogger.Export()
+ if *debugCliFlag {
+ fmt.Printf("Closing the %v granular throughput data logger.\n", direction.DirectionLabel)
}
- }
+ direction.GranularThroughputDataLogger.Close()
- direction.ThroughputDataLogger.Export()
- if *debugCliFlag {
- fmt.Printf("Closing the %v throughput data logger.\n", direction.DirectionLabel)
+ if *debugCliFlag {
+ fmt.Printf("In debugging mode, we will cool down between tests.\n")
+ time.Sleep(constants.CooldownPeriod)
+ fmt.Printf("Done cooling down.\n")
+ }
}
- direction.ThroughputDataLogger.Close()
+ directionExecutionUnits = append(directionExecutionUnits, directionExecutionUnit)
+ } // End of direction testing.
- direction.GranularThroughputDataLogger.Export()
- if *debugCliFlag {
- fmt.Printf("Closing the %v granular throughput data logger.\n", direction.DirectionLabel)
- }
- direction.GranularThroughputDataLogger.Close()
+ waiter := executor.Execute(parallelTestExecutionPolicy, directionExecutionUnits)
+ waiter.Wait()
- if *debugCliFlag {
- fmt.Printf("In debugging mode, we will cool down between tests.\n")
- time.Sleep(constants.CooldownPeriod)
- fmt.Printf("Done cooling down.\n")
+ // If we were testing in parallel mode, then the throughputs for each direction are still
+ // running. We left them running in case one of the directions reached stability before the
+ // other!
+ if parallelTestExecutionPolicy == executor.Parallel {
+ for _, direction := range directions {
+ if *debugCliFlag {
+ fmt.Printf("Stopping the throughput connections for the %v test.\n", direction.DirectionLabel)
+ }
+ if direction.ThroughputActivityCtxCancel == nil {
+ panic(fmt.Sprintf("The cancellation function for the %v direction's throughput is nil!", direction.DirectionLabel))
+ }
+ if (*direction.ThroughputActivityCtx).Err() != nil {
+ fmt.Fprintf(os.Stderr, "Warning: The throughput for the %v direction was already cancelled but should have been ongoing.\n", direction.DirectionLabel)
+ continue
+ }
+ (*direction.ThroughputActivityCtxCancel)()
+ }
+ } else {
+ for _, direction := range directions {
+ if direction.ThroughputActivityCtxCancel == nil {
+ panic(fmt.Sprintf("The cancellation function for the %v direction's throughput is nil!", direction.DirectionLabel))
+ }
+ if (*direction.ThroughputActivityCtx).Err() == nil {
+ fmt.Fprintf(os.Stderr, "Warning: The throughput for the %v direction should have already been stopped but it was not.\n", direction.DirectionLabel)
+ }
}
}
- result := rpm.CalculateRpm(selfRtts, foreignRtts, specParameters.TrimmedMeanPct, specParameters.Percentile)
+ fmt.Printf("Results:\n")
+ fmt.Printf("========\n")
+ // Print out the formatted results from each of the directions.
+ for _, direction := range directions {
+ fmt.Print(direction.FormattedResults)
+ fmt.Printf("========\n")
+ }
+
+ allSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+ allForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+
+ allSelfRtts.Append(&downloadDirection.SelfRtts)
+ allSelfRtts.Append(&uploadDirection.SelfRtts)
+ allForeignRtts.Append(&downloadDirection.ForeignRtts)
+ allForeignRtts.Append(&uploadDirection.ForeignRtts)
+
+ result := rpm.CalculateRpm(allSelfRtts, allForeignRtts, specParameters.TrimmedMeanPct, specParameters.Percentile)
if *debugCliFlag {
fmt.Printf("(Final RPM Calculation stats): %v\n", result.ToString())