summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWill Hawkins <[email protected]>2023-12-13 19:56:03 -0500
committerWill Hawkins <[email protected]>2024-01-04 19:10:37 -0500
commitf3990f950277c2f61e0e1811b4b8a81fc0219da4 (patch)
tree6969e4ac2c4e94e75fe2e0c5581da5c07785dce8
parent552f01ad73248474553ce471695745db58c862ea (diff)
[Feature] Support for testing upload/download in parallel
Use the `--rpm.parallel` flag to test in parallel mode. The default testing mode is serial. Signed-off-by: Will Hawkins <[email protected]>
-rw-r--r--constants/constants.go9
-rw-r--r--direction/direction.go8
-rw-r--r--executor/executor.go53
-rw-r--r--executor/executor_test.go56
-rw-r--r--networkQuality.go844
-rw-r--r--rpm/parameters.go15
-rw-r--r--rpm/parameters_test.go34
-rw-r--r--rpm/rpm.go3
-rw-r--r--series/series.go62
-rw-r--r--series/series_test.go64
-rw-r--r--testing/utilities.go30
-rw-r--r--tools/graphing.py8
12 files changed, 772 insertions, 414 deletions
diff --git a/constants/constants.go b/constants/constants.go
index 8621de8..81c932e 100644
--- a/constants/constants.go
+++ b/constants/constants.go
@@ -14,7 +14,11 @@
package constants
-import "time"
+import (
+ "time"
+
+ "github.com/network-quality/goresponsiveness/executor"
+)
var (
// The initial number of load-generating connections when attempting to saturate the network.
@@ -38,6 +42,9 @@ var (
DefaultInsecureSkipVerify bool = true
DefaultL4SCongestionControlAlgorithm string = "prague"
+
+ // The default execution policy for running a test (serial)
+ DefaultTestExecutionPolicy = executor.Serial
)
type SpecParametersCliOptions struct {
diff --git a/direction/direction.go b/direction/direction.go
index 865a273..9b41b26 100644
--- a/direction/direction.go
+++ b/direction/direction.go
@@ -15,11 +15,14 @@
package direction
import (
+ "context"
+
"github.com/network-quality/goresponsiveness/datalogger"
"github.com/network-quality/goresponsiveness/debug"
"github.com/network-quality/goresponsiveness/lgc"
"github.com/network-quality/goresponsiveness/probe"
"github.com/network-quality/goresponsiveness/rpm"
+ "github.com/network-quality/goresponsiveness/series"
)
type Direction struct {
@@ -37,4 +40,9 @@ type Direction struct {
ExtendedStatsEligible bool
StableThroughput bool
StableResponsiveness bool
+ SelfRtts series.WindowSeries[float64, uint64]
+ ForeignRtts series.WindowSeries[float64, uint64]
+ ThroughputActivityCtx *context.Context
+ ThroughputActivityCtxCancel *context.CancelFunc
+ FormattedResults string
}
diff --git a/executor/executor.go b/executor/executor.go
new file mode 100644
index 0000000..22c1235
--- /dev/null
+++ b/executor/executor.go
@@ -0,0 +1,53 @@
+package executor
+
+import (
+ "sync"
+)
+
// ExecutionMethod selects how a batch of ExecutionUnits is run:
// all at once in separate goroutines (Parallel) or one after
// another on the calling goroutine (Serial).
type ExecutionMethod int

const (
	Parallel ExecutionMethod = iota
	Serial
)

// ExecutionUnit is a single runnable task that takes no arguments and
// returns no results.
type ExecutionUnit func()

// ToString returns a human-readable name for the execution method.
func (ep ExecutionMethod) ToString() string {
	switch ep {
	case Parallel:
		return "Parallel"
	case Serial:
		return "Serial"
	}
	return "Unrecognized execution method"
}

// Execute runs every unit in executionUnits according to executionMethod
// and returns a WaitGroup the caller can Wait on to learn when all units
// have completed. With Serial, the units have already finished when
// Execute returns; with Parallel, each unit runs in its own goroutine.
// Execute panics on an unrecognized method.
func Execute(executionMethod ExecutionMethod, executionUnits []ExecutionUnit) *sync.WaitGroup {
	waiter := &sync.WaitGroup{}

	// Add all the execution units to the wait group before starting
	// any of them -- adding as we go would race with an early Wait.
	waiter.Add(len(executionUnits))

	for _, executionUnit := range executionUnits {
		// Shadow the loop variable so each closure captures its own copy
		// (required under pre-Go-1.22 loop-variable semantics).
		executionUnit := executionUnit

		invoker := func() {
			executionUnit()
			waiter.Done()
		}
		switch executionMethod {
		case Parallel:
			go invoker()
		case Serial:
			invoker()
		default:
			panic("Invalid execution method value given.")
		}
	}

	return waiter
}
diff --git a/executor/executor_test.go b/executor/executor_test.go
new file mode 100644
index 0000000..9c3af7e
--- /dev/null
+++ b/executor/executor_test.go
@@ -0,0 +1,56 @@
+package executor_test
+
+import (
+ "testing"
+ "time"
+
+ "github.com/network-quality/goresponsiveness/executor"
+)
+
+var countToFive = func() {
+ time.Sleep(5 * time.Second)
+}
+
+var countToThree = func() {
+ time.Sleep(3 * time.Second)
+}
+
+var executionUnits = []executor.ExecutionUnit{countToFive, countToThree}
+
+func TestSerial(t *testing.T) {
+ then := time.Now()
+ waiter := executor.Execute(executor.Serial, executionUnits)
+ waiter.Wait()
+ when := time.Now()
+
+ if when.Sub(then) < 7*time.Second {
+ t.Fatalf("Execution did not happen serially -- the wait was too short: %v", when.Sub(then).Seconds())
+ }
+}
+
+func TestParallel(t *testing.T) {
+ then := time.Now()
+ waiter := executor.Execute(executor.Parallel, executionUnits)
+ waiter.Wait()
+ when := time.Now()
+
+ if when.Sub(then) > 6*time.Second {
+ t.Fatalf("Execution did not happen in parallel -- the wait was too long: %v", when.Sub(then).Seconds())
+ }
+}
+
+func TestExecutionMethodParallelToString(t *testing.T) {
+ executionMethod := executor.Parallel
+
+ if executionMethod.ToString() != "Parallel" {
+ t.Fatalf("Incorrect result from ExecutionMethod.ToString; expected Parallel but got %v", executionMethod.ToString())
+ }
+}
+
+func TestExecutionMethodSerialToString(t *testing.T) {
+ executionMethod := executor.Serial
+
+ if executionMethod.ToString() != "Serial" {
+ t.Fatalf("Incorrect result from ExecutionMethod.ToString; expected Serial but got %v", executionMethod.ToString())
+ }
+}
diff --git a/networkQuality.go b/networkQuality.go
index 340a316..2e13893 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -31,6 +31,7 @@ import (
"github.com/network-quality/goresponsiveness/datalogger"
"github.com/network-quality/goresponsiveness/debug"
"github.com/network-quality/goresponsiveness/direction"
+ "github.com/network-quality/goresponsiveness/executor"
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
"github.com/network-quality/goresponsiveness/probe"
@@ -167,9 +168,20 @@ var (
)
withL4S = flag.Bool("with-l4s", false, "Use L4S (with default TCP prague congestion control algorithm.)")
withL4SAlgorithm = flag.String("with-l4s-algorithm", "", "Use L4S (with specified congestion control algorithm.)")
+
+ parallelTestExecutionPolicy = constants.DefaultTestExecutionPolicy
)
func main() {
+ // Add one final command-line argument
+ flag.BoolFunc("rpm.parallel", "Parallel test execution policy.", func(value string) error {
+ if value != "true" {
+ return fmt.Errorf("-parallel can only be used to enable parallel test execution policy")
+ }
+ parallelTestExecutionPolicy = executor.Parallel
+ return nil
+ })
+
flag.Parse()
if *showVersion {
@@ -184,7 +196,7 @@ func main() {
}
specParameters, err := rpm.SpecParametersFromArguments(*rpmtimeout, *rpmmad, *rpmid,
- *rpmtmp, *rpmsdt, *rpmmnp, *rpmmps, *rpmptc, *rpmp)
+ *rpmtmp, *rpmsdt, *rpmmnp, *rpmmps, *rpmptc, *rpmp, parallelTestExecutionPolicy)
if err != nil {
fmt.Fprintf(
os.Stderr,
@@ -639,479 +651,533 @@ func main() {
}
- // All tests will accumulate data to these series because it will all matter for RPM calculation!
- selfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
- foreignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
-
var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil
if *printQualityAttenuation {
selfRttsQualityAttenuation = qualityattenuation.NewSimpleQualityAttenuation()
}
+ directionExecutionUnits := make([]executor.ExecutionUnit, 0)
+
for _, direction := range directions {
+ // Make a copy here to make sure that we do not get go-weirdness in our closure (see https://github.com/golang/go/discussions/56010).
+ direction := direction
- timeoutDuration := specParameters.TestTimeout
- timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
+ directionExecutionUnit := func() {
+ timeoutDuration := specParameters.TestTimeout
+ timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
- timeoutChannel := timeoutat.TimeoutAt(
- operatingCtx,
- timeoutAbsoluteTime,
- debugLevel,
- )
- if debug.IsDebug(debugLevel) {
- fmt.Printf("%s Test will end no later than %v\n", direction.DirectionLabel, timeoutAbsoluteTime)
- }
+ timeoutChannel := timeoutat.TimeoutAt(
+ operatingCtx,
+ timeoutAbsoluteTime,
+ debugLevel,
+ )
+ if debug.IsDebug(debugLevel) {
+ fmt.Printf("%s Test will end no later than %v\n",
+ direction.DirectionLabel, timeoutAbsoluteTime)
+ }
- throughputCtx, throughputCtxCancel := context.WithCancel(operatingCtx)
- proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx)
+ throughputOperatorCtx, throughputOperatorCtxCancel := context.WithCancel(operatingCtx)
+ proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx)
- // This context is used to control the network activity (i.e., it controls all
- // the connections that are open to do load generation and probing). Cancelling this context will close
- // all the network connections that are responsible for generating the load.
- networkActivityCtx, networkActivityCtxCancel := context.WithCancel(operatingCtx)
+ // This context is used to control the network activity (i.e., it controls all
+ // the connections that are open to do load generation and probing). Cancelling this context will close
+ // all the network connections that are responsible for generating the load.
+ probeNetworkActivityCtx, probeNetworkActivityCtxCancel := context.WithCancel(operatingCtx)
+ throughputCtx, throughputCtxCancel := context.WithCancel(operatingCtx)
+ direction.ThroughputActivityCtx, direction.ThroughputActivityCtxCancel = &throughputCtx, &throughputCtxCancel
- throughputGeneratorCtx, throughputGeneratorCtxCancel := context.WithCancel(throughputCtx)
+ reachWorkingConditionsCtx, reachWorkingConditionsCtxCancel :=
+ context.WithCancel(throughputOperatorCtx)
- lgStabilizationCommunicationChannel := rpm.LoadGenerator(
- throughputCtx,
- networkActivityCtx,
- throughputGeneratorCtx,
- specParameters.EvalInterval,
- direction.CreateLgdc,
- &direction.Lgcc,
- &globalNumericBucketGenerator,
- specParameters.MaxParallelConns,
- *calculateExtendedStats,
- direction.DirectionDebugging,
- )
+ lgStabilizationCommunicationChannel := rpm.LoadGenerator(
+ throughputOperatorCtx,
+ *direction.ThroughputActivityCtx,
+ reachWorkingConditionsCtx,
+ specParameters.EvalInterval,
+ direction.CreateLgdc,
+ &direction.Lgcc,
+ &globalNumericBucketGenerator,
+ specParameters.MaxParallelConns,
+ *calculateExtendedStats,
+ direction.DirectionDebugging,
+ )
- throughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug,
- fmt.Sprintf("%v Throughput Stabilizer", direction.DirectionLabel))
- downloadThroughputStabilizerDebugLevel := debug.Error
- if *debugCliFlag {
- downloadThroughputStabilizerDebugLevel = debug.Debug
- }
- throughputStabilizer := stabilizer.NewStabilizer[float64, uint64](
- specParameters.MovingAvgDist, specParameters.StdDevTolerance, 0, "bytes",
- downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig)
+ throughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug,
+ fmt.Sprintf("%v Throughput Stabilizer", direction.DirectionLabel))
+ downloadThroughputStabilizerDebugLevel := debug.Error
+ if *debugCliFlag {
+ downloadThroughputStabilizerDebugLevel = debug.Debug
+ }
+ throughputStabilizer := stabilizer.NewStabilizer[float64, uint64](
+ specParameters.MovingAvgDist, specParameters.StdDevTolerance, 0, "bytes",
+ downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig)
- responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug,
- fmt.Sprintf("%v Responsiveness Stabilizer", direction.DirectionLabel))
- responsivenessStabilizerDebugLevel := debug.Error
- if *debugCliFlag {
- responsivenessStabilizerDebugLevel = debug.Debug
- }
- responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint64](
- specParameters.MovingAvgDist, specParameters.StdDevTolerance,
- specParameters.TrimmedMeanPct, "milliseconds",
- responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig)
+ responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug,
+ fmt.Sprintf("%v Responsiveness Stabilizer", direction.DirectionLabel))
+ responsivenessStabilizerDebugLevel := debug.Error
+ if *debugCliFlag {
+ responsivenessStabilizerDebugLevel = debug.Debug
+ }
+ responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint64](
+ specParameters.MovingAvgDist, specParameters.StdDevTolerance,
+ specParameters.TrimmedMeanPct, "milliseconds",
+ responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig)
- // For later debugging output, record the last throughputs on load-generating connectings
- // and the number of open connections.
- lastThroughputRate := float64(0)
- lastThroughputOpenConnectionCount := int(0)
+ // For later debugging output, record the last throughputs on load-generating connections
+ // and the number of open connections.
+ lastThroughputRate := float64(0)
+ lastThroughputOpenConnectionCount := int(0)
- stabilityCheckTime := time.Now().Add(specParameters.EvalInterval)
- stabilityCheckTimeChannel := timeoutat.TimeoutAt(
- operatingCtx,
- stabilityCheckTime,
- debugLevel,
- )
+ stabilityCheckTime := time.Now().Add(specParameters.EvalInterval)
+ stabilityCheckTimeChannel := timeoutat.TimeoutAt(
+ operatingCtx,
+ stabilityCheckTime,
+ debugLevel,
+ )
- lg_timeout:
- for !direction.StableThroughput {
- select {
- case throughputMeasurement := <-lgStabilizationCommunicationChannel:
- {
- switch throughputMeasurement.Type {
- case series.SeriesMessageReserve:
- {
- throughputStabilizer.Reserve(throughputMeasurement.Bucket)
- if *debugCliFlag {
- fmt.Printf(
- "%s: Reserving a throughput bucket with id %v.\n",
- direction.DirectionLabel, throughputMeasurement.Bucket)
+ lg_timeout:
+ for !direction.StableThroughput {
+ select {
+ case throughputMeasurement := <-lgStabilizationCommunicationChannel:
+ {
+ switch throughputMeasurement.Type {
+ case series.SeriesMessageReserve:
+ {
+ throughputStabilizer.Reserve(throughputMeasurement.Bucket)
+ if *debugCliFlag {
+ fmt.Printf(
+ "%s: Reserving a throughput bucket with id %v.\n",
+ direction.DirectionLabel, throughputMeasurement.Bucket)
+ }
}
- }
- case series.SeriesMessageMeasure:
- {
- bucket := throughputMeasurement.Bucket
- measurement := utilities.GetSome(throughputMeasurement.Measure)
+ case series.SeriesMessageMeasure:
+ {
+ bucket := throughputMeasurement.Bucket
+ measurement := utilities.GetSome(throughputMeasurement.Measure)
- throughputStabilizer.AddMeasurement(bucket, measurement.Throughput)
+ throughputStabilizer.AddMeasurement(bucket, measurement.Throughput)
- direction.ThroughputDataLogger.LogRecord(measurement)
- for _, v := range measurement.GranularThroughputDataPoints {
- v.Direction = "Download"
- direction.GranularThroughputDataLogger.LogRecord(v)
- }
+ direction.ThroughputDataLogger.LogRecord(measurement)
+ for _, v := range measurement.GranularThroughputDataPoints {
+ v.Direction = "Download"
+ direction.GranularThroughputDataLogger.LogRecord(v)
+ }
- lastThroughputRate = measurement.Throughput
- lastThroughputOpenConnectionCount = measurement.Connections
+ lastThroughputRate = measurement.Throughput
+ lastThroughputOpenConnectionCount = measurement.Connections
+ }
}
}
- }
- case <-stabilityCheckTimeChannel:
- {
- if *debugCliFlag {
- fmt.Printf(
- "%v throughput stability interval is complete.\n", direction.DirectionLabel)
- }
- stabilityCheckTime = time.Now().Add(specParameters.EvalInterval)
- stabilityCheckTimeChannel = timeoutat.TimeoutAt(
- operatingCtx,
- stabilityCheckTime,
- debugLevel,
- )
+ case <-stabilityCheckTimeChannel:
+ {
+ if *debugCliFlag {
+ fmt.Printf(
+ "%v throughput stability interval is complete.\n", direction.DirectionLabel)
+ }
+ stabilityCheckTime = time.Now().Add(specParameters.EvalInterval)
+ stabilityCheckTimeChannel = timeoutat.TimeoutAt(
+ operatingCtx,
+ stabilityCheckTime,
+ debugLevel,
+ )
- direction.StableThroughput = throughputStabilizer.IsStable()
- if *debugCliFlag {
- fmt.Printf(
- "%v is instantaneously %s.\n", direction.DirectionLabel,
- utilities.Conditional(direction.StableThroughput, "stable", "unstable"))
- }
+ direction.StableThroughput = throughputStabilizer.IsStable()
+ if *debugCliFlag {
+ fmt.Printf(
+ "%v is instantaneously %s.\n", direction.DirectionLabel,
+ utilities.Conditional(direction.StableThroughput, "stable", "unstable"))
+ }
- if direction.StableThroughput {
- throughputGeneratorCtxCancel()
+ throughputStabilizer.Interval()
+ }
+ case <-timeoutChannel:
+ {
+ break lg_timeout
}
- throughputStabilizer.Interval()
- }
- case <-timeoutChannel:
- {
- break lg_timeout
}
}
- }
- if direction.StableThroughput {
- if *debugCliFlag {
- fmt.Printf("Throughput is stable; beginning responsiveness testing.\n")
+ if direction.StableThroughput {
+ if *debugCliFlag {
+ fmt.Printf("Throughput is stable; beginning responsiveness testing.\n")
+ }
+ } else {
+ fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Making the test 15 seconds longer to calculate speculative RPM results.\n")
+ speculativeTimeoutDuration := time.Second * 15
+ speculativeAbsoluteTimeoutTime := time.Now().Add(speculativeTimeoutDuration)
+ timeoutChannel = timeoutat.TimeoutAt(
+ operatingCtx,
+ speculativeAbsoluteTimeoutTime,
+ debugLevel,
+ )
}
- } else {
- fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Adding 15 seconds to calculate speculative RPM results.\n")
- speculativeTimeoutDuration := time.Second * 15
- speculativeAbsoluteTimeoutTime := time.Now().Add(speculativeTimeoutDuration)
- timeoutChannel = timeoutat.TimeoutAt(
- operatingCtx,
- speculativeAbsoluteTimeoutTime,
- debugLevel,
- )
- }
- perDirectionSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
- perDirectionForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+ // No matter what, we will stop adding additional load-generating connections!
+ reachWorkingConditionsCtxCancel()
- responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber(
- proberOperatorCtx,
- networkActivityCtx,
- generateForeignProbeConfiguration,
- generateSelfProbeConfiguration,
- &direction.Lgcc,
- &globalNumericBucketGenerator,
- direction.CreateLgdc().Direction(), // TODO: This could be better!
- specParameters.ProbeInterval,
- sslKeyFileConcurrentWriter,
- *calculateExtendedStats,
- direction.ProbeDebugging,
- )
+ direction.SelfRtts = series.NewWindowSeries[float64, uint64](series.Forever, 0)
+ direction.ForeignRtts = series.NewWindowSeries[float64, uint64](series.Forever, 0)
- responsiveness_timeout:
- for !direction.StableResponsiveness {
- select {
- case probeMeasurement := <-responsivenessStabilizationCommunicationChannel:
- {
- switch probeMeasurement.Type {
- case series.SeriesMessageReserve:
- {
- bucket := probeMeasurement.Bucket
- if *debugCliFlag {
- fmt.Printf(
- "%s: Reserving a responsiveness bucket with id %v.\n", direction.DirectionLabel, bucket)
- }
- responsivenessStabilizer.Reserve(bucket)
- selfRtts.Reserve(bucket)
- foreignRtts.Reserve(bucket)
- perDirectionForeignRtts.Reserve(bucket)
- perDirectionSelfRtts.Reserve(bucket)
- }
- case series.SeriesMessageMeasure:
- {
- bucket := probeMeasurement.Bucket
- measurement := utilities.GetSome(probeMeasurement.Measure)
- foreignDataPoint := measurement.Foreign
- selfDataPoint := measurement.Self
+ responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber(
+ proberOperatorCtx,
+ probeNetworkActivityCtx,
+ generateForeignProbeConfiguration,
+ generateSelfProbeConfiguration,
+ &direction.Lgcc,
+ &globalNumericBucketGenerator,
+ direction.CreateLgdc().Direction(), // TODO: This could be better!
+ specParameters.ProbeInterval,
+ sslKeyFileConcurrentWriter,
+ *calculateExtendedStats,
+ direction.ProbeDebugging,
+ )
- if *debugCliFlag {
- fmt.Printf(
- "%s: Filling a responsiveness bucket with id %v with value %v.\n",
- direction.DirectionLabel, bucket, measurement)
+ responsiveness_timeout:
+ for !direction.StableResponsiveness {
+ select {
+ case probeMeasurement := <-responsivenessStabilizationCommunicationChannel:
+ {
+ switch probeMeasurement.Type {
+ case series.SeriesMessageReserve:
+ {
+ bucket := probeMeasurement.Bucket
+ if *debugCliFlag {
+ fmt.Printf(
+ "%s: Reserving a responsiveness bucket with id %v.\n", direction.DirectionLabel, bucket)
+ }
+ responsivenessStabilizer.Reserve(bucket)
+ direction.ForeignRtts.Reserve(bucket)
+ direction.SelfRtts.Reserve(bucket)
}
- responsivenessStabilizer.AddMeasurement(bucket,
- (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds())
+ case series.SeriesMessageMeasure:
+ {
+ bucket := probeMeasurement.Bucket
+ measurement := utilities.GetSome(probeMeasurement.Measure)
+ foreignDataPoint := measurement.Foreign
+ selfDataPoint := measurement.Self
- if err := selfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil {
- fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (selfRtts)\n", bucket)
- }
- if err := perDirectionSelfRtts.Fill(bucket,
- selfDataPoint.Duration.Seconds()); err != nil {
- fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionSelfRtts)\n", bucket)
- }
+ if *debugCliFlag {
+ fmt.Printf(
+ "%s: Filling a responsiveness bucket with id %v with value %v.\n",
+ direction.DirectionLabel, bucket, measurement)
+ }
+ responsivenessStabilizer.AddMeasurement(bucket,
+ (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds())
- if err := foreignRtts.Fill(bucket, foreignDataPoint.Duration.Seconds()); err != nil {
- fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (foreignRtts)\n", bucket)
- }
+ if err := direction.SelfRtts.Fill(bucket,
+ selfDataPoint.Duration.Seconds()); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionSelfRtts)\n", bucket)
+ }
- if err := perDirectionForeignRtts.Fill(bucket,
- foreignDataPoint.Duration.Seconds()); err != nil {
- fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionForeignRtts)\n", bucket)
- }
+ if err := direction.ForeignRtts.Fill(bucket,
+ foreignDataPoint.Duration.Seconds()); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionForeignRtts)\n", bucket)
+ }
- if selfRttsQualityAttenuation != nil {
- selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds())
- }
+ if selfRttsQualityAttenuation != nil {
+ selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds())
+ }
- direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint)
- direction.SelfProbeDataLogger.LogRecord(*selfDataPoint)
+ direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint)
+ direction.SelfProbeDataLogger.LogRecord(*selfDataPoint)
+ }
}
}
- }
- case throughputMeasurement := <-lgStabilizationCommunicationChannel:
- {
- switch throughputMeasurement.Type {
- case series.SeriesMessageReserve:
- {
- // We are no longer tracking stability, so reservation messages are useless!
- if *debugCliFlag {
- fmt.Printf(
- "%s: Discarding a throughput bucket with id %v when ascertaining responsiveness.\n",
- direction.DirectionLabel, throughputMeasurement.Bucket)
+ case throughputMeasurement := <-lgStabilizationCommunicationChannel:
+ {
+ switch throughputMeasurement.Type {
+ case series.SeriesMessageReserve:
+ {
+ // We are no longer tracking stability, so reservation messages are useless!
+ if *debugCliFlag {
+ fmt.Printf(
+ "%s: Discarding a throughput bucket with id %v when ascertaining responsiveness.\n",
+ direction.DirectionLabel, throughputMeasurement.Bucket)
+ }
}
- }
- case series.SeriesMessageMeasure:
- {
- measurement := utilities.GetSome(throughputMeasurement.Measure)
+ case series.SeriesMessageMeasure:
+ {
+ measurement := utilities.GetSome(throughputMeasurement.Measure)
- if *debugCliFlag {
- fmt.Printf("Adding a throughput measurement (while ascertaining responsiveness).\n")
- }
- // There may be more than one round trip accumulated together. If that is the case,
- direction.ThroughputDataLogger.LogRecord(measurement)
- for _, v := range measurement.GranularThroughputDataPoints {
- v.Direction = direction.DirectionLabel
- direction.GranularThroughputDataLogger.LogRecord(v)
- }
+ if *debugCliFlag {
+ fmt.Printf("Adding a throughput measurement (while ascertaining responsiveness).\n")
+ }
+ // There may be more than one round trip accumulated together. If that is the case,
+ direction.ThroughputDataLogger.LogRecord(measurement)
+ for _, v := range measurement.GranularThroughputDataPoints {
+ v.Direction = direction.DirectionLabel
+ direction.GranularThroughputDataLogger.LogRecord(v)
+ }
- lastThroughputRate = measurement.Throughput
- lastThroughputOpenConnectionCount = measurement.Connections
+ lastThroughputRate = measurement.Throughput
+ lastThroughputOpenConnectionCount = measurement.Connections
+ }
}
}
- }
- case <-timeoutChannel:
- {
- break responsiveness_timeout
- }
- case <-stabilityCheckTimeChannel:
- {
- if *debugCliFlag {
- fmt.Printf(
- "%v responsiveness stability interval is complete.\n", direction.DirectionLabel)
+ case <-timeoutChannel:
+ {
+ break responsiveness_timeout
}
+ case <-stabilityCheckTimeChannel:
+ {
+ if *debugCliFlag {
+ fmt.Printf(
+ "%v responsiveness stability interval is complete.\n", direction.DirectionLabel)
+ }
- stabilityCheckTime = time.Now().Add(specParameters.EvalInterval)
- stabilityCheckTimeChannel = timeoutat.TimeoutAt(
- operatingCtx,
- stabilityCheckTime,
- debugLevel,
- )
+ stabilityCheckTime = time.Now().Add(specParameters.EvalInterval)
+ stabilityCheckTimeChannel = timeoutat.TimeoutAt(
+ operatingCtx,
+ stabilityCheckTime,
+ debugLevel,
+ )
- // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy
- // is *actually* important, but it can't hurt?
- direction.StableResponsiveness = responsivenessStabilizer.IsStable()
+ // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy
+ // is *actually* important, but it can't hurt?
+ direction.StableResponsiveness = responsivenessStabilizer.IsStable()
- if *debugCliFlag {
- fmt.Printf(
- "Responsiveness is instantaneously %s.\n",
- utilities.Conditional(direction.StableResponsiveness, "stable", "unstable"))
- }
+ if *debugCliFlag {
+ fmt.Printf(
+ "Responsiveness is instantaneously %s.\n",
+ utilities.Conditional(direction.StableResponsiveness, "stable", "unstable"))
+ }
- responsivenessStabilizer.Interval()
+ responsivenessStabilizer.Interval()
+ }
}
}
- }
-
- // Did the test run to stability?
- testRanToStability := direction.StableThroughput && direction.StableResponsiveness
-
- if *debugCliFlag {
- fmt.Printf("Stopping all the load generating data generators (stability: %s).\n",
- utilities.Conditional(testRanToStability, "success", "failure"))
- }
- /* At this point there are
- 1. Load generators running
- -- uploadLoadGeneratorOperatorCtx
- -- downloadLoadGeneratorOperatorCtx
- 2. Network connections opened by those load generators:
- -- lgNetworkActivityCtx
- 3. Probes
- -- proberCtx
- */
+ // Did the test run to stability?
+ testRanToStability := direction.StableThroughput && direction.StableResponsiveness
- // First, stop the load generator and the probe operators (but *not* the network activity)
- proberOperatorCtxCancel()
- throughputCtxCancel()
+ if *debugCliFlag {
+ fmt.Printf("Stopping all the load generating data generators (stability: %s).\n",
+ utilities.Conditional(testRanToStability, "success", "failure"))
+ }
- // Second, calculate the extended stats (if the user requested and they are available for the direction)
- extendedStats := extendedstats.AggregateExtendedStats{}
- if *calculateExtendedStats && direction.ExtendedStatsEligible {
- if extendedstats.ExtendedStatsAvailable() {
- func() {
- // Put inside an IIFE so that we can use a defer!
- direction.Lgcc.Lock.Lock()
- defer direction.Lgcc.Lock.Unlock()
+ /* At this point there are
+ 1. Load generators running
+ -- uploadLoadGeneratorOperatorCtx
+ -- downloadLoadGeneratorOperatorCtx
+ 2. Network connections opened by those load generators:
+ -- lgNetworkActivityCtx
+ 3. Probes
+ -- proberCtx
+ */
- lgcCount, err := direction.Lgcc.Len()
- if err != nil {
- fmt.Fprintf(
- os.Stderr,
- "Warning: Could not calculate the number of %v load-generating connections; aborting extended stats preparation.\n", direction.DirectionLabel,
- )
- return
- }
+ // First, stop the load generator and the probe operators (but *not* the network activity)
+ proberOperatorCtxCancel()
+ throughputOperatorCtxCancel()
- for i := 0; i < lgcCount; i++ {
- // Assume that extended statistics are available -- the check was done explicitly at
- // program startup if the calculateExtendedStats flag was set by the user on the command line.
- currentLgc, _ := direction.Lgcc.Get(i)
+ // Second, calculate the extended stats (if the user requested and they are available for the direction)
+ extendedStats := extendedstats.AggregateExtendedStats{}
+ if *calculateExtendedStats && direction.ExtendedStatsEligible {
+ if extendedstats.ExtendedStatsAvailable() {
+ func() {
+ // Put inside an IIFE so that we can use a defer!
+ direction.Lgcc.Lock.Lock()
+ defer direction.Lgcc.Lock.Unlock()
- if currentLgc == nil || (*currentLgc).Stats() == nil {
+ lgcCount, err := direction.Lgcc.Len()
+ if err != nil {
fmt.Fprintf(
os.Stderr,
- "Warning: Could not add extended stats for the connection: The LGC was nil or there were no stats available.\n",
+ "Warning: Could not calculate the number of %v load-generating connections; aborting extended stats preparation.\n", direction.DirectionLabel,
)
- continue
+ return
}
- if err := extendedStats.IncorporateConnectionStats(
- (*currentLgc).Stats().ConnInfo.Conn); err != nil {
- fmt.Fprintf(
- os.Stderr,
- "Warning: Could not add extended stats for the connection: %v.\n",
- err,
- )
+
+ for i := 0; i < lgcCount; i++ {
+ // Assume that extended statistics are available -- the check was done explicitly at
+ // program startup if the calculateExtendedStats flag was set by the user on the command line.
+ currentLgc, _ := direction.Lgcc.Get(i)
+
+ if currentLgc == nil || (*currentLgc).Stats() == nil {
+ fmt.Fprintf(
+ os.Stderr,
+ "Warning: Could not add extended stats for the connection: The LGC was nil or there were no stats available.\n",
+ )
+ continue
+ }
+ if err := extendedStats.IncorporateConnectionStats(
+ (*currentLgc).Stats().ConnInfo.Conn); err != nil {
+ fmt.Fprintf(
+ os.Stderr,
+ "Warning: Could not add extended stats for the connection: %v.\n",
+ err,
+ )
+ }
}
- }
- }()
- } else {
- // TODO: Should we just log here?
- panic("Extended stats are not available but the user requested their calculation.")
+ }()
+ } else {
+ // TODO: Should we just log here?
+ panic("Extended stats are not available but the user requested their calculation.")
+ }
}
- }
- // Third, stop the network connections opened by the load generators and probers.
- networkActivityCtxCancel()
+ // *Always* stop the probers! But, conditionally stop the throughput.
+ probeNetworkActivityCtxCancel()
+ if parallelTestExecutionPolicy != executor.Parallel {
+ if direction.ThroughputActivityCtxCancel == nil {
+ panic(fmt.Sprintf("The cancellation function for the %v direction's throughput is nil!", direction.DirectionLabel))
+ }
+ (*direction.ThroughputActivityCtxCancel)()
+ }
- fmt.Printf(
- "%v: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
- direction.DirectionLabel,
- utilities.ToMbps(lastThroughputRate),
- utilities.ToMBps(lastThroughputRate),
- lastThroughputOpenConnectionCount,
- )
+ direction.FormattedResults += fmt.Sprintf(
+ "%v: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
+ direction.DirectionLabel,
+ utilities.ToMbps(lastThroughputRate),
+ utilities.ToMBps(lastThroughputRate),
+ lastThroughputOpenConnectionCount,
+ )
- if *calculateExtendedStats {
- fmt.Println(extendedStats.Repr())
- }
- directionResult := rpm.CalculateRpm(perDirectionSelfRtts, perDirectionForeignRtts,
- specParameters.TrimmedMeanPct, specParameters.Percentile)
- if *debugCliFlag {
- fmt.Printf("(%s RPM Calculation stats): %v\n", direction.DirectionLabel, directionResult.ToString())
- }
- if *printQualityAttenuation {
- fmt.Println("Quality Attenuation Statistics:")
- fmt.Printf(
- `Number of losses: %d
-Number of samples: %d
-Min: %.6f s
-Max: %.6f s
-Mean: %.6f s
-Variance: %.6f s
-Standard Deviation: %.6f s
-PDV(90): %.6f s
-PDV(99): %.6f s
-P(90): %.6f s
-P(99): %.6f s
-RPM: %.0f
-Gaming QoO: %.0f
+ if *calculateExtendedStats {
+ direction.FormattedResults += fmt.Sprintf("%v\n", extendedStats.Repr())
+ }
+ directionResult := rpm.CalculateRpm(direction.SelfRtts, direction.ForeignRtts,
+ specParameters.TrimmedMeanPct, specParameters.Percentile)
+ if *debugCliFlag {
+ direction.FormattedResults += fmt.Sprintf("(%s RPM Calculation stats): %v\n",
+ direction.DirectionLabel, directionResult.ToString())
+ }
+ if *printQualityAttenuation {
+ direction.FormattedResults += "Quality Attenuation Statistics:\n"
+ direction.FormattedResults += fmt.Sprintf(
+ ` Number of losses: %d
+ Number of samples: %d
+ Min: %.6f s
+ Max: %.6f s
+ Mean: %.6f s
+ Variance: %.6f s
+ Standard Deviation: %.6f s
+ PDV(90): %.6f s
+ PDV(99): %.6f s
+ P(90): %.6f s
+ P(99): %.6f s
+ RPM: %.0f
+ Gaming QoO: %.0f
`, selfRttsQualityAttenuation.GetNumberOfLosses(),
- selfRttsQualityAttenuation.GetNumberOfSamples(),
- selfRttsQualityAttenuation.GetMinimum(),
- selfRttsQualityAttenuation.GetMaximum(),
- selfRttsQualityAttenuation.GetAverage(),
- selfRttsQualityAttenuation.GetVariance(),
- selfRttsQualityAttenuation.GetStandardDeviation(),
- selfRttsQualityAttenuation.GetPDV(90),
- selfRttsQualityAttenuation.GetPDV(99),
- selfRttsQualityAttenuation.GetPercentile(90),
- selfRttsQualityAttenuation.GetPercentile(99),
- selfRttsQualityAttenuation.GetRPM(),
- selfRttsQualityAttenuation.GetGamingQoO())
- }
+ selfRttsQualityAttenuation.GetNumberOfSamples(),
+ selfRttsQualityAttenuation.GetMinimum(),
+ selfRttsQualityAttenuation.GetMaximum(),
+ selfRttsQualityAttenuation.GetAverage(),
+ selfRttsQualityAttenuation.GetVariance(),
+ selfRttsQualityAttenuation.GetStandardDeviation(),
+ selfRttsQualityAttenuation.GetPDV(90),
+ selfRttsQualityAttenuation.GetPDV(99),
+ selfRttsQualityAttenuation.GetPercentile(90),
+ selfRttsQualityAttenuation.GetPercentile(99),
+ selfRttsQualityAttenuation.GetRPM(),
+ selfRttsQualityAttenuation.GetGamingQoO())
+ }
- if !testRanToStability {
- fmt.Printf("Test did not run to stability, these results are estimates:\n")
- }
+ if !testRanToStability {
+ direction.FormattedResults += "Test did not run to stability, these results are estimates:\n"
+ }
+
+ direction.FormattedResults += fmt.Sprintf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel,
+ directionResult.PNRpm, specParameters.Percentile)
+ direction.FormattedResults += fmt.Sprintf(
+ "%s RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel,
+ directionResult.MeanRpm, specParameters.TrimmedMeanPct)
+
+ if len(*prometheusStatsFilename) > 0 {
+ var testStable int
+ if testRanToStability {
+ testStable = 1
+ }
+ var buffer bytes.Buffer
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_test_stable %d\n",
+ strings.ToLower(direction.DirectionLabel), testStable))
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_p90_rpm_value %d\n",
+ strings.ToLower(direction.DirectionLabel), int64(directionResult.PNRpm)))
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_trimmed_rpm_value %d\n",
+ strings.ToLower(direction.DirectionLabel),
+ int64(directionResult.MeanRpm)))
- fmt.Printf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, directionResult.PNRpm, specParameters.Percentile)
- fmt.Printf("%s RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel,
- directionResult.MeanRpm, specParameters.TrimmedMeanPct)
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_bits_per_second %d\n",
+ strings.ToLower(direction.DirectionLabel), int64(lastThroughputRate)))
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_connections %d\n",
+ strings.ToLower(direction.DirectionLabel),
+ int64(lastThroughputOpenConnectionCount)))
- if len(*prometheusStatsFilename) > 0 {
- var testStable int
- if testRanToStability {
- testStable = 1
+ if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil {
+ fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err)
+ os.Exit(1)
+ }
}
- var buffer bytes.Buffer
- buffer.WriteString(fmt.Sprintf("networkquality_%v_test_stable %d\n",
- strings.ToLower(direction.DirectionLabel), testStable))
- buffer.WriteString(fmt.Sprintf("networkquality_%v_p90_rpm_value %d\n",
- strings.ToLower(direction.DirectionLabel), int64(directionResult.PNRpm)))
- buffer.WriteString(fmt.Sprintf("networkquality_%v_trimmed_rpm_value %d\n",
- strings.ToLower(direction.DirectionLabel),
- int64(directionResult.MeanRpm)))
- buffer.WriteString(fmt.Sprintf("networkquality_%v_bits_per_second %d\n",
- strings.ToLower(direction.DirectionLabel), int64(lastThroughputRate)))
- buffer.WriteString(fmt.Sprintf("networkquality_%v_connections %d\n",
- strings.ToLower(direction.DirectionLabel),
- int64(lastThroughputOpenConnectionCount)))
+ direction.ThroughputDataLogger.Export()
+ if *debugCliFlag {
+ fmt.Printf("Closing the %v throughput data logger.\n", direction.DirectionLabel)
+ }
+ direction.ThroughputDataLogger.Close()
- if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil {
- fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err)
- os.Exit(1)
+ direction.GranularThroughputDataLogger.Export()
+ if *debugCliFlag {
+ fmt.Printf("Closing the %v granular throughput data logger.\n", direction.DirectionLabel)
}
- }
+ direction.GranularThroughputDataLogger.Close()
- direction.ThroughputDataLogger.Export()
- if *debugCliFlag {
- fmt.Printf("Closing the %v throughput data logger.\n", direction.DirectionLabel)
+ if *debugCliFlag {
+ fmt.Printf("In debugging mode, we will cool down between tests.\n")
+ time.Sleep(constants.CooldownPeriod)
+ fmt.Printf("Done cooling down.\n")
+ }
}
- direction.ThroughputDataLogger.Close()
+ directionExecutionUnits = append(directionExecutionUnits, directionExecutionUnit)
+ } // End of direction testing.
- direction.GranularThroughputDataLogger.Export()
- if *debugCliFlag {
- fmt.Printf("Closing the %v granular throughput data logger.\n", direction.DirectionLabel)
- }
- direction.GranularThroughputDataLogger.Close()
+ waiter := executor.Execute(parallelTestExecutionPolicy, directionExecutionUnits)
+ waiter.Wait()
- if *debugCliFlag {
- fmt.Printf("In debugging mode, we will cool down between tests.\n")
- time.Sleep(constants.CooldownPeriod)
- fmt.Printf("Done cooling down.\n")
+ // If we were testing in parallel mode, then the throughputs for each direction are still
+ // running. We left them running in case one of the directions reached stability before the
+ // other!
+ if parallelTestExecutionPolicy == executor.Parallel {
+ for _, direction := range directions {
+ if *debugCliFlag {
+ fmt.Printf("Stopping the throughput connections for the %v test.\n", direction.DirectionLabel)
+ }
+ if direction.ThroughputActivityCtxCancel == nil {
+ panic(fmt.Sprintf("The cancellation function for the %v direction's throughput is nil!", direction.DirectionLabel))
+ }
+ if (*direction.ThroughputActivityCtx).Err() != nil {
+ fmt.Fprintf(os.Stderr, "Warning: The throughput for the %v direction was already cancelled but should have been ongoing.\n", direction.DirectionLabel)
+ continue
+ }
+ (*direction.ThroughputActivityCtxCancel)()
+ }
+ } else {
+ for _, direction := range directions {
+ if direction.ThroughputActivityCtxCancel == nil {
+ panic(fmt.Sprintf("The cancellation function for the %v direction's throughput is nil!", direction.DirectionLabel))
+ }
+ if (*direction.ThroughputActivityCtx).Err() == nil {
+ fmt.Fprintf(os.Stderr, "Warning: The throughput for the %v direction should have already been stopped but it was not.\n", direction.DirectionLabel)
+ }
}
}
- result := rpm.CalculateRpm(selfRtts, foreignRtts, specParameters.TrimmedMeanPct, specParameters.Percentile)
+ fmt.Printf("Results:\n")
+ fmt.Printf("========\n")
+ // Print out the formatted results from each of the directions.
+ for _, direction := range directions {
+ fmt.Print(direction.FormattedResults)
+ fmt.Printf("========\n")
+ }
+
+ allSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+ allForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+
+ allSelfRtts.Append(&downloadDirection.SelfRtts)
+ allSelfRtts.Append(&uploadDirection.SelfRtts)
+ allForeignRtts.Append(&downloadDirection.ForeignRtts)
+ allForeignRtts.Append(&uploadDirection.ForeignRtts)
+
+ result := rpm.CalculateRpm(allSelfRtts, allForeignRtts, specParameters.TrimmedMeanPct, specParameters.Percentile)
if *debugCliFlag {
fmt.Printf("(Final RPM Calculation stats): %v\n", result.ToString())
diff --git a/rpm/parameters.go b/rpm/parameters.go
index 1d7e065..cc9fe7f 100644
--- a/rpm/parameters.go
+++ b/rpm/parameters.go
@@ -18,6 +18,7 @@ import (
"fmt"
"time"
+ "github.com/network-quality/goresponsiveness/executor"
"github.com/network-quality/goresponsiveness/utilities"
)
@@ -31,9 +32,12 @@ type SpecParameters struct {
ProbeInterval time.Duration
ProbeCapacityPct float64
Percentile uint
+ ExecutionPolicy executor.ExecutionMethod
}
-func SpecParametersFromArguments(timeout int, mad int, id int, tmp uint, sdt float64, mnp int, mps int, ptc float64, p int) (*SpecParameters, error) {
+func SpecParametersFromArguments(timeout int, mad int, id int, tmp uint, sdt float64, mnp int,
+ mps int, ptc float64, p int, executionPolicy executor.ExecutionMethod,
+) (*SpecParameters, error) {
if timeout <= 0 {
return nil, fmt.Errorf("cannot specify a 0 or negative timeout for the test")
}
@@ -68,7 +72,8 @@ func SpecParametersFromArguments(timeout int, mad int, id int, tmp uint, sdt flo
params := SpecParameters{
TestTimeout: testTimeout, MovingAvgDist: mad,
EvalInterval: evalInterval, TrimmedMeanPct: tmp, StdDevTolerance: sdt,
- MaxParallelConns: mnp, ProbeInterval: probeInterval, ProbeCapacityPct: ptc, Percentile: uint(p),
+ MaxParallelConns: mnp, ProbeInterval: probeInterval, ProbeCapacityPct: ptc,
+ Percentile: uint(p), ExecutionPolicy: executionPolicy,
}
return &params, nil
}
@@ -82,8 +87,10 @@ Trimmed-Mean Percentage: %v,
Standard-Deviation Tolerance: %v,
Maximum number of parallel connections: %v,
Probe Interval: %v (derived from given maximum-probes-per-second parameter),
-Maximum Percentage Of Throughput For Probes: %v`,
+Maximum Percentage Of Throughput For Probes: %v
+Execution Policy: %v`,
parameters.TestTimeout, parameters.MovingAvgDist, parameters.EvalInterval, parameters.TrimmedMeanPct,
- parameters.StdDevTolerance, parameters.MaxParallelConns, parameters.ProbeInterval, parameters.ProbeCapacityPct,
+ parameters.StdDevTolerance, parameters.MaxParallelConns, parameters.ProbeInterval,
+ parameters.ProbeCapacityPct, parameters.ExecutionPolicy.ToString(),
)
}
diff --git a/rpm/parameters_test.go b/rpm/parameters_test.go
index 2035a99..0070e99 100644
--- a/rpm/parameters_test.go
+++ b/rpm/parameters_test.go
@@ -17,91 +17,93 @@ package rpm
import (
"strings"
"testing"
+
+ "github.com/network-quality/goresponsiveness/executor"
)
func TestSpecParametersFromArgumentsBadTimeout(t *testing.T) {
- _, err := SpecParametersFromArguments(0, 0, 0, 0, 0, 0, 0, 0, 0)
+ _, err := SpecParametersFromArguments(0, 0, 0, 0, 0, 0, 0, 0, 0, executor.Parallel)
if err == nil || !strings.Contains(err.Error(), "timeout") {
t.Fatalf("0 timeout improperly allowed.")
}
- _, err = SpecParametersFromArguments(-1, 0, 0, 0, 0, 0, 0, 0, 0)
+ _, err = SpecParametersFromArguments(-1, 0, 0, 0, 0, 0, 0, 0, 0, executor.Parallel)
if err == nil || !strings.Contains(err.Error(), "timeout") {
t.Fatalf("negative timeout improperly allowed.")
}
}
func TestSpecParametersFromArgumentsBadMad(t *testing.T) {
- _, err := SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0, 0)
+ _, err := SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0, 0, executor.Parallel)
if err == nil || !strings.Contains(err.Error(), "moving-average") {
t.Fatalf("0 mad improperly allowed.")
}
- _, err = SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0, 0)
+ _, err = SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0, 0, executor.Parallel)
if err == nil || !strings.Contains(err.Error(), "moving-average") {
t.Fatalf("negative mad improperly allowed.")
}
}
func TestSpecParametersFromArgumentsBadId(t *testing.T) {
- _, err := SpecParametersFromArguments(1, 1, 0, 0, 0, 0, 0, 0, 0)
+ _, err := SpecParametersFromArguments(1, 1, 0, 0, 0, 0, 0, 0, 0, executor.Parallel)
if err == nil || !strings.Contains(err.Error(), "reevaluation") {
t.Fatalf("0 id improperly allowed.")
}
- _, err = SpecParametersFromArguments(1, 1, -1, 0, 0, 0, 0, 0, 0)
+ _, err = SpecParametersFromArguments(1, 1, -1, 0, 0, 0, 0, 0, 0, executor.Parallel)
if err == nil || !strings.Contains(err.Error(), "reevaluation") {
t.Fatalf("negative id improperly allowed.")
}
}
func TestSpecParametersFromArgumentsBadSdt(t *testing.T) {
- _, err := SpecParametersFromArguments(1, 1, 1, 1, -1, 0, 0, 0, 0)
+ _, err := SpecParametersFromArguments(1, 1, 1, 1, -1, 0, 0, 0, 0, executor.Parallel)
if err == nil || !strings.Contains(err.Error(), "deviation") {
t.Fatalf("0 sdt improperly allowed.")
}
}
func TestSpecParametersFromArgumentsBadMnp(t *testing.T) {
- _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 0, 0, 0, 0)
+ _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 0, 0, 0, 0, executor.Parallel)
if err == nil || !strings.Contains(err.Error(), "parallel") {
t.Fatalf("0 mnp improperly allowed.")
}
- _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, -1, 0, 0, 0)
+ _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, -1, 0, 0, 0, executor.Parallel)
if err == nil || !strings.Contains(err.Error(), "parallel") {
t.Fatalf("negative mnp improperly allowed.")
}
}
func TestSpecParametersFromArgumentsBadMps(t *testing.T) {
- _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 0, 0, 0)
+ _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 0, 0, 0, executor.Parallel)
if err == nil || !strings.Contains(err.Error(), "probing interval") {
t.Fatalf("0 mps improperly allowed.")
}
- _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, -1, 0, 0)
+ _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, -1, 0, 0, executor.Parallel)
if err == nil || !strings.Contains(err.Error(), "probing interval") {
t.Fatalf("negative mps improperly allowed.")
}
}
func TestSpecParametersFromArgumentsBadPtc(t *testing.T) {
- _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 0, 0)
+ _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 0, 0, executor.Parallel)
if err == nil || !strings.Contains(err.Error(), "capacity") {
t.Fatalf("0 ptc improperly allowed.")
}
- _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, -1, 0)
+ _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, -1, 0, executor.Parallel)
if err == nil || !strings.Contains(err.Error(), "capacity") {
t.Fatalf("negative ptc improperly allowed.")
}
}
func TestSpecParametersFromArgumentsBadP(t *testing.T) {
- _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, -1)
+ _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, -1, executor.Parallel)
if err == nil || !strings.Contains(err.Error(), "percentile") {
t.Fatalf("-1 percentile improperly allowed.")
}
- _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, 0)
+ _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, 0, executor.Parallel)
if err == nil || !strings.Contains(err.Error(), "percentile") {
t.Fatalf("0 percentile improperly allowed.")
}
- _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, 101)
+ _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, 101, executor.Parallel)
if err == nil || !strings.Contains(err.Error(), "percentile") {
t.Fatalf("percentile greater than 100 improperly allowed.")
}
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 902e5bb..905e578 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -148,9 +148,8 @@ func ResponsivenessProber[BucketType utilities.Number](
)
}
- currentBucketId := bucketGenerator.Generate()
-
dataPointsLock.Lock()
+ currentBucketId := bucketGenerator.Generate()
if dataPoints != nil {
dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, BucketType]{
Type: series.SeriesMessageReserve, Bucket: currentBucketId,
diff --git a/series/series.go b/series/series.go
index 6b9af1f..ef133d2 100644
--- a/series/series.go
+++ b/series/series.go
@@ -41,6 +41,8 @@ type WindowSeries[Data any, Bucket constraints.Ordered] interface {
GetValues() []utilities.Optional[Data]
Complete() bool
GetType() WindowSeriesDuration
+
+ Append(appended *WindowSeries[Data, Bucket])
}
type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct {
@@ -48,6 +50,7 @@ type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct {
data []utilities.Pair[Bucket, utilities.Optional[Data]]
latestIndex int
empty bool
+ lock sync.RWMutex
}
/*
@@ -58,6 +61,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error {
if !wsi.empty && b <= wsi.data[wsi.latestIndex].First {
return fmt.Errorf("reserving must be monotonically increasing")
}
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
if wsi.empty {
/* Special case if we are empty: The latestIndex is where we want this value to go! */
@@ -76,6 +81,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error {
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) error {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
iterator := wsi.latestIndex
for {
if wsi.data[iterator].First == b {
@@ -91,6 +98,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) erro
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Count() (some int, none int) {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
some = 0
none = 0
for _, v := range wsi.data {
@@ -104,6 +113,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Count() (some int, none int
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Complete() bool {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
for _, v := range wsi.data {
if utilities.IsNone(v.Second) {
return false
@@ -113,10 +124,18 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Complete() bool {
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) nextIndex(currentIndex int) int {
+ // Internal functions should be called with the lock held!
+ if wsi.lock.TryLock() {
+ panic("windowSeriesWindowOnlyImpl nextIndex called without lock held.")
+ }
return (currentIndex + 1) % wsi.windowSize
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) previousIndex(currentIndex int) int {
+ // Internal functions should be called with the lock held!
+ if wsi.lock.TryLock() {
+ panic("windowSeriesWindowOnlyImpl previousIndex called without lock held.")
+ }
nextIndex := currentIndex - 1
if nextIndex < 0 {
nextIndex += wsi.windowSize
@@ -125,6 +144,10 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) previousIndex(currentIndex
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optional[Data] {
+ // Internal functions should be called with the lock held!
+ if wsi.lock.TryLock() {
+ panic("windowSeriesWindowOnlyImpl toArray called without lock held.")
+ }
result := make([]utilities.Optional[Data], wsi.windowSize)
if wsi.empty {
@@ -144,6 +167,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
return wsi.toArray()
}
@@ -152,12 +177,16 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetType() WindowSeriesDurat
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
for _, v := range wsi.data {
eacher(v.First, &v.Second)
}
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
result := fmt.Sprintf("Window series (window (%d) only, latest index: %v): ", wsi.windowSize, wsi.latestIndex)
for _, v := range wsi.data {
valueString := "None"
@@ -169,6 +198,10 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string {
return result
}
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) {
+ panic("Append is not supported for a window-only window series.")
+}
+
func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered](
windowSize int,
) *windowSeriesWindowOnlyImpl[Data, Bucket] {
@@ -186,10 +219,14 @@ func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered](
type windowSeriesForeverImpl[Data any, Bucket constraints.Ordered] struct {
data []utilities.Pair[Bucket, utilities.Optional[Data]]
empty bool
+ lock sync.RWMutex
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
if !wsi.empty && b <= wsi.data[len(wsi.data)-1].First {
+ fmt.Printf("reserving must be monotonically increasing")
return fmt.Errorf("reserving must be monotonically increasing")
}
@@ -199,6 +236,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error {
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
for i := range wsi.data {
if wsi.data[i].First == b {
wsi.data[i].Second = utilities.Some[Data](d)
@@ -209,6 +248,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error {
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
result := make([]utilities.Optional[Data], len(wsi.data))
for i, v := range utilities.Reverse(wsi.data) {
@@ -219,6 +260,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Option
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) Count() (some int, none int) {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
some = 0
none = 0
for _, v := range wsi.data {
@@ -232,6 +275,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Count() (some int, none int) {
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) Complete() bool {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
for _, v := range wsi.data {
if utilities.IsNone(v.Second) {
return false
@@ -253,12 +298,16 @@ func newWindowSeriesForeverImpl[Data any, Bucket constraints.Ordered]() *windowS
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
for _, v := range wsi.data {
eacher(v.First, &v.Second)
}
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
result := "Window series (forever): "
for _, v := range wsi.data {
valueString := "None"
@@ -270,6 +319,19 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string {
return result
}
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) {
+ result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket])
+ if !ok {
+ panic("Cannot merge a forever window series with a non-forever window series.")
+ }
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
+ result.lock.Lock()
+ defer result.lock.Unlock()
+
+ wsi.data = append(wsi.data, result.data...)
+}
+
/*
* End of WindowSeries interface methods.
*/
diff --git a/series/series_test.go b/series/series_test.go
index 3ed752d..54ddfcc 100644
--- a/series/series_test.go
+++ b/series/series_test.go
@@ -15,14 +15,21 @@ package series
import (
"reflect"
+ "sync"
"testing"
+ "time"
+ RPMTesting "github.com/network-quality/goresponsiveness/testing"
"github.com/network-quality/goresponsiveness/utilities"
)
func TestNextIndex(t *testing.T) {
wsi := newWindowSeriesWindowOnlyImpl[int, int](4)
+ // Calling internal functions must be done with the lock held!
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
+
idx := wsi.nextIndex(wsi.latestIndex)
if idx != 1 {
t.Fatalf("nextIndex is wrong (1)")
@@ -54,6 +61,18 @@ func TestNextIndex(t *testing.T) {
wsi.latestIndex = idx
}
+func TestNextIndexUnlocked(t *testing.T) {
+ wsi := newWindowSeriesWindowOnlyImpl[int, int](4)
+
+ panicingTest := func() {
+ wsi.nextIndex(wsi.latestIndex)
+ }
+
+ if !RPMTesting.DidPanic(panicingTest) {
+ t.Fatalf("Expected a call to nextIndex (without the lock held) to panic but it did not")
+ }
+}
+
func TestSimpleWindowComplete(t *testing.T) {
wsi := newWindowSeriesWindowOnlyImpl[int, int](4)
if wsi.Complete() {
@@ -877,3 +896,48 @@ func Test_ForeverStandardDeviationCalculation2(test *testing.T) {
test.Fatalf("Standard deviation(series) max calculation(series) failed: Expected: %v; Actual: %v.", expected, sd)
}
}
+
+func Test_ForeverLocking(test *testing.T) {
+ series := newWindowSeriesForeverImpl[float64, int]()
+ testFail := false
+
+ series.Reserve(1)
+ series.Reserve(2)
+
+ series.Fill(1, 8)
+ series.Fill(2, 9)
+
+ wg := sync.WaitGroup{}
+
+ counter := 0
+
+ wg.Add(2)
+ go func() {
+ series.ForEach(func(b int, d *utilities.Optional[float64]) {
+ // All of these ++s should be done within a single acquisition of the lock and, therefore,
+ // the ForEach below should not start until both buckets are ForEach'd over!
+ counter++
+ // Make this a long wait so we know that there is no chance for a race and that
+ // we are really testing what we mean to test!
+ time.Sleep(time.Second * 5)
+ })
+ wg.Done()
+ }()
+
+ time.Sleep(1 * time.Second)
+
+ go func() {
+ series.ForEach(func(b int, d *utilities.Optional[float64]) {
+ if counter != 2 {
+ testFail = true
+ }
+ })
+ wg.Done()
+ }()
+
+ wg.Wait()
+
+ if testFail {
+ test.Fatalf("Mutual exclusion checks did not properly lock out parallel ForEach operations.")
+ }
+}
diff --git a/testing/utilities.go b/testing/utilities.go
new file mode 100644
index 0000000..6272a6d
--- /dev/null
+++ b/testing/utilities.go
@@ -0,0 +1,30 @@
+/*
+ * This file is part of Go Responsiveness.
+ *
+ * Go Responsiveness is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software Foundation,
+ * either version 2 of the License, or (at your option) any later version.
+ * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package testing
+
+// Return true/false depending on whether the given function panics.
+func DidPanic(doer func()) (didPanic bool) {
+ didPanic = false
+
+ defer func() {
+ if recovering := recover(); recovering != nil {
+ didPanic = true
+ }
+ }()
+
+ doer()
+
+ return
+}
diff --git a/tools/graphing.py b/tools/graphing.py
index eb1e6d7..63d5166 100644
--- a/tools/graphing.py
+++ b/tools/graphing.py
@@ -68,6 +68,7 @@ def find_earliest(dfs):
"""
earliest = dfs[0]["CreationTime"].iloc[0]
for df in dfs:
+ print(f"A data frame: {df['CreationTime']}")
if df["CreationTime"].iloc[0] < earliest:
earliest = df["CreationTime"].iloc[0]
return earliest
@@ -89,7 +90,7 @@ def time_since_start(dfs, start, column_name="TimeSinceStart"):
def probeClean(df):
# ConnRTT and ConnCongestionWindow refer to Underlying Connection
- df.columns = ["CreationTime", "NumRTT", "Duration", "ConnRTT", "ConnCongestionWindow", "Type", "Empty"]
+ df.columns = ["CreationTime", "NumRTT", "Duration", "ConnRTT", "ConnCongestionWindow", "Type", "Algorithm", "Empty"]
df = df.drop(columns=["Empty"])
df["CreationTime"] = pd.to_datetime(df["CreationTime"], format="%m-%d-%Y-%H-%M-%S.%f")
df["Type"] = df["Type"].apply(str.strip)
@@ -99,7 +100,7 @@ def probeClean(df):
def throughputClean(df):
- df.columns = ["CreationTime", "Throughput", "NumberConnections", "Empty"]
+ df.columns = ["CreationTime", "Throughput", "NumberActiveConnections", "NumberConnections", "Empty"]
df = df.drop(columns=["Empty"])
df["CreationTime"] = pd.to_datetime(df["CreationTime"], format="%m-%d-%Y-%H-%M-%S.%f")
df["ADJ_Throughput"] = df["Throughput"] / 1000000
@@ -306,6 +307,8 @@ def graph_normal(dfs, xcolumn, ax, title):
def stacked_area_throughput(throughput_df, granular, xcolumn, ycolumn, ax, title, label, linecolor="black"):
+
+ print(f"Stacked area throughput!")
ax.set_title(title)
ax.yaxis.tick_right()
@@ -448,6 +451,7 @@ def make_graphs(files, save):
if not containsALL:
continue
+ print(f"About to call main()")
main(start + " - " + str(x), files[start][end])
if save:
pdf = matplotlib.backends.backend_pdf.PdfPages(f"{start} - {x}.pdf")