diff options
Diffstat (limited to 'networkQuality.go')
| -rw-r--r-- | networkQuality.go | 844 |
1 files changed, 455 insertions, 389 deletions
diff --git a/networkQuality.go b/networkQuality.go index 340a316..2e13893 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -31,6 +31,7 @@ import ( "github.com/network-quality/goresponsiveness/datalogger" "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/direction" + "github.com/network-quality/goresponsiveness/executor" "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" "github.com/network-quality/goresponsiveness/probe" @@ -167,9 +168,20 @@ var ( ) withL4S = flag.Bool("with-l4s", false, "Use L4S (with default TCP prague congestion control algorithm.)") withL4SAlgorithm = flag.String("with-l4s-algorithm", "", "Use L4S (with specified congestion control algorithm.)") + + parallelTestExecutionPolicy = constants.DefaultTestExecutionPolicy ) func main() { + // Add one final command-line argument + flag.BoolFunc("rpm.parallel", "Parallel test execution policy.", func(value string) error { + if value != "true" { + return fmt.Errorf("-parallel can only be used to enable parallel test execution policy") + } + parallelTestExecutionPolicy = executor.Parallel + return nil + }) + flag.Parse() if *showVersion { @@ -184,7 +196,7 @@ func main() { } specParameters, err := rpm.SpecParametersFromArguments(*rpmtimeout, *rpmmad, *rpmid, - *rpmtmp, *rpmsdt, *rpmmnp, *rpmmps, *rpmptc, *rpmp) + *rpmtmp, *rpmsdt, *rpmmnp, *rpmmps, *rpmptc, *rpmp, parallelTestExecutionPolicy) if err != nil { fmt.Fprintf( os.Stderr, @@ -639,479 +651,533 @@ func main() { } - // All tests will accumulate data to these series because it will all matter for RPM calculation! - selfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) - foreignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) - var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil if *printQualityAttenuation { selfRttsQualityAttenuation = qualityattenuation.NewSimpleQualityAttenuation() } + directionExecutionUnits := make([]executor.ExecutionUnit, 0) + for _, direction := range directions { + // Make a copy here to make sure that we do not get go-wierdness in our closure (see https://github.com/golang/go/discussions/56010). + direction := direction - timeoutDuration := specParameters.TestTimeout - timeoutAbsoluteTime := time.Now().Add(timeoutDuration) + directionExecutionUnit := func() { + timeoutDuration := specParameters.TestTimeout + timeoutAbsoluteTime := time.Now().Add(timeoutDuration) - timeoutChannel := timeoutat.TimeoutAt( - operatingCtx, - timeoutAbsoluteTime, - debugLevel, - ) - if debug.IsDebug(debugLevel) { - fmt.Printf("%s Test will end no later than %v\n", direction.DirectionLabel, timeoutAbsoluteTime) - } + timeoutChannel := timeoutat.TimeoutAt( + operatingCtx, + timeoutAbsoluteTime, + debugLevel, + ) + if debug.IsDebug(debugLevel) { + fmt.Printf("%s Test will end no later than %v\n", + direction.DirectionLabel, timeoutAbsoluteTime) + } - throughputCtx, throughputCtxCancel := context.WithCancel(operatingCtx) - proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx) + throughputOperatorCtx, throughputOperatorCtxCancel := context.WithCancel(operatingCtx) + proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx) - // This context is used to control the network activity (i.e., it controls all - // the connections that are open to do load generation and probing). Cancelling this context will close - // all the network connections that are responsible for generating the load. - networkActivityCtx, networkActivityCtxCancel := context.WithCancel(operatingCtx) + // This context is used to control the network activity (i.e., it controls all + // the connections that are open to do load generation and probing). Cancelling this context will close + // all the network connections that are responsible for generating the load. + probeNetworkActivityCtx, probeNetworkActivityCtxCancel := context.WithCancel(operatingCtx) + throughputCtx, throughputCtxCancel := context.WithCancel(operatingCtx) + direction.ThroughputActivityCtx, direction.ThroughputActivityCtxCancel = &throughputCtx, &throughputCtxCancel - throughputGeneratorCtx, throughputGeneratorCtxCancel := context.WithCancel(throughputCtx) + reachWorkingConditionsCtx, reachWorkingConditionsCtxCancel := + context.WithCancel(throughputOperatorCtx) - lgStabilizationCommunicationChannel := rpm.LoadGenerator( - throughputCtx, - networkActivityCtx, - throughputGeneratorCtx, - specParameters.EvalInterval, - direction.CreateLgdc, - &direction.Lgcc, - &globalNumericBucketGenerator, - specParameters.MaxParallelConns, - *calculateExtendedStats, - direction.DirectionDebugging, - ) + lgStabilizationCommunicationChannel := rpm.LoadGenerator( + throughputOperatorCtx, + *direction.ThroughputActivityCtx, + reachWorkingConditionsCtx, + specParameters.EvalInterval, + direction.CreateLgdc, + &direction.Lgcc, + &globalNumericBucketGenerator, + specParameters.MaxParallelConns, + *calculateExtendedStats, + direction.DirectionDebugging, + ) - throughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, - fmt.Sprintf("%v Throughput Stabilizer", direction.DirectionLabel)) - downloadThroughputStabilizerDebugLevel := debug.Error - if *debugCliFlag { - downloadThroughputStabilizerDebugLevel = debug.Debug - } - throughputStabilizer := stabilizer.NewStabilizer[float64, uint64]( - specParameters.MovingAvgDist, specParameters.StdDevTolerance, 0, "bytes", - downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig) + throughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, + fmt.Sprintf("%v Throughput Stabilizer", direction.DirectionLabel)) + downloadThroughputStabilizerDebugLevel := debug.Error + if *debugCliFlag { + downloadThroughputStabilizerDebugLevel = debug.Debug + } + throughputStabilizer := stabilizer.NewStabilizer[float64, uint64]( + specParameters.MovingAvgDist, specParameters.StdDevTolerance, 0, "bytes", + downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig) - responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, - fmt.Sprintf("%v Responsiveness Stabilizer", direction.DirectionLabel)) - responsivenessStabilizerDebugLevel := debug.Error - if *debugCliFlag { - responsivenessStabilizerDebugLevel = debug.Debug - } - responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint64]( - specParameters.MovingAvgDist, specParameters.StdDevTolerance, - specParameters.TrimmedMeanPct, "milliseconds", - responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig) + responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, + fmt.Sprintf("%v Responsiveness Stabilizer", direction.DirectionLabel)) + responsivenessStabilizerDebugLevel := debug.Error + if *debugCliFlag { + responsivenessStabilizerDebugLevel = debug.Debug + } + responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint64]( + specParameters.MovingAvgDist, specParameters.StdDevTolerance, + specParameters.TrimmedMeanPct, "milliseconds", + responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig) - // For later debugging output, record the last throughputs on load-generating connectings - // and the number of open connections. - lastThroughputRate := float64(0) - lastThroughputOpenConnectionCount := int(0) + // For later debugging output, record the last throughputs on load-generating connectings + // and the number of open connections. + lastThroughputRate := float64(0) + lastThroughputOpenConnectionCount := int(0) - stabilityCheckTime := time.Now().Add(specParameters.EvalInterval) - stabilityCheckTimeChannel := timeoutat.TimeoutAt( - operatingCtx, - stabilityCheckTime, - debugLevel, - ) + stabilityCheckTime := time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel := timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) - lg_timeout: - for !direction.StableThroughput { - select { - case throughputMeasurement := <-lgStabilizationCommunicationChannel: - { - switch throughputMeasurement.Type { - case series.SeriesMessageReserve: - { - throughputStabilizer.Reserve(throughputMeasurement.Bucket) - if *debugCliFlag { - fmt.Printf( - "%s: Reserving a throughput bucket with id %v.\n", - direction.DirectionLabel, throughputMeasurement.Bucket) + lg_timeout: + for !direction.StableThroughput { + select { + case throughputMeasurement := <-lgStabilizationCommunicationChannel: + { + switch throughputMeasurement.Type { + case series.SeriesMessageReserve: + { + throughputStabilizer.Reserve(throughputMeasurement.Bucket) + if *debugCliFlag { + fmt.Printf( + "%s: Reserving a throughput bucket with id %v.\n", + direction.DirectionLabel, throughputMeasurement.Bucket) + } } - } - case series.SeriesMessageMeasure: - { - bucket := throughputMeasurement.Bucket - measurement := utilities.GetSome(throughputMeasurement.Measure) + case series.SeriesMessageMeasure: + { + bucket := throughputMeasurement.Bucket + measurement := utilities.GetSome(throughputMeasurement.Measure) - throughputStabilizer.AddMeasurement(bucket, measurement.Throughput) + throughputStabilizer.AddMeasurement(bucket, measurement.Throughput) - direction.ThroughputDataLogger.LogRecord(measurement) - for _, v := range measurement.GranularThroughputDataPoints { - v.Direction = "Download" - direction.GranularThroughputDataLogger.LogRecord(v) - } + direction.ThroughputDataLogger.LogRecord(measurement) + for _, v := range measurement.GranularThroughputDataPoints { + v.Direction = "Download" + direction.GranularThroughputDataLogger.LogRecord(v) + } - lastThroughputRate = measurement.Throughput - lastThroughputOpenConnectionCount = measurement.Connections + lastThroughputRate = measurement.Throughput + lastThroughputOpenConnectionCount = measurement.Connections + } } } - } - case <-stabilityCheckTimeChannel: - { - if *debugCliFlag { - fmt.Printf( - "%v throughput stability interval is complete.\n", direction.DirectionLabel) - } - stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) - stabilityCheckTimeChannel = timeoutat.TimeoutAt( - operatingCtx, - stabilityCheckTime, - debugLevel, - ) + case <-stabilityCheckTimeChannel: + { + if *debugCliFlag { + fmt.Printf( + "%v throughput stability interval is complete.\n", direction.DirectionLabel) + } + stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel = timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) - direction.StableThroughput = throughputStabilizer.IsStable() - if *debugCliFlag { - fmt.Printf( - "%v is instantaneously %s.\n", direction.DirectionLabel, - utilities.Conditional(direction.StableThroughput, "stable", "unstable")) - } + direction.StableThroughput = throughputStabilizer.IsStable() + if *debugCliFlag { + fmt.Printf( + "%v is instantaneously %s.\n", direction.DirectionLabel, + utilities.Conditional(direction.StableThroughput, "stable", "unstable")) + } - if direction.StableThroughput { - throughputGeneratorCtxCancel() + throughputStabilizer.Interval() + } + case <-timeoutChannel: + { + break lg_timeout } - throughputStabilizer.Interval() - } - case <-timeoutChannel: - { - break lg_timeout } } - } - if direction.StableThroughput { - if *debugCliFlag { - fmt.Printf("Throughput is stable; beginning responsiveness testing.\n") + if direction.StableThroughput { + if *debugCliFlag { + fmt.Printf("Throughput is stable; beginning responsiveness testing.\n") + } + } else { + fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Making the test 15 seconds longer to calculate speculative RPM results.\n") + speculativeTimeoutDuration := time.Second * 15 + speculativeAbsoluteTimeoutTime := time.Now().Add(speculativeTimeoutDuration) + timeoutChannel = timeoutat.TimeoutAt( + operatingCtx, + speculativeAbsoluteTimeoutTime, + debugLevel, + ) } - } else { - fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Adding 15 seconds to calculate speculative RPM results.\n") - speculativeTimeoutDuration := time.Second * 15 - speculativeAbsoluteTimeoutTime := time.Now().Add(speculativeTimeoutDuration) - timeoutChannel = timeoutat.TimeoutAt( - operatingCtx, - speculativeAbsoluteTimeoutTime, - debugLevel, - ) - } - perDirectionSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) - perDirectionForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) + // No matter what, we will stop adding additional load-generating connections! + reachWorkingConditionsCtxCancel() - responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber( - proberOperatorCtx, - networkActivityCtx, - generateForeignProbeConfiguration, - generateSelfProbeConfiguration, - &direction.Lgcc, - &globalNumericBucketGenerator, - direction.CreateLgdc().Direction(), // TODO: This could be better! - specParameters.ProbeInterval, - sslKeyFileConcurrentWriter, - *calculateExtendedStats, - direction.ProbeDebugging, - ) + direction.SelfRtts = series.NewWindowSeries[float64, uint64](series.Forever, 0) + direction.ForeignRtts = series.NewWindowSeries[float64, uint64](series.Forever, 0) - responsiveness_timeout: - for !direction.StableResponsiveness { - select { - case probeMeasurement := <-responsivenessStabilizationCommunicationChannel: - { - switch probeMeasurement.Type { - case series.SeriesMessageReserve: - { - bucket := probeMeasurement.Bucket - if *debugCliFlag { - fmt.Printf( - "%s: Reserving a responsiveness bucket with id %v.\n", direction.DirectionLabel, bucket) - } - responsivenessStabilizer.Reserve(bucket) - selfRtts.Reserve(bucket) - foreignRtts.Reserve(bucket) - perDirectionForeignRtts.Reserve(bucket) - perDirectionSelfRtts.Reserve(bucket) - } - case series.SeriesMessageMeasure: - { - bucket := probeMeasurement.Bucket - measurement := utilities.GetSome(probeMeasurement.Measure) - foreignDataPoint := measurement.Foreign - selfDataPoint := measurement.Self + responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber( + proberOperatorCtx, + probeNetworkActivityCtx, + generateForeignProbeConfiguration, + generateSelfProbeConfiguration, + &direction.Lgcc, + &globalNumericBucketGenerator, + direction.CreateLgdc().Direction(), // TODO: This could be better! + specParameters.ProbeInterval, + sslKeyFileConcurrentWriter, + *calculateExtendedStats, + direction.ProbeDebugging, + ) - if *debugCliFlag { - fmt.Printf( - "%s: Filling a responsiveness bucket with id %v with value %v.\n", - direction.DirectionLabel, bucket, measurement) + responsiveness_timeout: + for !direction.StableResponsiveness { + select { + case probeMeasurement := <-responsivenessStabilizationCommunicationChannel: + { + switch probeMeasurement.Type { + case series.SeriesMessageReserve: + { + bucket := probeMeasurement.Bucket + if *debugCliFlag { + fmt.Printf( + "%s: Reserving a responsiveness bucket with id %v.\n", direction.DirectionLabel, bucket) + } + responsivenessStabilizer.Reserve(bucket) + direction.ForeignRtts.Reserve(bucket) + direction.SelfRtts.Reserve(bucket) } - responsivenessStabilizer.AddMeasurement(bucket, - (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) + case series.SeriesMessageMeasure: + { + bucket := probeMeasurement.Bucket + measurement := utilities.GetSome(probeMeasurement.Measure) + foreignDataPoint := measurement.Foreign + selfDataPoint := measurement.Self - if err := selfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil { - fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (selfRtts)\n", bucket) - } - if err := perDirectionSelfRtts.Fill(bucket, - selfDataPoint.Duration.Seconds()); err != nil { - fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionSelfRtts)\n", bucket) - } + if *debugCliFlag { + fmt.Printf( + "%s: Filling a responsiveness bucket with id %v with value %v.\n", + direction.DirectionLabel, bucket, measurement) + } + responsivenessStabilizer.AddMeasurement(bucket, + (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) - if err := foreignRtts.Fill(bucket, foreignDataPoint.Duration.Seconds()); err != nil { - fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (foreignRtts)\n", bucket) - } + if err := direction.SelfRtts.Fill(bucket, + selfDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionSelfRtts)\n", bucket) + } - if err := perDirectionForeignRtts.Fill(bucket, - foreignDataPoint.Duration.Seconds()); err != nil { - fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionForeignRtts)\n", bucket) - } + if err := direction.ForeignRtts.Fill(bucket, + foreignDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionForeignRtts)\n", bucket) + } - if selfRttsQualityAttenuation != nil { - selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) - } + if selfRttsQualityAttenuation != nil { + selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) + } - direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) - direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) + direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) + direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) + } } } - } - case throughputMeasurement := <-lgStabilizationCommunicationChannel: - { - switch throughputMeasurement.Type { - case series.SeriesMessageReserve: - { - // We are no longer tracking stability, so reservation messages are useless! - if *debugCliFlag { - fmt.Printf( - "%s: Discarding a throughput bucket with id %v when ascertaining responsiveness.\n", - direction.DirectionLabel, throughputMeasurement.Bucket) + case throughputMeasurement := <-lgStabilizationCommunicationChannel: + { + switch throughputMeasurement.Type { + case series.SeriesMessageReserve: + { + // We are no longer tracking stability, so reservation messages are useless! + if *debugCliFlag { + fmt.Printf( + "%s: Discarding a throughput bucket with id %v when ascertaining responsiveness.\n", + direction.DirectionLabel, throughputMeasurement.Bucket) + } } - } - case series.SeriesMessageMeasure: - { - measurement := utilities.GetSome(throughputMeasurement.Measure) + case series.SeriesMessageMeasure: + { + measurement := utilities.GetSome(throughputMeasurement.Measure) - if *debugCliFlag { - fmt.Printf("Adding a throughput measurement (while ascertaining responsiveness).\n") - } - // There may be more than one round trip accumulated together. If that is the case, - direction.ThroughputDataLogger.LogRecord(measurement) - for _, v := range measurement.GranularThroughputDataPoints { - v.Direction = direction.DirectionLabel - direction.GranularThroughputDataLogger.LogRecord(v) - } + if *debugCliFlag { + fmt.Printf("Adding a throughput measurement (while ascertaining responsiveness).\n") + } + // There may be more than one round trip accumulated together. If that is the case, + direction.ThroughputDataLogger.LogRecord(measurement) + for _, v := range measurement.GranularThroughputDataPoints { + v.Direction = direction.DirectionLabel + direction.GranularThroughputDataLogger.LogRecord(v) + } - lastThroughputRate = measurement.Throughput - lastThroughputOpenConnectionCount = measurement.Connections + lastThroughputRate = measurement.Throughput + lastThroughputOpenConnectionCount = measurement.Connections + } } } - } - case <-timeoutChannel: - { - break responsiveness_timeout - } - case <-stabilityCheckTimeChannel: - { - if *debugCliFlag { - fmt.Printf( - "%v responsiveness stability interval is complete.\n", direction.DirectionLabel) + case <-timeoutChannel: + { + break responsiveness_timeout } + case <-stabilityCheckTimeChannel: + { + if *debugCliFlag { + fmt.Printf( + "%v responsiveness stability interval is complete.\n", direction.DirectionLabel) + } - stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) - stabilityCheckTimeChannel = timeoutat.TimeoutAt( - operatingCtx, - stabilityCheckTime, - debugLevel, - ) + stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel = timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) - // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy - // is *actually* important, but it can't hurt? - direction.StableResponsiveness = responsivenessStabilizer.IsStable() + // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy + // is *actually* important, but it can't hurt? + direction.StableResponsiveness = responsivenessStabilizer.IsStable() - if *debugCliFlag { - fmt.Printf( - "Responsiveness is instantaneously %s.\n", - utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) - } + if *debugCliFlag { + fmt.Printf( + "Responsiveness is instantaneously %s.\n", + utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) + } - responsivenessStabilizer.Interval() + responsivenessStabilizer.Interval() + } } } - } - - // Did the test run to stability? - testRanToStability := direction.StableThroughput && direction.StableResponsiveness - - if *debugCliFlag { - fmt.Printf("Stopping all the load generating data generators (stability: %s).\n", - utilities.Conditional(testRanToStability, "success", "failure")) - } - /* At this point there are - 1. Load generators running - -- uploadLoadGeneratorOperatorCtx - -- downloadLoadGeneratorOperatorCtx - 2. Network connections opened by those load generators: - -- lgNetworkActivityCtx - 3. Probes - -- proberCtx - */ + // Did the test run to stability? + testRanToStability := direction.StableThroughput && direction.StableResponsiveness - // First, stop the load generator and the probe operators (but *not* the network activity) - proberOperatorCtxCancel() - throughputCtxCancel() + if *debugCliFlag { + fmt.Printf("Stopping all the load generating data generators (stability: %s).\n", + utilities.Conditional(testRanToStability, "success", "failure")) + } - // Second, calculate the extended stats (if the user requested and they are available for the direction) - extendedStats := extendedstats.AggregateExtendedStats{} - if *calculateExtendedStats && direction.ExtendedStatsEligible { - if extendedstats.ExtendedStatsAvailable() { - func() { - // Put inside an IIFE so that we can use a defer! - direction.Lgcc.Lock.Lock() - defer direction.Lgcc.Lock.Unlock() + /* At this point there are + 1. Load generators running + -- uploadLoadGeneratorOperatorCtx + -- downloadLoadGeneratorOperatorCtx + 2. Network connections opened by those load generators: + -- lgNetworkActivityCtx + 3. Probes + -- proberCtx + */ - lgcCount, err := direction.Lgcc.Len() - if err != nil { - fmt.Fprintf( - os.Stderr, - "Warning: Could not calculate the number of %v load-generating connections; aborting extended stats preparation.\n", direction.DirectionLabel, - ) - return - } + // First, stop the load generator and the probe operators (but *not* the network activity) + proberOperatorCtxCancel() + throughputOperatorCtxCancel() - for i := 0; i < lgcCount; i++ { - // Assume that extended statistics are available -- the check was done explicitly at - // program startup if the calculateExtendedStats flag was set by the user on the command line. - currentLgc, _ := direction.Lgcc.Get(i) + // Second, calculate the extended stats (if the user requested and they are available for the direction) + extendedStats := extendedstats.AggregateExtendedStats{} + if *calculateExtendedStats && direction.ExtendedStatsEligible { + if extendedstats.ExtendedStatsAvailable() { + func() { + // Put inside an IIFE so that we can use a defer! + direction.Lgcc.Lock.Lock() + defer direction.Lgcc.Lock.Unlock() - if currentLgc == nil || (*currentLgc).Stats() == nil { + lgcCount, err := direction.Lgcc.Len() + if err != nil { fmt.Fprintf( os.Stderr, - "Warning: Could not add extended stats for the connection: The LGC was nil or there were no stats available.\n", + "Warning: Could not calculate the number of %v load-generating connections; aborting extended stats preparation.\n", direction.DirectionLabel, ) - continue + return } - if err := extendedStats.IncorporateConnectionStats( - (*currentLgc).Stats().ConnInfo.Conn); err != nil { - fmt.Fprintf( - os.Stderr, - "Warning: Could not add extended stats for the connection: %v.\n", - err, - ) + + for i := 0; i < lgcCount; i++ { + // Assume that extended statistics are available -- the check was done explicitly at + // program startup if the calculateExtendedStats flag was set by the user on the command line. + currentLgc, _ := direction.Lgcc.Get(i) + + if currentLgc == nil || (*currentLgc).Stats() == nil { + fmt.Fprintf( + os.Stderr, + "Warning: Could not add extended stats for the connection: The LGC was nil or there were no stats available.\n", + ) + continue + } + if err := extendedStats.IncorporateConnectionStats( + (*currentLgc).Stats().ConnInfo.Conn); err != nil { + fmt.Fprintf( + os.Stderr, + "Warning: Could not add extended stats for the connection: %v.\n", + err, + ) + } } - } - }() - } else { - // TODO: Should we just log here? - panic("Extended stats are not available but the user requested their calculation.") + }() + } else { + // TODO: Should we just log here? + panic("Extended stats are not available but the user requested their calculation.") + } } - } - // Third, stop the network connections opened by the load generators and probers. - networkActivityCtxCancel() + // *Always* stop the probers! But, conditionally stop the througput. + probeNetworkActivityCtxCancel() + if parallelTestExecutionPolicy != executor.Parallel { + if direction.ThroughputActivityCtxCancel == nil { + panic(fmt.Sprintf("The cancellation function for the %v direction's throughput is nil!", direction.DirectionLabel)) + } + (*direction.ThroughputActivityCtxCancel)() + } - fmt.Printf( - "%v: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", - direction.DirectionLabel, - utilities.ToMbps(lastThroughputRate), - utilities.ToMBps(lastThroughputRate), - lastThroughputOpenConnectionCount, - ) + direction.FormattedResults += fmt.Sprintf( + "%v: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", + direction.DirectionLabel, + utilities.ToMbps(lastThroughputRate), + utilities.ToMBps(lastThroughputRate), + lastThroughputOpenConnectionCount, + ) - if *calculateExtendedStats { - fmt.Println(extendedStats.Repr()) - } - directionResult := rpm.CalculateRpm(perDirectionSelfRtts, perDirectionForeignRtts, - specParameters.TrimmedMeanPct, specParameters.Percentile) - if *debugCliFlag { - fmt.Printf("(%s RPM Calculation stats): %v\n", direction.DirectionLabel, directionResult.ToString()) - } - if *printQualityAttenuation { - fmt.Println("Quality Attenuation Statistics:") - fmt.Printf( - `Number of losses: %d -Number of samples: %d -Min: %.6f s -Max: %.6f s -Mean: %.6f s -Variance: %.6f s -Standard Deviation: %.6f s -PDV(90): %.6f s -PDV(99): %.6f s -P(90): %.6f s -P(99): %.6f s -RPM: %.0f -Gaming QoO: %.0f + if *calculateExtendedStats { + direction.FormattedResults += fmt.Sprintf("%v\n", extendedStats.Repr()) + } + directionResult := rpm.CalculateRpm(direction.SelfRtts, direction.ForeignRtts, + specParameters.TrimmedMeanPct, specParameters.Percentile) + if *debugCliFlag { + direction.FormattedResults += fmt.Sprintf("(%s RPM Calculation stats): %v\n", + direction.DirectionLabel, directionResult.ToString()) + } + if *printQualityAttenuation { + direction.FormattedResults += "Quality Attenuation Statistics:\n" + direction.FormattedResults += fmt.Sprintf( + ` Number of losses: %d + Number of samples: %d + Min: %.6f s + Max: %.6f s + Mean: %.6f s + Variance: %.6f s + Standard Deviation: %.6f s + PDV(90): %.6f s + PDV(99): %.6f s + P(90): %.6f s + P(99): %.6f s + RPM: %.0f + Gaming QoO: %.0f `, selfRttsQualityAttenuation.GetNumberOfLosses(), - selfRttsQualityAttenuation.GetNumberOfSamples(), - selfRttsQualityAttenuation.GetMinimum(), - selfRttsQualityAttenuation.GetMaximum(), - selfRttsQualityAttenuation.GetAverage(), - selfRttsQualityAttenuation.GetVariance(), - selfRttsQualityAttenuation.GetStandardDeviation(), - selfRttsQualityAttenuation.GetPDV(90), - selfRttsQualityAttenuation.GetPDV(99), - selfRttsQualityAttenuation.GetPercentile(90), - selfRttsQualityAttenuation.GetPercentile(99), - selfRttsQualityAttenuation.GetRPM(), - selfRttsQualityAttenuation.GetGamingQoO()) - } + selfRttsQualityAttenuation.GetNumberOfSamples(), + selfRttsQualityAttenuation.GetMinimum(), + selfRttsQualityAttenuation.GetMaximum(), + selfRttsQualityAttenuation.GetAverage(), + selfRttsQualityAttenuation.GetVariance(), + selfRttsQualityAttenuation.GetStandardDeviation(), + selfRttsQualityAttenuation.GetPDV(90), + selfRttsQualityAttenuation.GetPDV(99), + selfRttsQualityAttenuation.GetPercentile(90), + selfRttsQualityAttenuation.GetPercentile(99), + selfRttsQualityAttenuation.GetRPM(), + selfRttsQualityAttenuation.GetGamingQoO()) + } - if !testRanToStability { - fmt.Printf("Test did not run to stability, these results are estimates:\n") - } + if !testRanToStability { + direction.FormattedResults += "Test did not run to stability, these results are estimates:\n" + } + + direction.FormattedResults += fmt.Sprintf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, + directionResult.PNRpm, specParameters.Percentile) + direction.FormattedResults += fmt.Sprintf( + "%s RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, + directionResult.MeanRpm, specParameters.TrimmedMeanPct) + + if len(*prometheusStatsFilename) > 0 { + var testStable int + if testRanToStability { + testStable = 1 + } + var buffer bytes.Buffer + buffer.WriteString(fmt.Sprintf("networkquality_%v_test_stable %d\n", + strings.ToLower(direction.DirectionLabel), testStable)) + buffer.WriteString(fmt.Sprintf("networkquality_%v_p90_rpm_value %d\n", + strings.ToLower(direction.DirectionLabel), int64(directionResult.PNRpm))) + buffer.WriteString(fmt.Sprintf("networkquality_%v_trimmed_rpm_value %d\n", + strings.ToLower(direction.DirectionLabel), + int64(directionResult.MeanRpm))) - fmt.Printf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, directionResult.PNRpm, specParameters.Percentile) - fmt.Printf("%s RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, - directionResult.MeanRpm, specParameters.TrimmedMeanPct) + buffer.WriteString(fmt.Sprintf("networkquality_%v_bits_per_second %d\n", + strings.ToLower(direction.DirectionLabel), int64(lastThroughputRate))) + buffer.WriteString(fmt.Sprintf("networkquality_%v_connections %d\n", + strings.ToLower(direction.DirectionLabel), + int64(lastThroughputOpenConnectionCount))) - if len(*prometheusStatsFilename) > 0 { - var testStable int - if testRanToStability { - testStable = 1 + if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil { + fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err) + os.Exit(1) + } } - var buffer bytes.Buffer - buffer.WriteString(fmt.Sprintf("networkquality_%v_test_stable %d\n", - strings.ToLower(direction.DirectionLabel), testStable)) - buffer.WriteString(fmt.Sprintf("networkquality_%v_p90_rpm_value %d\n", - strings.ToLower(direction.DirectionLabel), int64(directionResult.PNRpm))) - buffer.WriteString(fmt.Sprintf("networkquality_%v_trimmed_rpm_value %d\n", - strings.ToLower(direction.DirectionLabel), - int64(directionResult.MeanRpm))) - buffer.WriteString(fmt.Sprintf("networkquality_%v_bits_per_second %d\n", - strings.ToLower(direction.DirectionLabel), int64(lastThroughputRate))) - buffer.WriteString(fmt.Sprintf("networkquality_%v_connections %d\n", - strings.ToLower(direction.DirectionLabel), - int64(lastThroughputOpenConnectionCount))) + direction.ThroughputDataLogger.Export() + if *debugCliFlag { + fmt.Printf("Closing the %v throughput data logger.\n", direction.DirectionLabel) + } + direction.ThroughputDataLogger.Close() - if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil { - fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err) - os.Exit(1) + direction.GranularThroughputDataLogger.Export() + if *debugCliFlag { + fmt.Printf("Closing the %v granular throughput data logger.\n", direction.DirectionLabel) } - } + direction.GranularThroughputDataLogger.Close() - direction.ThroughputDataLogger.Export() - if *debugCliFlag { - fmt.Printf("Closing the %v throughput data logger.\n", direction.DirectionLabel) + if *debugCliFlag { + fmt.Printf("In debugging mode, we will cool down between tests.\n") + time.Sleep(constants.CooldownPeriod) + fmt.Printf("Done cooling down.\n") + } } - direction.ThroughputDataLogger.Close() + directionExecutionUnits = append(directionExecutionUnits, directionExecutionUnit) + } // End of direction testing. - direction.GranularThroughputDataLogger.Export() - if *debugCliFlag { - fmt.Printf("Closing the %v granular throughput data logger.\n", direction.DirectionLabel) - } - direction.GranularThroughputDataLogger.Close() + waiter := executor.Execute(parallelTestExecutionPolicy, directionExecutionUnits) + waiter.Wait() - if *debugCliFlag { - fmt.Printf("In debugging mode, we will cool down between tests.\n") - time.Sleep(constants.CooldownPeriod) - fmt.Printf("Done cooling down.\n") + // If we were testing in parallel mode, then the throughputs for each direction are still + // running. We left them running in case one of the directions reached stability before the + // other! + if parallelTestExecutionPolicy == executor.Parallel { + for _, direction := range directions { + if *debugCliFlag { + fmt.Printf("Stopping the throughput connections for the %v test.\n", direction.DirectionLabel) + } + if direction.ThroughputActivityCtxCancel == nil { + panic(fmt.Sprintf("The cancellation function for the %v direction's throughput is nil!", direction.DirectionLabel)) + } + if (*direction.ThroughputActivityCtx).Err() != nil { + fmt.Fprintf(os.Stderr, "Warning: The throughput for the %v direction was already cancelled but should have been ongoing.\n", direction.DirectionLabel) + continue + } + (*direction.ThroughputActivityCtxCancel)() + } + } else { + for _, direction := range directions { + if direction.ThroughputActivityCtxCancel == nil { + panic(fmt.Sprintf("The cancellation function for the %v direction's throughput is nil!", direction.DirectionLabel)) + } + if (*direction.ThroughputActivityCtx).Err() == nil { + fmt.Fprintf(os.Stderr, "Warning: The throughput for the %v direction should have already been stopped but it was not.\n", direction.DirectionLabel) + } } } - result := rpm.CalculateRpm(selfRtts, foreignRtts, specParameters.TrimmedMeanPct, specParameters.Percentile) + fmt.Printf("Results:\n") + fmt.Printf("========\n") + // Print out the formatted results from each of the directions. + for _, direction := range directions { + fmt.Print(direction.FormattedResults) + fmt.Printf("========\n") + } + + allSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) + allForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) + + allSelfRtts.Append(&downloadDirection.SelfRtts) + allSelfRtts.Append(&uploadDirection.SelfRtts) + allForeignRtts.Append(&downloadDirection.ForeignRtts) + allForeignRtts.Append(&uploadDirection.ForeignRtts) + + result := rpm.CalculateRpm(allSelfRtts, allForeignRtts, specParameters.TrimmedMeanPct, specParameters.Percentile) if *debugCliFlag { fmt.Printf("(Final RPM Calculation stats): %v\n", result.ToString()) |
