diff options
| author | Will Hawkins <[email protected]> | 2023-12-13 19:56:03 -0500 |
|---|---|---|
| committer | Will Hawkins <[email protected]> | 2024-01-04 19:10:37 -0500 |
| commit | f3990f950277c2f61e0e1811b4b8a81fc0219da4 (patch) | |
| tree | 6969e4ac2c4e94e75fe2e0c5581da5c07785dce8 | |
| parent | 552f01ad73248474553ce471695745db58c862ea (diff) | |
[Feature] Support for testing upload/download in parallel
Use the `--rpm.parallel` to test in parallel mode. The default testing
mode is serial.
Signed-off-by: Will Hawkins <[email protected]>
| -rw-r--r-- | constants/constants.go | 9 | ||||
| -rw-r--r-- | direction/direction.go | 8 | ||||
| -rw-r--r-- | executor/executor.go | 53 | ||||
| -rw-r--r-- | executor/executor_test.go | 56 | ||||
| -rw-r--r-- | networkQuality.go | 844 | ||||
| -rw-r--r-- | rpm/parameters.go | 15 | ||||
| -rw-r--r-- | rpm/parameters_test.go | 34 | ||||
| -rw-r--r-- | rpm/rpm.go | 3 | ||||
| -rw-r--r-- | series/series.go | 62 | ||||
| -rw-r--r-- | series/series_test.go | 64 | ||||
| -rw-r--r-- | testing/utilities.go | 30 | ||||
| -rw-r--r-- | tools/graphing.py | 8 |
12 files changed, 772 insertions, 414 deletions
diff --git a/constants/constants.go b/constants/constants.go index 8621de8..81c932e 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -14,7 +14,11 @@ package constants -import "time" +import ( + "time" + + "github.com/network-quality/goresponsiveness/executor" +) var ( // The initial number of load-generating connections when attempting to saturate the network. @@ -38,6 +42,9 @@ var ( DefaultInsecureSkipVerify bool = true DefaultL4SCongestionControlAlgorithm string = "prague" + + // The default execution policy for running a test (serial) + DefaultTestExecutionPolicy = executor.Serial ) type SpecParametersCliOptions struct { diff --git a/direction/direction.go b/direction/direction.go index 865a273..9b41b26 100644 --- a/direction/direction.go +++ b/direction/direction.go @@ -15,11 +15,14 @@ package direction import ( + "context" + "github.com/network-quality/goresponsiveness/datalogger" "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/lgc" "github.com/network-quality/goresponsiveness/probe" "github.com/network-quality/goresponsiveness/rpm" + "github.com/network-quality/goresponsiveness/series" ) type Direction struct { @@ -37,4 +40,9 @@ type Direction struct { ExtendedStatsEligible bool StableThroughput bool StableResponsiveness bool + SelfRtts series.WindowSeries[float64, uint64] + ForeignRtts series.WindowSeries[float64, uint64] + ThroughputActivityCtx *context.Context + ThroughputActivityCtxCancel *context.CancelFunc + FormattedResults string } diff --git a/executor/executor.go b/executor/executor.go new file mode 100644 index 0000000..22c1235 --- /dev/null +++ b/executor/executor.go @@ -0,0 +1,53 @@ +package executor + +import ( + "sync" +) + +type ExecutionMethod int + +const ( + Parallel ExecutionMethod = iota + Serial +) + +type ExecutionUnit func() + +func (ep ExecutionMethod) ToString() string { + switch ep { + case Parallel: + return "Parallel" + case Serial: + return "Serial" + } + return "Unrecognized execution method" +} + +func Execute(executionMethod ExecutionMethod, executionUnits []ExecutionUnit) *sync.WaitGroup { + waiter := &sync.WaitGroup{} + + // Make sure that we Add to the wait group all the execution units + // before starting to run any -- there is a potential race condition + // otherwise. + (*waiter).Add(len(executionUnits)) + + for _, executionUnit := range executionUnits { + // Stupid capture in Go! Argh. + executionUnit := executionUnit + + invoker := func() { + executionUnit() + (*waiter).Done() + } + switch executionMethod { + case Parallel: + go invoker() + case Serial: + invoker() + default: + panic("Invalid execution method value given.") + } + } + + return waiter +} diff --git a/executor/executor_test.go b/executor/executor_test.go new file mode 100644 index 0000000..9c3af7e --- /dev/null +++ b/executor/executor_test.go @@ -0,0 +1,56 @@ +package executor_test + +import ( + "testing" + "time" + + "github.com/network-quality/goresponsiveness/executor" +) + +var countToFive = func() { + time.Sleep(5 * time.Second) +} + +var countToThree = func() { + time.Sleep(3 * time.Second) +} + +var executionUnits = []executor.ExecutionUnit{countToFive, countToThree} + +func TestSerial(t *testing.T) { + then := time.Now() + waiter := executor.Execute(executor.Serial, executionUnits) + waiter.Wait() + when := time.Now() + + if when.Sub(then) < 7*time.Second { + t.Fatalf("Execution did not happen serially -- the wait was too short: %v", when.Sub(then).Seconds()) + } +} + +func TestParallel(t *testing.T) { + then := time.Now() + waiter := executor.Execute(executor.Parallel, executionUnits) + waiter.Wait() + when := time.Now() + + if when.Sub(then) > 6*time.Second { + t.Fatalf("Execution did not happen in parallel -- the wait was too long: %v", when.Sub(then).Seconds()) + } +} + +func TestExecutionMethodParallelToString(t *testing.T) { + executionMethod := executor.Parallel + + if executionMethod.ToString() != "Parallel" { + t.Fatalf("Incorrect result from ExecutionMethod.ToString; expected Parallel but got %v", executionMethod.ToString()) + } +} + +func TestExecutionMethodSerialToString(t *testing.T) { + executionMethod := executor.Serial + + if executionMethod.ToString() != "Serial" { + t.Fatalf("Incorrect result from ExecutionMethod.ToString; expected Serial but got %v", executionMethod.ToString()) + } +} diff --git a/networkQuality.go b/networkQuality.go index 340a316..2e13893 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -31,6 +31,7 @@ import ( "github.com/network-quality/goresponsiveness/datalogger" "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/direction" + "github.com/network-quality/goresponsiveness/executor" "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" "github.com/network-quality/goresponsiveness/probe" @@ -167,9 +168,20 @@ var ( ) withL4S = flag.Bool("with-l4s", false, "Use L4S (with default TCP prague congestion control algorithm.)") withL4SAlgorithm = flag.String("with-l4s-algorithm", "", "Use L4S (with specified congestion control algorithm.)") + + parallelTestExecutionPolicy = constants.DefaultTestExecutionPolicy ) func main() { + // Add one final command-line argument + flag.BoolFunc("rpm.parallel", "Parallel test execution policy.", func(value string) error { + if value != "true" { + return fmt.Errorf("-parallel can only be used to enable parallel test execution policy") + } + parallelTestExecutionPolicy = executor.Parallel + return nil + }) + flag.Parse() if *showVersion { @@ -184,7 +196,7 @@ func main() { } specParameters, err := rpm.SpecParametersFromArguments(*rpmtimeout, *rpmmad, *rpmid, - *rpmtmp, *rpmsdt, *rpmmnp, *rpmmps, *rpmptc, *rpmp) + *rpmtmp, *rpmsdt, *rpmmnp, *rpmmps, *rpmptc, *rpmp, parallelTestExecutionPolicy) if err != nil { fmt.Fprintf( os.Stderr, @@ -639,479 +651,533 @@ func main() { } - // All tests will accumulate data to these series because it will all matter for RPM calculation! - selfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) - foreignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) - var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil if *printQualityAttenuation { selfRttsQualityAttenuation = qualityattenuation.NewSimpleQualityAttenuation() } + directionExecutionUnits := make([]executor.ExecutionUnit, 0) + for _, direction := range directions { + // Make a copy here to make sure that we do not get go-wierdness in our closure (see https://github.com/golang/go/discussions/56010). + direction := direction - timeoutDuration := specParameters.TestTimeout - timeoutAbsoluteTime := time.Now().Add(timeoutDuration) + directionExecutionUnit := func() { + timeoutDuration := specParameters.TestTimeout + timeoutAbsoluteTime := time.Now().Add(timeoutDuration) - timeoutChannel := timeoutat.TimeoutAt( - operatingCtx, - timeoutAbsoluteTime, - debugLevel, - ) - if debug.IsDebug(debugLevel) { - fmt.Printf("%s Test will end no later than %v\n", direction.DirectionLabel, timeoutAbsoluteTime) - } + timeoutChannel := timeoutat.TimeoutAt( + operatingCtx, + timeoutAbsoluteTime, + debugLevel, + ) + if debug.IsDebug(debugLevel) { + fmt.Printf("%s Test will end no later than %v\n", + direction.DirectionLabel, timeoutAbsoluteTime) + } - throughputCtx, throughputCtxCancel := context.WithCancel(operatingCtx) - proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx) + throughputOperatorCtx, throughputOperatorCtxCancel := context.WithCancel(operatingCtx) + proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx) - // This context is used to control the network activity (i.e., it controls all - // the connections that are open to do load generation and probing). Cancelling this context will close - // all the network connections that are responsible for generating the load. - networkActivityCtx, networkActivityCtxCancel := context.WithCancel(operatingCtx) + // This context is used to control the network activity (i.e., it controls all + // the connections that are open to do load generation and probing). Cancelling this context will close + // all the network connections that are responsible for generating the load. + probeNetworkActivityCtx, probeNetworkActivityCtxCancel := context.WithCancel(operatingCtx) + throughputCtx, throughputCtxCancel := context.WithCancel(operatingCtx) + direction.ThroughputActivityCtx, direction.ThroughputActivityCtxCancel = &throughputCtx, &throughputCtxCancel - throughputGeneratorCtx, throughputGeneratorCtxCancel := context.WithCancel(throughputCtx) + reachWorkingConditionsCtx, reachWorkingConditionsCtxCancel := + context.WithCancel(throughputOperatorCtx) - lgStabilizationCommunicationChannel := rpm.LoadGenerator( - throughputCtx, - networkActivityCtx, - throughputGeneratorCtx, - specParameters.EvalInterval, - direction.CreateLgdc, - &direction.Lgcc, - &globalNumericBucketGenerator, - specParameters.MaxParallelConns, - *calculateExtendedStats, - direction.DirectionDebugging, - ) + lgStabilizationCommunicationChannel := rpm.LoadGenerator( + throughputOperatorCtx, + *direction.ThroughputActivityCtx, + reachWorkingConditionsCtx, + specParameters.EvalInterval, + direction.CreateLgdc, + &direction.Lgcc, + &globalNumericBucketGenerator, + specParameters.MaxParallelConns, + *calculateExtendedStats, + direction.DirectionDebugging, + ) - throughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, - fmt.Sprintf("%v Throughput Stabilizer", direction.DirectionLabel)) - downloadThroughputStabilizerDebugLevel := debug.Error - if *debugCliFlag { - downloadThroughputStabilizerDebugLevel = debug.Debug - } - throughputStabilizer := stabilizer.NewStabilizer[float64, uint64]( - specParameters.MovingAvgDist, specParameters.StdDevTolerance, 0, "bytes", - downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig) + throughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, + fmt.Sprintf("%v Throughput Stabilizer", direction.DirectionLabel)) + downloadThroughputStabilizerDebugLevel := debug.Error + if *debugCliFlag { + downloadThroughputStabilizerDebugLevel = debug.Debug + } + throughputStabilizer := stabilizer.NewStabilizer[float64, uint64]( + specParameters.MovingAvgDist, specParameters.StdDevTolerance, 0, "bytes", + downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig) - responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, - fmt.Sprintf("%v Responsiveness Stabilizer", direction.DirectionLabel)) - responsivenessStabilizerDebugLevel := debug.Error - if *debugCliFlag { - responsivenessStabilizerDebugLevel = debug.Debug - } - responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint64]( - specParameters.MovingAvgDist, specParameters.StdDevTolerance, - specParameters.TrimmedMeanPct, "milliseconds", - responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig) + responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, + fmt.Sprintf("%v Responsiveness Stabilizer", direction.DirectionLabel)) + responsivenessStabilizerDebugLevel := debug.Error + if *debugCliFlag { + responsivenessStabilizerDebugLevel = debug.Debug + } + responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint64]( + specParameters.MovingAvgDist, specParameters.StdDevTolerance, + specParameters.TrimmedMeanPct, "milliseconds", + responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig) - // For later debugging output, record the last throughputs on load-generating connectings - // and the number of open connections. - lastThroughputRate := float64(0) - lastThroughputOpenConnectionCount := int(0) + // For later debugging output, record the last throughputs on load-generating connectings + // and the number of open connections. + lastThroughputRate := float64(0) + lastThroughputOpenConnectionCount := int(0) - stabilityCheckTime := time.Now().Add(specParameters.EvalInterval) - stabilityCheckTimeChannel := timeoutat.TimeoutAt( - operatingCtx, - stabilityCheckTime, - debugLevel, - ) + stabilityCheckTime := time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel := timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) - lg_timeout: - for !direction.StableThroughput { - select { - case throughputMeasurement := <-lgStabilizationCommunicationChannel: - { - switch throughputMeasurement.Type { - case series.SeriesMessageReserve: - { - throughputStabilizer.Reserve(throughputMeasurement.Bucket) - if *debugCliFlag { - fmt.Printf( - "%s: Reserving a throughput bucket with id %v.\n", - direction.DirectionLabel, throughputMeasurement.Bucket) + lg_timeout: + for !direction.StableThroughput { + select { + case throughputMeasurement := <-lgStabilizationCommunicationChannel: + { + switch throughputMeasurement.Type { + case series.SeriesMessageReserve: + { + throughputStabilizer.Reserve(throughputMeasurement.Bucket) + if *debugCliFlag { + fmt.Printf( + "%s: Reserving a throughput bucket with id %v.\n", + direction.DirectionLabel, throughputMeasurement.Bucket) + } } - } - case series.SeriesMessageMeasure: - { - bucket := throughputMeasurement.Bucket - measurement := utilities.GetSome(throughputMeasurement.Measure) + case series.SeriesMessageMeasure: + { + bucket := throughputMeasurement.Bucket + measurement := utilities.GetSome(throughputMeasurement.Measure) - throughputStabilizer.AddMeasurement(bucket, measurement.Throughput) + throughputStabilizer.AddMeasurement(bucket, measurement.Throughput) - direction.ThroughputDataLogger.LogRecord(measurement) - for _, v := range measurement.GranularThroughputDataPoints { - v.Direction = "Download" - direction.GranularThroughputDataLogger.LogRecord(v) - } + direction.ThroughputDataLogger.LogRecord(measurement) + for _, v := range measurement.GranularThroughputDataPoints { + v.Direction = "Download" + direction.GranularThroughputDataLogger.LogRecord(v) + } - lastThroughputRate = measurement.Throughput - lastThroughputOpenConnectionCount = measurement.Connections + lastThroughputRate = measurement.Throughput + lastThroughputOpenConnectionCount = measurement.Connections + } } } - } - case <-stabilityCheckTimeChannel: - { - if *debugCliFlag { - fmt.Printf( - "%v throughput stability interval is complete.\n", direction.DirectionLabel) - } - stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) - stabilityCheckTimeChannel = timeoutat.TimeoutAt( - operatingCtx, - stabilityCheckTime, - debugLevel, - ) + case <-stabilityCheckTimeChannel: + { + if *debugCliFlag { + fmt.Printf( + "%v throughput stability interval is complete.\n", direction.DirectionLabel) + } + stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel = timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) - direction.StableThroughput = throughputStabilizer.IsStable() - if *debugCliFlag { - fmt.Printf( - "%v is instantaneously %s.\n", direction.DirectionLabel, - utilities.Conditional(direction.StableThroughput, "stable", "unstable")) - } + direction.StableThroughput = throughputStabilizer.IsStable() + if *debugCliFlag { + fmt.Printf( + "%v is instantaneously %s.\n", direction.DirectionLabel, + utilities.Conditional(direction.StableThroughput, "stable", "unstable")) + } - if direction.StableThroughput { - throughputGeneratorCtxCancel() + throughputStabilizer.Interval() + } + case <-timeoutChannel: + { + break lg_timeout } - throughputStabilizer.Interval() - } - case <-timeoutChannel: - { - break lg_timeout } } - } - if direction.StableThroughput { - if *debugCliFlag { - fmt.Printf("Throughput is stable; beginning responsiveness testing.\n") + if direction.StableThroughput { + if *debugCliFlag { + fmt.Printf("Throughput is stable; beginning responsiveness testing.\n") + } + } else { + fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Making the test 15 seconds longer to calculate speculative RPM results.\n") + speculativeTimeoutDuration := time.Second * 15 + speculativeAbsoluteTimeoutTime := time.Now().Add(speculativeTimeoutDuration) + timeoutChannel = timeoutat.TimeoutAt( + operatingCtx, + speculativeAbsoluteTimeoutTime, + debugLevel, + ) } - } else { - fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Adding 15 seconds to calculate speculative RPM results.\n") - speculativeTimeoutDuration := time.Second * 15 - speculativeAbsoluteTimeoutTime := time.Now().Add(speculativeTimeoutDuration) - timeoutChannel = timeoutat.TimeoutAt( - operatingCtx, - speculativeAbsoluteTimeoutTime, - debugLevel, - ) - } - perDirectionSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) - perDirectionForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) + // No matter what, we will stop adding additional load-generating connections! + reachWorkingConditionsCtxCancel() - responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber( - proberOperatorCtx, - networkActivityCtx, - generateForeignProbeConfiguration, - generateSelfProbeConfiguration, - &direction.Lgcc, - &globalNumericBucketGenerator, - direction.CreateLgdc().Direction(), // TODO: This could be better! - specParameters.ProbeInterval, - sslKeyFileConcurrentWriter, - *calculateExtendedStats, - direction.ProbeDebugging, - ) + direction.SelfRtts = series.NewWindowSeries[float64, uint64](series.Forever, 0) + direction.ForeignRtts = series.NewWindowSeries[float64, uint64](series.Forever, 0) - responsiveness_timeout: - for !direction.StableResponsiveness { - select { - case probeMeasurement := <-responsivenessStabilizationCommunicationChannel: - { - switch probeMeasurement.Type { - case series.SeriesMessageReserve: - { - bucket := probeMeasurement.Bucket - if *debugCliFlag { - fmt.Printf( - "%s: Reserving a responsiveness bucket with id %v.\n", direction.DirectionLabel, bucket) - } - responsivenessStabilizer.Reserve(bucket) - selfRtts.Reserve(bucket) - foreignRtts.Reserve(bucket) - perDirectionForeignRtts.Reserve(bucket) - perDirectionSelfRtts.Reserve(bucket) - } - case series.SeriesMessageMeasure: - { - bucket := probeMeasurement.Bucket - measurement := utilities.GetSome(probeMeasurement.Measure) - foreignDataPoint := measurement.Foreign - selfDataPoint := measurement.Self + responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber( + proberOperatorCtx, + probeNetworkActivityCtx, + generateForeignProbeConfiguration, + generateSelfProbeConfiguration, + &direction.Lgcc, + &globalNumericBucketGenerator, + direction.CreateLgdc().Direction(), // TODO: This could be better! + specParameters.ProbeInterval, + sslKeyFileConcurrentWriter, + *calculateExtendedStats, + direction.ProbeDebugging, + ) - if *debugCliFlag { - fmt.Printf( - "%s: Filling a responsiveness bucket with id %v with value %v.\n", - direction.DirectionLabel, bucket, measurement) + responsiveness_timeout: + for !direction.StableResponsiveness { + select { + case probeMeasurement := <-responsivenessStabilizationCommunicationChannel: + { + switch probeMeasurement.Type { + case series.SeriesMessageReserve: + { + bucket := probeMeasurement.Bucket + if *debugCliFlag { + fmt.Printf( + "%s: Reserving a responsiveness bucket with id %v.\n", direction.DirectionLabel, bucket) + } + responsivenessStabilizer.Reserve(bucket) + direction.ForeignRtts.Reserve(bucket) + direction.SelfRtts.Reserve(bucket) } - responsivenessStabilizer.AddMeasurement(bucket, - (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) + case series.SeriesMessageMeasure: + { + bucket := probeMeasurement.Bucket + measurement := utilities.GetSome(probeMeasurement.Measure) + foreignDataPoint := measurement.Foreign + selfDataPoint := measurement.Self - if err := selfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil { - fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (selfRtts)\n", bucket) - } - if err := perDirectionSelfRtts.Fill(bucket, - selfDataPoint.Duration.Seconds()); err != nil { - fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionSelfRtts)\n", bucket) - } + if *debugCliFlag { + fmt.Printf( + "%s: Filling a responsiveness bucket with id %v with value %v.\n", + direction.DirectionLabel, bucket, measurement) + } + responsivenessStabilizer.AddMeasurement(bucket, + (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) - if err := foreignRtts.Fill(bucket, foreignDataPoint.Duration.Seconds()); err != nil { - fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (foreignRtts)\n", bucket) - } + if err := direction.SelfRtts.Fill(bucket, + selfDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionSelfRtts)\n", bucket) + } - if err := perDirectionForeignRtts.Fill(bucket, - foreignDataPoint.Duration.Seconds()); err != nil { - fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionForeignRtts)\n", bucket) - } + if err := direction.ForeignRtts.Fill(bucket, + foreignDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionForeignRtts)\n", bucket) + } - if selfRttsQualityAttenuation != nil { - selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) - } + if selfRttsQualityAttenuation != nil { + selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) + } - direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) - direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) + direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) + direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) + } } } - } - case throughputMeasurement := <-lgStabilizationCommunicationChannel: - { - switch throughputMeasurement.Type { - case series.SeriesMessageReserve: - { - // We are no longer tracking stability, so reservation messages are useless! - if *debugCliFlag { - fmt.Printf( - "%s: Discarding a throughput bucket with id %v when ascertaining responsiveness.\n", - direction.DirectionLabel, throughputMeasurement.Bucket) + case throughputMeasurement := <-lgStabilizationCommunicationChannel: + { + switch throughputMeasurement.Type { + case series.SeriesMessageReserve: + { + // We are no longer tracking stability, so reservation messages are useless! + if *debugCliFlag { + fmt.Printf( + "%s: Discarding a throughput bucket with id %v when ascertaining responsiveness.\n", + direction.DirectionLabel, throughputMeasurement.Bucket) + } } - } - case series.SeriesMessageMeasure: - { - measurement := utilities.GetSome(throughputMeasurement.Measure) + case series.SeriesMessageMeasure: + { + measurement := utilities.GetSome(throughputMeasurement.Measure) - if *debugCliFlag { - fmt.Printf("Adding a throughput measurement (while ascertaining responsiveness).\n") - } - // There may be more than one round trip accumulated together. If that is the case, - direction.ThroughputDataLogger.LogRecord(measurement) - for _, v := range measurement.GranularThroughputDataPoints { - v.Direction = direction.DirectionLabel - direction.GranularThroughputDataLogger.LogRecord(v) - } + if *debugCliFlag { + fmt.Printf("Adding a throughput measurement (while ascertaining responsiveness).\n") + } + // There may be more than one round trip accumulated together. If that is the case, + direction.ThroughputDataLogger.LogRecord(measurement) + for _, v := range measurement.GranularThroughputDataPoints { + v.Direction = direction.DirectionLabel + direction.GranularThroughputDataLogger.LogRecord(v) + } - lastThroughputRate = measurement.Throughput - lastThroughputOpenConnectionCount = measurement.Connections + lastThroughputRate = measurement.Throughput + lastThroughputOpenConnectionCount = measurement.Connections + } } } - } - case <-timeoutChannel: - { - break responsiveness_timeout - } - case <-stabilityCheckTimeChannel: - { - if *debugCliFlag { - fmt.Printf( - "%v responsiveness stability interval is complete.\n", direction.DirectionLabel) + case <-timeoutChannel: + { + break responsiveness_timeout } + case <-stabilityCheckTimeChannel: + { + if *debugCliFlag { + fmt.Printf( + "%v responsiveness stability interval is complete.\n", direction.DirectionLabel) + } - stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) - stabilityCheckTimeChannel = timeoutat.TimeoutAt( - operatingCtx, - stabilityCheckTime, - debugLevel, - ) + stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel = timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) - // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy - // is *actually* important, but it can't hurt? - direction.StableResponsiveness = responsivenessStabilizer.IsStable() + // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy + // is *actually* important, but it can't hurt? + direction.StableResponsiveness = responsivenessStabilizer.IsStable() - if *debugCliFlag { - fmt.Printf( - "Responsiveness is instantaneously %s.\n", - utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) - } + if *debugCliFlag { + fmt.Printf( + "Responsiveness is instantaneously %s.\n", + utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) + } - responsivenessStabilizer.Interval() + responsivenessStabilizer.Interval() + } } } - } - - // Did the test run to stability? - testRanToStability := direction.StableThroughput && direction.StableResponsiveness - - if *debugCliFlag { - fmt.Printf("Stopping all the load generating data generators (stability: %s).\n", - utilities.Conditional(testRanToStability, "success", "failure")) - } - /* At this point there are - 1. Load generators running - -- uploadLoadGeneratorOperatorCtx - -- downloadLoadGeneratorOperatorCtx - 2. Network connections opened by those load generators: - -- lgNetworkActivityCtx - 3. Probes - -- proberCtx - */ + // Did the test run to stability? + testRanToStability := direction.StableThroughput && direction.StableResponsiveness - // First, stop the load generator and the probe operators (but *not* the network activity) - proberOperatorCtxCancel() - throughputCtxCancel() + if *debugCliFlag { + fmt.Printf("Stopping all the load generating data generators (stability: %s).\n", + utilities.Conditional(testRanToStability, "success", "failure")) + } - // Second, calculate the extended stats (if the user requested and they are available for the direction) - extendedStats := extendedstats.AggregateExtendedStats{} - if *calculateExtendedStats && direction.ExtendedStatsEligible { - if extendedstats.ExtendedStatsAvailable() { - func() { - // Put inside an IIFE so that we can use a defer! - direction.Lgcc.Lock.Lock() - defer direction.Lgcc.Lock.Unlock() + /* At this point there are + 1. Load generators running + -- uploadLoadGeneratorOperatorCtx + -- downloadLoadGeneratorOperatorCtx + 2. Network connections opened by those load generators: + -- lgNetworkActivityCtx + 3. Probes + -- proberCtx + */ - lgcCount, err := direction.Lgcc.Len() - if err != nil { - fmt.Fprintf( - os.Stderr, - "Warning: Could not calculate the number of %v load-generating connections; aborting extended stats preparation.\n", direction.DirectionLabel, - ) - return - } + // First, stop the load generator and the probe operators (but *not* the network activity) + proberOperatorCtxCancel() + throughputOperatorCtxCancel() - for i := 0; i < lgcCount; i++ { - // Assume that extended statistics are available -- the check was done explicitly at - // program startup if the calculateExtendedStats flag was set by the user on the command line. - currentLgc, _ := direction.Lgcc.Get(i) + // Second, calculate the extended stats (if the user requested and they are available for the direction) + extendedStats := extendedstats.AggregateExtendedStats{} + if *calculateExtendedStats && direction.ExtendedStatsEligible { + if extendedstats.ExtendedStatsAvailable() { + func() { + // Put inside an IIFE so that we can use a defer! + direction.Lgcc.Lock.Lock() + defer direction.Lgcc.Lock.Unlock() - if currentLgc == nil || (*currentLgc).Stats() == nil { + lgcCount, err := direction.Lgcc.Len() + if err != nil { fmt.Fprintf( os.Stderr, - "Warning: Could not add extended stats for the connection: The LGC was nil or there were no stats available.\n", + "Warning: Could not calculate the number of %v load-generating connections; aborting extended stats preparation.\n", direction.DirectionLabel, ) - continue + return } - if err := extendedStats.IncorporateConnectionStats( - (*currentLgc).Stats().ConnInfo.Conn); err != nil { - fmt.Fprintf( - os.Stderr, - "Warning: Could not add extended stats for the connection: %v.\n", - err, - ) + + for i := 0; i < lgcCount; i++ { + // Assume that extended statistics are available -- the check was done explicitly at + // program startup if the calculateExtendedStats flag was set by the user on the command line. + currentLgc, _ := direction.Lgcc.Get(i) + + if currentLgc == nil || (*currentLgc).Stats() == nil { + fmt.Fprintf( + os.Stderr, + "Warning: Could not add extended stats for the connection: The LGC was nil or there were no stats available.\n", + ) + continue + } + if err := extendedStats.IncorporateConnectionStats( + (*currentLgc).Stats().ConnInfo.Conn); err != nil { + fmt.Fprintf( + os.Stderr, + "Warning: Could not add extended stats for the connection: %v.\n", + err, + ) + } } - } - }() - } else { - // TODO: Should we just log here? - panic("Extended stats are not available but the user requested their calculation.") + }() + } else { + // TODO: Should we just log here? + panic("Extended stats are not available but the user requested their calculation.") + } } - } - // Third, stop the network connections opened by the load generators and probers. - networkActivityCtxCancel() + // *Always* stop the probers! But, conditionally stop the througput. + probeNetworkActivityCtxCancel() + if parallelTestExecutionPolicy != executor.Parallel { + if direction.ThroughputActivityCtxCancel == nil { + panic(fmt.Sprintf("The cancellation function for the %v direction's throughput is nil!", direction.DirectionLabel)) + } + (*direction.ThroughputActivityCtxCancel)() + } - fmt.Printf( - "%v: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", - direction.DirectionLabel, - utilities.ToMbps(lastThroughputRate), - utilities.ToMBps(lastThroughputRate), - lastThroughputOpenConnectionCount, - ) + direction.FormattedResults += fmt.Sprintf( + "%v: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", + direction.DirectionLabel, + utilities.ToMbps(lastThroughputRate), + utilities.ToMBps(lastThroughputRate), + lastThroughputOpenConnectionCount, + ) - if *calculateExtendedStats { - fmt.Println(extendedStats.Repr()) - } - directionResult := rpm.CalculateRpm(perDirectionSelfRtts, perDirectionForeignRtts, - specParameters.TrimmedMeanPct, specParameters.Percentile) - if *debugCliFlag { - fmt.Printf("(%s RPM Calculation stats): %v\n", direction.DirectionLabel, directionResult.ToString()) - } - if *printQualityAttenuation { - fmt.Println("Quality Attenuation Statistics:") - fmt.Printf( - `Number of losses: %d -Number of samples: %d -Min: %.6f s -Max: %.6f s -Mean: %.6f s -Variance: %.6f s -Standard Deviation: %.6f s -PDV(90): %.6f s -PDV(99): %.6f s -P(90): %.6f s -P(99): %.6f s -RPM: %.0f -Gaming QoO: %.0f + if *calculateExtendedStats { + direction.FormattedResults += fmt.Sprintf("%v\n", extendedStats.Repr()) + } + directionResult := rpm.CalculateRpm(direction.SelfRtts, direction.ForeignRtts, + specParameters.TrimmedMeanPct, specParameters.Percentile) + if *debugCliFlag { + direction.FormattedResults += fmt.Sprintf("(%s RPM Calculation stats): %v\n", + direction.DirectionLabel, directionResult.ToString()) + } + if *printQualityAttenuation { + direction.FormattedResults += "Quality Attenuation Statistics:\n" + direction.FormattedResults += fmt.Sprintf( + ` Number of losses: %d + Number of samples: %d + Min: %.6f s + Max: %.6f s + Mean: %.6f s + Variance: %.6f s + Standard Deviation: %.6f s + PDV(90): %.6f s + PDV(99): %.6f s + P(90): %.6f s + P(99): %.6f s + RPM: %.0f + Gaming QoO: %.0f `, selfRttsQualityAttenuation.GetNumberOfLosses(), - selfRttsQualityAttenuation.GetNumberOfSamples(), - selfRttsQualityAttenuation.GetMinimum(), - selfRttsQualityAttenuation.GetMaximum(), - selfRttsQualityAttenuation.GetAverage(), - selfRttsQualityAttenuation.GetVariance(), - selfRttsQualityAttenuation.GetStandardDeviation(), - selfRttsQualityAttenuation.GetPDV(90), - selfRttsQualityAttenuation.GetPDV(99), - selfRttsQualityAttenuation.GetPercentile(90), - selfRttsQualityAttenuation.GetPercentile(99), - selfRttsQualityAttenuation.GetRPM(), - selfRttsQualityAttenuation.GetGamingQoO()) - } + selfRttsQualityAttenuation.GetNumberOfSamples(), + selfRttsQualityAttenuation.GetMinimum(), + selfRttsQualityAttenuation.GetMaximum(), + selfRttsQualityAttenuation.GetAverage(), + selfRttsQualityAttenuation.GetVariance(), + selfRttsQualityAttenuation.GetStandardDeviation(), + selfRttsQualityAttenuation.GetPDV(90), + selfRttsQualityAttenuation.GetPDV(99), + selfRttsQualityAttenuation.GetPercentile(90), + selfRttsQualityAttenuation.GetPercentile(99), + selfRttsQualityAttenuation.GetRPM(), + selfRttsQualityAttenuation.GetGamingQoO()) + } - if !testRanToStability { - fmt.Printf("Test did not run to stability, these results are estimates:\n") - } + if !testRanToStability { + direction.FormattedResults += "Test did not run to stability, these results are estimates:\n" + } + + direction.FormattedResults += fmt.Sprintf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, + directionResult.PNRpm, specParameters.Percentile) + direction.FormattedResults += fmt.Sprintf( + "%s RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, + directionResult.MeanRpm, specParameters.TrimmedMeanPct) + + if len(*prometheusStatsFilename) > 0 { + var testStable int + if testRanToStability { + testStable = 1 + } + var buffer bytes.Buffer + buffer.WriteString(fmt.Sprintf("networkquality_%v_test_stable %d\n", + strings.ToLower(direction.DirectionLabel), testStable)) + buffer.WriteString(fmt.Sprintf("networkquality_%v_p90_rpm_value %d\n", + strings.ToLower(direction.DirectionLabel), int64(directionResult.PNRpm))) + buffer.WriteString(fmt.Sprintf("networkquality_%v_trimmed_rpm_value %d\n", + strings.ToLower(direction.DirectionLabel), + int64(directionResult.MeanRpm))) - fmt.Printf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, directionResult.PNRpm, specParameters.Percentile) - fmt.Printf("%s RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, - directionResult.MeanRpm, specParameters.TrimmedMeanPct) + buffer.WriteString(fmt.Sprintf("networkquality_%v_bits_per_second %d\n", + strings.ToLower(direction.DirectionLabel), int64(lastThroughputRate))) + buffer.WriteString(fmt.Sprintf("networkquality_%v_connections %d\n", + strings.ToLower(direction.DirectionLabel), + int64(lastThroughputOpenConnectionCount))) - if len(*prometheusStatsFilename) > 0 { - var testStable int - if testRanToStability { - testStable = 1 + if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil { + fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err) + os.Exit(1) + } } - var buffer bytes.Buffer - buffer.WriteString(fmt.Sprintf("networkquality_%v_test_stable %d\n", - strings.ToLower(direction.DirectionLabel), testStable)) - buffer.WriteString(fmt.Sprintf("networkquality_%v_p90_rpm_value %d\n", - strings.ToLower(direction.DirectionLabel), int64(directionResult.PNRpm))) - buffer.WriteString(fmt.Sprintf("networkquality_%v_trimmed_rpm_value %d\n", - strings.ToLower(direction.DirectionLabel), - int64(directionResult.MeanRpm))) - buffer.WriteString(fmt.Sprintf("networkquality_%v_bits_per_second %d\n", - strings.ToLower(direction.DirectionLabel), int64(lastThroughputRate))) - buffer.WriteString(fmt.Sprintf("networkquality_%v_connections %d\n", - strings.ToLower(direction.DirectionLabel), - int64(lastThroughputOpenConnectionCount))) + direction.ThroughputDataLogger.Export() + if *debugCliFlag { + fmt.Printf("Closing the %v throughput data logger.\n", direction.DirectionLabel) + } + direction.ThroughputDataLogger.Close() - if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil { - fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err) - os.Exit(1) + direction.GranularThroughputDataLogger.Export() + if *debugCliFlag { + fmt.Printf("Closing the %v granular throughput data logger.\n", direction.DirectionLabel) } - } + direction.GranularThroughputDataLogger.Close() - direction.ThroughputDataLogger.Export() - if *debugCliFlag { - fmt.Printf("Closing the %v throughput data logger.\n", direction.DirectionLabel) + if *debugCliFlag { + fmt.Printf("In debugging mode, we will cool down between tests.\n") + time.Sleep(constants.CooldownPeriod) + fmt.Printf("Done cooling down.\n") + } } - direction.ThroughputDataLogger.Close() + directionExecutionUnits = append(directionExecutionUnits, directionExecutionUnit) + } // End of direction testing. - direction.GranularThroughputDataLogger.Export() - if *debugCliFlag { - fmt.Printf("Closing the %v granular throughput data logger.\n", direction.DirectionLabel) - } - direction.GranularThroughputDataLogger.Close() + waiter := executor.Execute(parallelTestExecutionPolicy, directionExecutionUnits) + waiter.Wait() - if *debugCliFlag { - fmt.Printf("In debugging mode, we will cool down between tests.\n") - time.Sleep(constants.CooldownPeriod) - fmt.Printf("Done cooling down.\n") + // If we were testing in parallel mode, then the throughputs for each direction are still + // running. We left them running in case one of the directions reached stability before the + // other! + if parallelTestExecutionPolicy == executor.Parallel { + for _, direction := range directions { + if *debugCliFlag { + fmt.Printf("Stopping the throughput connections for the %v test.\n", direction.DirectionLabel) + } + if direction.ThroughputActivityCtxCancel == nil { + panic(fmt.Sprintf("The cancellation function for the %v direction's throughput is nil!", direction.DirectionLabel)) + } + if (*direction.ThroughputActivityCtx).Err() != nil { + fmt.Fprintf(os.Stderr, "Warning: The throughput for the %v direction was already cancelled but should have been ongoing.\n", direction.DirectionLabel) + continue + } + (*direction.ThroughputActivityCtxCancel)() + } + } else { + for _, direction := range directions { + if direction.ThroughputActivityCtxCancel == nil { + panic(fmt.Sprintf("The cancellation function for the %v direction's throughput is nil!", direction.DirectionLabel)) + } + if (*direction.ThroughputActivityCtx).Err() == nil { + fmt.Fprintf(os.Stderr, "Warning: The throughput for the %v direction should have already been stopped but it was not.\n", direction.DirectionLabel) + } } } - result := rpm.CalculateRpm(selfRtts, foreignRtts, specParameters.TrimmedMeanPct, specParameters.Percentile) + fmt.Printf("Results:\n") + fmt.Printf("========\n") + // Print out the formatted results from each of the directions. + for _, direction := range directions { + fmt.Print(direction.FormattedResults) + fmt.Printf("========\n") + } + + allSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) + allForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) + + allSelfRtts.Append(&downloadDirection.SelfRtts) + allSelfRtts.Append(&uploadDirection.SelfRtts) + allForeignRtts.Append(&downloadDirection.ForeignRtts) + allForeignRtts.Append(&uploadDirection.ForeignRtts) + + result := rpm.CalculateRpm(allSelfRtts, allForeignRtts, specParameters.TrimmedMeanPct, specParameters.Percentile) if *debugCliFlag { fmt.Printf("(Final RPM Calculation stats): %v\n", result.ToString()) diff --git a/rpm/parameters.go b/rpm/parameters.go index 1d7e065..cc9fe7f 100644 --- a/rpm/parameters.go +++ b/rpm/parameters.go @@ -18,6 +18,7 @@ import ( "fmt" "time" + "github.com/network-quality/goresponsiveness/executor" "github.com/network-quality/goresponsiveness/utilities" ) @@ -31,9 +32,12 @@ type SpecParameters struct { ProbeInterval time.Duration ProbeCapacityPct float64 Percentile uint + ExecutionPolicy executor.ExecutionMethod } -func SpecParametersFromArguments(timeout int, mad int, id int, tmp uint, sdt float64, mnp int, mps int, ptc float64, p int) (*SpecParameters, error) { +func SpecParametersFromArguments(timeout int, mad int, id int, tmp uint, sdt float64, mnp int, + mps int, ptc float64, p int, executionPolicy executor.ExecutionMethod, +) (*SpecParameters, error) { if timeout <= 0 { return nil, fmt.Errorf("cannot specify a 0 or negative timeout for the test") } @@ -68,7 +72,8 @@ func SpecParametersFromArguments(timeout int, mad int, id int, tmp uint, sdt flo params := SpecParameters{ TestTimeout: testTimeout, MovingAvgDist: mad, EvalInterval: evalInterval, TrimmedMeanPct: tmp, StdDevTolerance: sdt, - MaxParallelConns: mnp, ProbeInterval: probeInterval, ProbeCapacityPct: ptc, Percentile: uint(p), + MaxParallelConns: mnp, ProbeInterval: probeInterval, ProbeCapacityPct: ptc, + Percentile: uint(p), ExecutionPolicy: executionPolicy, } return ¶ms, nil } @@ -82,8 +87,10 @@ Trimmed-Mean Percentage: %v, Standard-Deviation Tolerance: %v, Maximum number of parallel connections: %v, Probe Interval: %v (derived from given maximum-probes-per-second parameter), -Maximum Percentage Of Throughput For Probes: %v`, +Maximum Percentage Of Throughput For Probes: %v +Execution Policy: %v`, parameters.TestTimeout, parameters.MovingAvgDist, parameters.EvalInterval, parameters.TrimmedMeanPct, - parameters.StdDevTolerance, parameters.MaxParallelConns, parameters.ProbeInterval, parameters.ProbeCapacityPct, + parameters.StdDevTolerance, parameters.MaxParallelConns, parameters.ProbeInterval, + parameters.ProbeCapacityPct, parameters.ExecutionPolicy.ToString(), ) } diff --git a/rpm/parameters_test.go b/rpm/parameters_test.go index 2035a99..0070e99 100644 --- a/rpm/parameters_test.go +++ b/rpm/parameters_test.go @@ -17,91 +17,93 @@ package rpm import ( "strings" "testing" + + "github.com/network-quality/goresponsiveness/executor" ) func TestSpecParametersFromArgumentsBadTimeout(t *testing.T) { - _, err := SpecParametersFromArguments(0, 0, 0, 0, 0, 0, 0, 0, 0) + _, err := SpecParametersFromArguments(0, 0, 0, 0, 0, 0, 0, 0, 0, executor.Parallel) if err == nil || !strings.Contains(err.Error(), "timeout") { t.Fatalf("0 timeout improperly allowed.") } - _, err = SpecParametersFromArguments(-1, 0, 0, 0, 0, 0, 0, 0, 0) + _, err = SpecParametersFromArguments(-1, 0, 0, 0, 0, 0, 0, 0, 0, executor.Parallel) if err == nil || !strings.Contains(err.Error(), "timeout") { t.Fatalf("negative timeout improperly allowed.") } } func TestSpecParametersFromArgumentsBadMad(t *testing.T) { - _, err := SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0, 0) + _, err := SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0, 0, executor.Parallel) if err == nil || !strings.Contains(err.Error(), "moving-average") { t.Fatalf("0 mad improperly allowed.") } - _, err = SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0, 0) + _, err = SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0, 0, executor.Parallel) if err == nil || !strings.Contains(err.Error(), "moving-average") { t.Fatalf("negative mad improperly allowed.") } } func TestSpecParametersFromArgumentsBadId(t *testing.T) { - _, err := SpecParametersFromArguments(1, 1, 0, 0, 0, 0, 0, 0, 0) + _, err := SpecParametersFromArguments(1, 1, 0, 0, 0, 0, 0, 0, 0, executor.Parallel) if err == nil || !strings.Contains(err.Error(), "reevaluation") { t.Fatalf("0 id improperly allowed.") } - _, err = SpecParametersFromArguments(1, 1, -1, 0, 0, 0, 0, 0, 0) + _, err = SpecParametersFromArguments(1, 1, -1, 0, 0, 0, 0, 0, 0, executor.Parallel) if err == nil || !strings.Contains(err.Error(), "reevaluation") { t.Fatalf("negative id improperly allowed.") } } func TestSpecParametersFromArgumentsBadSdt(t *testing.T) { - _, err := SpecParametersFromArguments(1, 1, 1, 1, -1, 0, 0, 0, 0) + _, err := SpecParametersFromArguments(1, 1, 1, 1, -1, 0, 0, 0, 0, executor.Parallel) if err == nil || !strings.Contains(err.Error(), "deviation") { t.Fatalf("0 sdt improperly allowed.") } } func TestSpecParametersFromArgumentsBadMnp(t *testing.T) { - _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 0, 0, 0, 0) + _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 0, 0, 0, 0, executor.Parallel) if err == nil || !strings.Contains(err.Error(), "parallel") { t.Fatalf("0 mnp improperly allowed.") } - _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, -1, 0, 0, 0) + _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, -1, 0, 0, 0, executor.Parallel) if err == nil || !strings.Contains(err.Error(), "parallel") { t.Fatalf("negative mnp improperly allowed.") } } func TestSpecParametersFromArgumentsBadMps(t *testing.T) { - _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 0, 0, 0) + _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 0, 0, 0, executor.Parallel) if err == nil || !strings.Contains(err.Error(), "probing interval") { t.Fatalf("0 mps improperly allowed.") } - _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, -1, 0, 0) + _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, -1, 0, 0, executor.Parallel) if err == nil || !strings.Contains(err.Error(), "probing interval") { t.Fatalf("negative mps improperly allowed.") } } func TestSpecParametersFromArgumentsBadPtc(t *testing.T) { - _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 0, 0) + _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 0, 0, executor.Parallel) if err == nil || !strings.Contains(err.Error(), "capacity") { t.Fatalf("0 ptc improperly allowed.") } - _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, -1, 0) + _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, -1, 0, executor.Parallel) if err == nil || !strings.Contains(err.Error(), "capacity") { t.Fatalf("negative ptc improperly allowed.") } } func TestSpecParametersFromArgumentsBadP(t *testing.T) { - _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, -1) + _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, -1, executor.Parallel) if err == nil || !strings.Contains(err.Error(), "percentile") { t.Fatalf("-1 percentile improperly allowed.") } - _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, 0) + _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, 0, executor.Parallel) if err == nil || !strings.Contains(err.Error(), "percentile") { t.Fatalf("0 percentile improperly allowed.") } - _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, 101) + _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, 101, executor.Parallel) if err == nil || !strings.Contains(err.Error(), "percentile") { t.Fatalf("percentile greater than 100 improperly allowed.") } @@ -148,9 +148,8 @@ func ResponsivenessProber[BucketType utilities.Number]( ) } - currentBucketId := bucketGenerator.Generate() - dataPointsLock.Lock() + currentBucketId := bucketGenerator.Generate() if dataPoints != nil { dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, BucketType]{ Type: series.SeriesMessageReserve, Bucket: currentBucketId, diff --git a/series/series.go b/series/series.go index 6b9af1f..ef133d2 100644 --- a/series/series.go +++ b/series/series.go @@ -41,6 +41,8 @@ type WindowSeries[Data any, Bucket constraints.Ordered] interface { GetValues() []utilities.Optional[Data] Complete() bool GetType() WindowSeriesDuration + + Append(appended *WindowSeries[Data, Bucket]) } type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct { @@ -48,6 +50,7 @@ type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct { data []utilities.Pair[Bucket, utilities.Optional[Data]] latestIndex int empty bool + lock sync.RWMutex } /* @@ -58,6 +61,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error { if !wsi.empty && b <= wsi.data[wsi.latestIndex].First { return fmt.Errorf("reserving must be monotonically increasing") } + wsi.lock.Lock() + defer wsi.lock.Unlock() if wsi.empty { /* Special case if we are empty: The latestIndex is where we want this value to go! */ @@ -76,6 +81,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error { } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) error { + wsi.lock.Lock() + defer wsi.lock.Unlock() iterator := wsi.latestIndex for { if wsi.data[iterator].First == b { @@ -91,6 +98,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) erro } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Count() (some int, none int) { + wsi.lock.Lock() + defer wsi.lock.Unlock() some = 0 none = 0 for _, v := range wsi.data { @@ -104,6 +113,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Count() (some int, none int } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Complete() bool { + wsi.lock.Lock() + defer wsi.lock.Unlock() for _, v := range wsi.data { if utilities.IsNone(v.Second) { return false @@ -113,10 +124,18 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Complete() bool { } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) nextIndex(currentIndex int) int { + // Internal functions should be called with the lock held! + if wsi.lock.TryLock() { + panic("windowSeriesWindowOnlyImpl nextIndex called without lock held.") + } return (currentIndex + 1) % wsi.windowSize } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) previousIndex(currentIndex int) int { + // Internal functions should be called with the lock held! + if wsi.lock.TryLock() { + panic("windowSeriesWindowOnlyImpl nextIndex called without lock held.") + } nextIndex := currentIndex - 1 if nextIndex < 0 { nextIndex += wsi.windowSize @@ -125,6 +144,10 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) previousIndex(currentIndex } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optional[Data] { + // Internal functions should be called with the lock held! + if wsi.lock.TryLock() { + panic("windowSeriesWindowOnlyImpl nextIndex called without lock held.") + } result := make([]utilities.Optional[Data], wsi.windowSize) if wsi.empty { @@ -144,6 +167,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] { + wsi.lock.Lock() + defer wsi.lock.Unlock() return wsi.toArray() } @@ -152,12 +177,16 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetType() WindowSeriesDurat } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) { + wsi.lock.Lock() + defer wsi.lock.Unlock() for _, v := range wsi.data { eacher(v.First, &v.Second) } } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string { + wsi.lock.Lock() + defer wsi.lock.Unlock() result := fmt.Sprintf("Window series (window (%d) only, latest index: %v): ", wsi.windowSize, wsi.latestIndex) for _, v := range wsi.data { valueString := "None" @@ -169,6 +198,10 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string { return result } +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) { + panic("") +} + func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered]( windowSize int, ) *windowSeriesWindowOnlyImpl[Data, Bucket] { @@ -186,10 +219,14 @@ func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered]( type windowSeriesForeverImpl[Data any, Bucket constraints.Ordered] struct { data []utilities.Pair[Bucket, utilities.Optional[Data]] empty bool + lock sync.RWMutex } func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error { + wsi.lock.Lock() + defer wsi.lock.Unlock() if !wsi.empty && b <= wsi.data[len(wsi.data)-1].First { + fmt.Printf("reserving must be monotonically increasing") return fmt.Errorf("reserving must be monotonically increasing") } @@ -199,6 +236,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error { } func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error { + wsi.lock.Lock() + defer wsi.lock.Unlock() for i := range wsi.data { if wsi.data[i].First == b { wsi.data[i].Second = utilities.Some[Data](d) @@ -209,6 +248,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error { } func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] { + wsi.lock.Lock() + defer wsi.lock.Unlock() result := make([]utilities.Optional[Data], len(wsi.data)) for i, v := range utilities.Reverse(wsi.data) { @@ -219,6 +260,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Option } func (wsi *windowSeriesForeverImpl[Data, Bucket]) Count() (some int, none int) { + wsi.lock.Lock() + defer wsi.lock.Unlock() some = 0 none = 0 for _, v := range wsi.data { @@ -232,6 +275,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Count() (some int, none int) { } func (wsi *windowSeriesForeverImpl[Data, Bucket]) Complete() bool { + wsi.lock.Lock() + defer wsi.lock.Unlock() for _, v := range wsi.data { if utilities.IsNone(v.Second) { return false @@ -253,12 +298,16 @@ func newWindowSeriesForeverImpl[Data any, Bucket constraints.Ordered]() *windowS } func (wsi *windowSeriesForeverImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) { + wsi.lock.Lock() + defer wsi.lock.Unlock() for _, v := range wsi.data { eacher(v.First, &v.Second) } } func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string { + wsi.lock.Lock() + defer wsi.lock.Unlock() result := "Window series (forever): " for _, v := range wsi.data { valueString := "None" @@ -270,6 +319,19 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string { return result } +func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) { + result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket]) + if !ok { + panic("Cannot merge a forever window series with a non-forever window series.") + } + wsi.lock.Lock() + defer wsi.lock.Unlock() + result.lock.Lock() + defer result.lock.Unlock() + + wsi.data = append(wsi.data, result.data...) +} + /* * End of WindowSeries interface methods. */ diff --git a/series/series_test.go b/series/series_test.go index 3ed752d..54ddfcc 100644 --- a/series/series_test.go +++ b/series/series_test.go @@ -15,14 +15,21 @@ package series import ( "reflect" + "sync" "testing" + "time" + RPMTesting "github.com/network-quality/goresponsiveness/testing" "github.com/network-quality/goresponsiveness/utilities" ) func TestNextIndex(t *testing.T) { wsi := newWindowSeriesWindowOnlyImpl[int, int](4) + // Calling internal functions must be done with the lock held! + wsi.lock.Lock() + defer wsi.lock.Unlock() + idx := wsi.nextIndex(wsi.latestIndex) if idx != 1 { t.Fatalf("nextIndex is wrong (1)") @@ -54,6 +61,18 @@ func TestNextIndex(t *testing.T) { wsi.latestIndex = idx } +func TestNextIndexUnlocked(t *testing.T) { + wsi := newWindowSeriesWindowOnlyImpl[int, int](4) + + panicingTest := func() { + wsi.nextIndex(wsi.latestIndex) + } + + if !RPMTesting.DidPanic(panicingTest) { + t.Fatalf("Expected a call to nextIndex (without the lock held) to panic but it did not") + } +} + func TestSimpleWindowComplete(t *testing.T) { wsi := newWindowSeriesWindowOnlyImpl[int, int](4) if wsi.Complete() { @@ -877,3 +896,48 @@ func Test_ForeverStandardDeviationCalculation2(test *testing.T) { test.Fatalf("Standard deviation(series) max calculation(series) failed: Expected: %v; Actual: %v.", expected, sd) } } + +func Test_ForeverLocking(test *testing.T) { + series := newWindowSeriesForeverImpl[float64, int]() + testFail := false + + series.Reserve(1) + series.Reserve(2) + + series.Fill(1, 8) + series.Fill(2, 9) + + wg := sync.WaitGroup{} + + counter := 0 + + wg.Add(2) + go func() { + series.ForEach(func(b int, d *utilities.Optional[float64]) { + // All of these ++s should be done under a single lock of the lock and, therefore, + // the ForEach below should not start until both buckets are ForEach'd over! + counter++ + // Make this a long wait so we know that there is no chance for a race and that + // we are really testing what we mean to test! + time.Sleep(time.Second * 5) + }) + wg.Done() + }() + + time.Sleep(1 * time.Second) + + go func() { + series.ForEach(func(b int, d *utilities.Optional[float64]) { + if counter != 2 { + testFail = true + } + }) + wg.Done() + }() + + wg.Wait() + + if testFail { + test.Fatalf("Mutual exclusion checks did not properly lock out parallel ForEach operations.") + } +} diff --git a/testing/utilities.go b/testing/utilities.go new file mode 100644 index 0000000..6272a6d --- /dev/null +++ b/testing/utilities.go @@ -0,0 +1,30 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package testing + +// Return true/false depending on whether the given function panics. +func DidPanic(doer func()) (didPanic bool) { + didPanic = false + + defer func() { + if recovering := recover(); recovering != nil { + didPanic = true + } + }() + + doer() + + return +} diff --git a/tools/graphing.py b/tools/graphing.py index eb1e6d7..63d5166 100644 --- a/tools/graphing.py +++ b/tools/graphing.py @@ -68,6 +68,7 @@ def find_earliest(dfs): """ earliest = dfs[0]["CreationTime"].iloc[0] for df in dfs: + print(f"A data frame: {df['CreationTime']}") if df["CreationTime"].iloc[0] < earliest: earliest = df["CreationTime"].iloc[0] return earliest @@ -89,7 +90,7 @@ def time_since_start(dfs, start, column_name="TimeSinceStart"): def probeClean(df): # ConnRTT and ConnCongestionWindow refer to Underlying Connection - df.columns = ["CreationTime", "NumRTT", "Duration", "ConnRTT", "ConnCongestionWindow", "Type", "Empty"] + df.columns = ["CreationTime", "NumRTT", "Duration", "ConnRTT", "ConnCongestionWindow", "Type", "Algorithm", "Empty"] df = df.drop(columns=["Empty"]) df["CreationTime"] = pd.to_datetime(df["CreationTime"], format="%m-%d-%Y-%H-%M-%S.%f") df["Type"] = df["Type"].apply(str.strip) @@ -99,7 +100,7 @@ def probeClean(df): def throughputClean(df): - df.columns = ["CreationTime", "Throughput", "NumberConnections", "Empty"] + df.columns = ["CreationTime", "Throughput", "NumberActiveConnections", "NumberConnections", "Empty"] df = df.drop(columns=["Empty"]) df["CreationTime"] = pd.to_datetime(df["CreationTime"], format="%m-%d-%Y-%H-%M-%S.%f") df["ADJ_Throughput"] = df["Throughput"] / 1000000 @@ -306,6 +307,8 @@ def graph_normal(dfs, xcolumn, ax, title): def stacked_area_throughput(throughput_df, granular, xcolumn, ycolumn, ax, title, label, linecolor="black"): + + print(f"Stacked area throughput!") ax.set_title(title) ax.yaxis.tick_right() @@ -448,6 +451,7 @@ def make_graphs(files, save): if not containsALL: continue + print(f"About to call main()") main(start + " - " + str(x), files[start][end]) if save: pdf = matplotlib.backends.backend_pdf.PdfPages(f"{start} - {x}.pdf") |
