diff options
| author | Will Hawkins <[email protected]> | 2023-12-13 19:56:03 -0500 | 
|---|---|---|
| committer | Will Hawkins <[email protected]> | 2024-01-04 19:10:37 -0500 | 
| commit | f3990f950277c2f61e0e1811b4b8a81fc0219da4 (patch) | |
| tree | 6969e4ac2c4e94e75fe2e0c5581da5c07785dce8 | |
| parent | 552f01ad73248474553ce471695745db58c862ea (diff) | |
[Feature] Support for testing upload/download in parallel
Use the `--rpm.parallel` to test in parallel mode. The default testing
mode is serial.
Signed-off-by: Will Hawkins <[email protected]>
| -rw-r--r-- | constants/constants.go | 9 | ||||
| -rw-r--r-- | direction/direction.go | 8 | ||||
| -rw-r--r-- | executor/executor.go | 53 | ||||
| -rw-r--r-- | executor/executor_test.go | 56 | ||||
| -rw-r--r-- | networkQuality.go | 844 | ||||
| -rw-r--r-- | rpm/parameters.go | 15 | ||||
| -rw-r--r-- | rpm/parameters_test.go | 34 | ||||
| -rw-r--r-- | rpm/rpm.go | 3 | ||||
| -rw-r--r-- | series/series.go | 62 | ||||
| -rw-r--r-- | series/series_test.go | 64 | ||||
| -rw-r--r-- | testing/utilities.go | 30 | ||||
| -rw-r--r-- | tools/graphing.py | 8 | 
12 files changed, 772 insertions, 414 deletions
diff --git a/constants/constants.go b/constants/constants.go index 8621de8..81c932e 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -14,7 +14,11 @@  package constants -import "time" +import ( +	"time" + +	"github.com/network-quality/goresponsiveness/executor" +)  var (  	// The initial number of load-generating connections when attempting to saturate the network. @@ -38,6 +42,9 @@ var (  	DefaultInsecureSkipVerify bool = true  	DefaultL4SCongestionControlAlgorithm string = "prague" + +	// The default execution policy for running a test (serial) +	DefaultTestExecutionPolicy = executor.Serial  )  type SpecParametersCliOptions struct { diff --git a/direction/direction.go b/direction/direction.go index 865a273..9b41b26 100644 --- a/direction/direction.go +++ b/direction/direction.go @@ -15,11 +15,14 @@  package direction  import ( +	"context" +  	"github.com/network-quality/goresponsiveness/datalogger"  	"github.com/network-quality/goresponsiveness/debug"  	"github.com/network-quality/goresponsiveness/lgc"  	"github.com/network-quality/goresponsiveness/probe"  	"github.com/network-quality/goresponsiveness/rpm" +	"github.com/network-quality/goresponsiveness/series"  )  type Direction struct { @@ -37,4 +40,9 @@ type Direction struct {  	ExtendedStatsEligible             bool  	StableThroughput                  bool  	StableResponsiveness              bool +	SelfRtts                          series.WindowSeries[float64, uint64] +	ForeignRtts                       series.WindowSeries[float64, uint64] +	ThroughputActivityCtx             *context.Context +	ThroughputActivityCtxCancel       *context.CancelFunc +	FormattedResults                  string  } diff --git a/executor/executor.go b/executor/executor.go new file mode 100644 index 0000000..22c1235 --- /dev/null +++ b/executor/executor.go @@ -0,0 +1,53 @@ +package executor + +import ( +	"sync" +) + +type ExecutionMethod int + +const ( +	Parallel ExecutionMethod = iota +	Serial +) + +type ExecutionUnit func() + +func (ep ExecutionMethod) ToString() string { +	switch ep { +	case Parallel: +		return "Parallel" +	case Serial: +		return "Serial" +	} +	return "Unrecognized execution method" +} + +func Execute(executionMethod ExecutionMethod, executionUnits []ExecutionUnit) *sync.WaitGroup { +	waiter := &sync.WaitGroup{} + +	// Make sure that we Add to the wait group all the execution units +	// before starting to run any -- there is a potential race condition +	// otherwise. +	(*waiter).Add(len(executionUnits)) + +	for _, executionUnit := range executionUnits { +		// Stupid capture in Go! Argh. +		executionUnit := executionUnit + +		invoker := func() { +			executionUnit() +			(*waiter).Done() +		} +		switch executionMethod { +		case Parallel: +			go invoker() +		case Serial: +			invoker() +		default: +			panic("Invalid execution method value given.") +		} +	} + +	return waiter +} diff --git a/executor/executor_test.go b/executor/executor_test.go new file mode 100644 index 0000000..9c3af7e --- /dev/null +++ b/executor/executor_test.go @@ -0,0 +1,56 @@ +package executor_test + +import ( +	"testing" +	"time" + +	"github.com/network-quality/goresponsiveness/executor" +) + +var countToFive = func() { +	time.Sleep(5 * time.Second) +} + +var countToThree = func() { +	time.Sleep(3 * time.Second) +} + +var executionUnits = []executor.ExecutionUnit{countToFive, countToThree} + +func TestSerial(t *testing.T) { +	then := time.Now() +	waiter := executor.Execute(executor.Serial, executionUnits) +	waiter.Wait() +	when := time.Now() + +	if when.Sub(then) < 7*time.Second { +		t.Fatalf("Execution did not happen serially -- the wait was too short: %v", when.Sub(then).Seconds()) +	} +} + +func TestParallel(t *testing.T) { +	then := time.Now() +	waiter := executor.Execute(executor.Parallel, executionUnits) +	waiter.Wait() +	when := time.Now() + +	if when.Sub(then) > 6*time.Second { +		t.Fatalf("Execution did not happen in parallel -- the wait was too long: %v", when.Sub(then).Seconds()) +	} +} + +func TestExecutionMethodParallelToString(t *testing.T) { +	executionMethod := executor.Parallel + +	if executionMethod.ToString() != "Parallel" { +		t.Fatalf("Incorrect result from ExecutionMethod.ToString; expected Parallel but got %v", executionMethod.ToString()) +	} +} + +func TestExecutionMethodSerialToString(t *testing.T) { +	executionMethod := executor.Serial + +	if executionMethod.ToString() != "Serial" { +		t.Fatalf("Incorrect result from ExecutionMethod.ToString; expected Serial but got %v", executionMethod.ToString()) +	} +} diff --git a/networkQuality.go b/networkQuality.go index 340a316..2e13893 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -31,6 +31,7 @@ import (  	"github.com/network-quality/goresponsiveness/datalogger"  	"github.com/network-quality/goresponsiveness/debug"  	"github.com/network-quality/goresponsiveness/direction" +	"github.com/network-quality/goresponsiveness/executor"  	"github.com/network-quality/goresponsiveness/extendedstats"  	"github.com/network-quality/goresponsiveness/lgc"  	"github.com/network-quality/goresponsiveness/probe" @@ -167,9 +168,20 @@ var (  	)  	withL4S          = flag.Bool("with-l4s", false, "Use L4S (with default TCP prague congestion control algorithm.)")  	withL4SAlgorithm = flag.String("with-l4s-algorithm", "", "Use L4S (with specified congestion control algorithm.)") + +	parallelTestExecutionPolicy = constants.DefaultTestExecutionPolicy  )  func main() { +	// Add one final command-line argument +	flag.BoolFunc("rpm.parallel", "Parallel test execution policy.", func(value string) error { +		if value != "true" { +			return fmt.Errorf("-parallel can only be used to enable parallel test execution policy") +		} +		parallelTestExecutionPolicy = executor.Parallel +		return nil +	}) +  	flag.Parse()  	if *showVersion { @@ -184,7 +196,7 @@ func main() {  	}  	specParameters, err := rpm.SpecParametersFromArguments(*rpmtimeout, *rpmmad, *rpmid, -		*rpmtmp, *rpmsdt, *rpmmnp, *rpmmps, *rpmptc, *rpmp) +		*rpmtmp, *rpmsdt, *rpmmnp, *rpmmps, *rpmptc, *rpmp, parallelTestExecutionPolicy)  	if err != nil {  		fmt.Fprintf(  			os.Stderr, @@ -639,479 +651,533 @@ func main() {  	} -	// All tests will accumulate data to these series because it will all matter for RPM calculation! -	selfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) -	foreignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) -  	var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil  	if *printQualityAttenuation {  		selfRttsQualityAttenuation = qualityattenuation.NewSimpleQualityAttenuation()  	} +	directionExecutionUnits := make([]executor.ExecutionUnit, 0) +  	for _, direction := range directions { +		// Make a copy here to make sure that we do not get go-wierdness in our closure (see https://github.com/golang/go/discussions/56010). +		direction := direction -		timeoutDuration := specParameters.TestTimeout -		timeoutAbsoluteTime := time.Now().Add(timeoutDuration) +		directionExecutionUnit := func() { +			timeoutDuration := specParameters.TestTimeout +			timeoutAbsoluteTime := time.Now().Add(timeoutDuration) -		timeoutChannel := timeoutat.TimeoutAt( -			operatingCtx, -			timeoutAbsoluteTime, -			debugLevel, -		) -		if debug.IsDebug(debugLevel) { -			fmt.Printf("%s Test will end no later than %v\n", direction.DirectionLabel, timeoutAbsoluteTime) -		} +			timeoutChannel := timeoutat.TimeoutAt( +				operatingCtx, +				timeoutAbsoluteTime, +				debugLevel, +			) +			if debug.IsDebug(debugLevel) { +				fmt.Printf("%s Test will end no later than %v\n", +					direction.DirectionLabel, timeoutAbsoluteTime) +			} -		throughputCtx, throughputCtxCancel := context.WithCancel(operatingCtx) -		proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx) +			throughputOperatorCtx, throughputOperatorCtxCancel := context.WithCancel(operatingCtx) +			proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx) -		// This context is used to control the network activity (i.e., it controls all -		// the connections that are open to do load generation and probing). Cancelling this context will close -		// all the network connections that are responsible for generating the load. -		networkActivityCtx, networkActivityCtxCancel := context.WithCancel(operatingCtx) +			// This context is used to control the network activity (i.e., it controls all +			// the connections that are open to do load generation and probing). Cancelling this context will close +			// all the network connections that are responsible for generating the load. +			probeNetworkActivityCtx, probeNetworkActivityCtxCancel := context.WithCancel(operatingCtx) +			throughputCtx, throughputCtxCancel := context.WithCancel(operatingCtx) +			direction.ThroughputActivityCtx, direction.ThroughputActivityCtxCancel = &throughputCtx, &throughputCtxCancel -		throughputGeneratorCtx, throughputGeneratorCtxCancel := context.WithCancel(throughputCtx) +			reachWorkingConditionsCtx, reachWorkingConditionsCtxCancel := +				context.WithCancel(throughputOperatorCtx) -		lgStabilizationCommunicationChannel := rpm.LoadGenerator( -			throughputCtx, -			networkActivityCtx, -			throughputGeneratorCtx, -			specParameters.EvalInterval, -			direction.CreateLgdc, -			&direction.Lgcc, -			&globalNumericBucketGenerator, -			specParameters.MaxParallelConns, -			*calculateExtendedStats, -			direction.DirectionDebugging, -		) +			lgStabilizationCommunicationChannel := rpm.LoadGenerator( +				throughputOperatorCtx, +				*direction.ThroughputActivityCtx, +				reachWorkingConditionsCtx, +				specParameters.EvalInterval, +				direction.CreateLgdc, +				&direction.Lgcc, +				&globalNumericBucketGenerator, +				specParameters.MaxParallelConns, +				*calculateExtendedStats, +				direction.DirectionDebugging, +			) -		throughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, -			fmt.Sprintf("%v Throughput Stabilizer", direction.DirectionLabel)) -		downloadThroughputStabilizerDebugLevel := debug.Error -		if *debugCliFlag { -			downloadThroughputStabilizerDebugLevel = debug.Debug -		} -		throughputStabilizer := stabilizer.NewStabilizer[float64, uint64]( -			specParameters.MovingAvgDist, specParameters.StdDevTolerance, 0, "bytes", -			downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig) +			throughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, +				fmt.Sprintf("%v Throughput Stabilizer", direction.DirectionLabel)) +			downloadThroughputStabilizerDebugLevel := debug.Error +			if *debugCliFlag { +				downloadThroughputStabilizerDebugLevel = debug.Debug +			} +			throughputStabilizer := stabilizer.NewStabilizer[float64, uint64]( +				specParameters.MovingAvgDist, specParameters.StdDevTolerance, 0, "bytes", +				downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig) -		responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, -			fmt.Sprintf("%v Responsiveness Stabilizer", direction.DirectionLabel)) -		responsivenessStabilizerDebugLevel := debug.Error -		if *debugCliFlag { -			responsivenessStabilizerDebugLevel = debug.Debug -		} -		responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint64]( -			specParameters.MovingAvgDist, specParameters.StdDevTolerance, -			specParameters.TrimmedMeanPct, "milliseconds", -			responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig) +			responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, +				fmt.Sprintf("%v Responsiveness Stabilizer", direction.DirectionLabel)) +			responsivenessStabilizerDebugLevel := debug.Error +			if *debugCliFlag { +				responsivenessStabilizerDebugLevel = debug.Debug +			} +			responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint64]( +				specParameters.MovingAvgDist, specParameters.StdDevTolerance, +				specParameters.TrimmedMeanPct, "milliseconds", +				responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig) -		// For later debugging output, record the last throughputs on load-generating connectings -		// and the number of open connections. -		lastThroughputRate := float64(0) -		lastThroughputOpenConnectionCount := int(0) +			// For later debugging output, record the last throughputs on load-generating connectings +			// and the number of open connections. +			lastThroughputRate := float64(0) +			lastThroughputOpenConnectionCount := int(0) -		stabilityCheckTime := time.Now().Add(specParameters.EvalInterval) -		stabilityCheckTimeChannel := timeoutat.TimeoutAt( -			operatingCtx, -			stabilityCheckTime, -			debugLevel, -		) +			stabilityCheckTime := time.Now().Add(specParameters.EvalInterval) +			stabilityCheckTimeChannel := timeoutat.TimeoutAt( +				operatingCtx, +				stabilityCheckTime, +				debugLevel, +			) -	lg_timeout: -		for !direction.StableThroughput { -			select { -			case throughputMeasurement := <-lgStabilizationCommunicationChannel: -				{ -					switch throughputMeasurement.Type { -					case series.SeriesMessageReserve: -						{ -							throughputStabilizer.Reserve(throughputMeasurement.Bucket) -							if *debugCliFlag { -								fmt.Printf( -									"%s: Reserving a throughput bucket with id %v.\n", -									direction.DirectionLabel, throughputMeasurement.Bucket) +		lg_timeout: +			for !direction.StableThroughput { +				select { +				case throughputMeasurement := <-lgStabilizationCommunicationChannel: +					{ +						switch throughputMeasurement.Type { +						case series.SeriesMessageReserve: +							{ +								throughputStabilizer.Reserve(throughputMeasurement.Bucket) +								if *debugCliFlag { +									fmt.Printf( +										"%s: Reserving a throughput bucket with id %v.\n", +										direction.DirectionLabel, throughputMeasurement.Bucket) +								}  							} -						} -					case series.SeriesMessageMeasure: -						{ -							bucket := throughputMeasurement.Bucket -							measurement := utilities.GetSome(throughputMeasurement.Measure) +						case series.SeriesMessageMeasure: +							{ +								bucket := throughputMeasurement.Bucket +								measurement := utilities.GetSome(throughputMeasurement.Measure) -							throughputStabilizer.AddMeasurement(bucket, measurement.Throughput) +								throughputStabilizer.AddMeasurement(bucket, measurement.Throughput) -							direction.ThroughputDataLogger.LogRecord(measurement) -							for _, v := range measurement.GranularThroughputDataPoints { -								v.Direction = "Download" -								direction.GranularThroughputDataLogger.LogRecord(v) -							} +								direction.ThroughputDataLogger.LogRecord(measurement) +								for _, v := range measurement.GranularThroughputDataPoints { +									v.Direction = "Download" +									direction.GranularThroughputDataLogger.LogRecord(v) +								} -							lastThroughputRate = measurement.Throughput -							lastThroughputOpenConnectionCount = measurement.Connections +								lastThroughputRate = measurement.Throughput +								lastThroughputOpenConnectionCount = measurement.Connections +							}  						}  					} -				} -			case <-stabilityCheckTimeChannel: -				{ -					if *debugCliFlag { -						fmt.Printf( -							"%v throughput stability interval is complete.\n", direction.DirectionLabel) -					} -					stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) -					stabilityCheckTimeChannel = timeoutat.TimeoutAt( -						operatingCtx, -						stabilityCheckTime, -						debugLevel, -					) +				case <-stabilityCheckTimeChannel: +					{ +						if *debugCliFlag { +							fmt.Printf( +								"%v throughput stability interval is complete.\n", direction.DirectionLabel) +						} +						stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) +						stabilityCheckTimeChannel = timeoutat.TimeoutAt( +							operatingCtx, +							stabilityCheckTime, +							debugLevel, +						) -					direction.StableThroughput = throughputStabilizer.IsStable() -					if *debugCliFlag { -						fmt.Printf( -							"%v is instantaneously %s.\n", direction.DirectionLabel, -							utilities.Conditional(direction.StableThroughput, "stable", "unstable")) -					} +						direction.StableThroughput = throughputStabilizer.IsStable() +						if *debugCliFlag { +							fmt.Printf( +								"%v is instantaneously %s.\n", direction.DirectionLabel, +								utilities.Conditional(direction.StableThroughput, "stable", "unstable")) +						} -					if direction.StableThroughput { -						throughputGeneratorCtxCancel() +						throughputStabilizer.Interval() +					} +				case <-timeoutChannel: +					{ +						break lg_timeout  					} -					throughputStabilizer.Interval() -				} -			case <-timeoutChannel: -				{ -					break lg_timeout  				}  			} -		} -		if direction.StableThroughput { -			if *debugCliFlag { -				fmt.Printf("Throughput is stable; beginning responsiveness testing.\n") +			if direction.StableThroughput { +				if *debugCliFlag { +					fmt.Printf("Throughput is stable; beginning responsiveness testing.\n") +				} +			} else { +				fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Making the test 15 seconds longer to calculate speculative RPM results.\n") +				speculativeTimeoutDuration := time.Second * 15 +				speculativeAbsoluteTimeoutTime := time.Now().Add(speculativeTimeoutDuration) +				timeoutChannel = timeoutat.TimeoutAt( +					operatingCtx, +					speculativeAbsoluteTimeoutTime, +					debugLevel, +				)  			} -		} else { -			fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Adding 15 seconds to calculate speculative RPM results.\n") -			speculativeTimeoutDuration := time.Second * 15 -			speculativeAbsoluteTimeoutTime := time.Now().Add(speculativeTimeoutDuration) -			timeoutChannel = timeoutat.TimeoutAt( -				operatingCtx, -				speculativeAbsoluteTimeoutTime, -				debugLevel, -			) -		} -		perDirectionSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) -		perDirectionForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) +			// No matter what, we will stop adding additional load-generating connections! +			reachWorkingConditionsCtxCancel() -		responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber( -			proberOperatorCtx, -			networkActivityCtx, -			generateForeignProbeConfiguration, -			generateSelfProbeConfiguration, -			&direction.Lgcc, -			&globalNumericBucketGenerator, -			direction.CreateLgdc().Direction(), // TODO: This could be better! -			specParameters.ProbeInterval, -			sslKeyFileConcurrentWriter, -			*calculateExtendedStats, -			direction.ProbeDebugging, -		) +			direction.SelfRtts = series.NewWindowSeries[float64, uint64](series.Forever, 0) +			direction.ForeignRtts = series.NewWindowSeries[float64, uint64](series.Forever, 0) -	responsiveness_timeout: -		for !direction.StableResponsiveness { -			select { -			case probeMeasurement := <-responsivenessStabilizationCommunicationChannel: -				{ -					switch probeMeasurement.Type { -					case series.SeriesMessageReserve: -						{ -							bucket := probeMeasurement.Bucket -							if *debugCliFlag { -								fmt.Printf( -									"%s: Reserving a responsiveness bucket with id %v.\n", direction.DirectionLabel, bucket) -							} -							responsivenessStabilizer.Reserve(bucket) -							selfRtts.Reserve(bucket) -							foreignRtts.Reserve(bucket) -							perDirectionForeignRtts.Reserve(bucket) -							perDirectionSelfRtts.Reserve(bucket) -						} -					case series.SeriesMessageMeasure: -						{ -							bucket := probeMeasurement.Bucket -							measurement := utilities.GetSome(probeMeasurement.Measure) -							foreignDataPoint := measurement.Foreign -							selfDataPoint := measurement.Self +			responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber( +				proberOperatorCtx, +				probeNetworkActivityCtx, +				generateForeignProbeConfiguration, +				generateSelfProbeConfiguration, +				&direction.Lgcc, +				&globalNumericBucketGenerator, +				direction.CreateLgdc().Direction(), // TODO: This could be better! +				specParameters.ProbeInterval, +				sslKeyFileConcurrentWriter, +				*calculateExtendedStats, +				direction.ProbeDebugging, +			) -							if *debugCliFlag { -								fmt.Printf( -									"%s: Filling a responsiveness bucket with id %v with value %v.\n", -									direction.DirectionLabel, bucket, measurement) +		responsiveness_timeout: +			for !direction.StableResponsiveness { +				select { +				case probeMeasurement := <-responsivenessStabilizationCommunicationChannel: +					{ +						switch probeMeasurement.Type { +						case series.SeriesMessageReserve: +							{ +								bucket := probeMeasurement.Bucket +								if *debugCliFlag { +									fmt.Printf( +										"%s: Reserving a responsiveness bucket with id %v.\n", direction.DirectionLabel, bucket) +								} +								responsivenessStabilizer.Reserve(bucket) +								direction.ForeignRtts.Reserve(bucket) +								direction.SelfRtts.Reserve(bucket)  							} -							responsivenessStabilizer.AddMeasurement(bucket, -								(foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) +						case series.SeriesMessageMeasure: +							{ +								bucket := probeMeasurement.Bucket +								measurement := utilities.GetSome(probeMeasurement.Measure) +								foreignDataPoint := measurement.Foreign +								selfDataPoint := measurement.Self -							if err := selfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil { -								fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (selfRtts)\n", bucket) -							} -							if err := perDirectionSelfRtts.Fill(bucket, -								selfDataPoint.Duration.Seconds()); err != nil { -								fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionSelfRtts)\n", bucket) -							} +								if *debugCliFlag { +									fmt.Printf( +										"%s: Filling a responsiveness bucket with id %v with value %v.\n", +										direction.DirectionLabel, bucket, measurement) +								} +								responsivenessStabilizer.AddMeasurement(bucket, +									(foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) -							if err := foreignRtts.Fill(bucket, foreignDataPoint.Duration.Seconds()); err != nil { -								fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (foreignRtts)\n", bucket) -							} +								if err := direction.SelfRtts.Fill(bucket, +									selfDataPoint.Duration.Seconds()); err != nil { +									fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionSelfRtts)\n", bucket) +								} -							if err := perDirectionForeignRtts.Fill(bucket, -								foreignDataPoint.Duration.Seconds()); err != nil { -								fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionForeignRtts)\n", bucket) -							} +								if err := direction.ForeignRtts.Fill(bucket, +									foreignDataPoint.Duration.Seconds()); err != nil { +									fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionForeignRtts)\n", bucket) +								} -							if selfRttsQualityAttenuation != nil { -								selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) -							} +								if selfRttsQualityAttenuation != nil { +									selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) +								} -							direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) -							direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) +								direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) +								direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) +							}  						}  					} -				} -			case throughputMeasurement := <-lgStabilizationCommunicationChannel: -				{ -					switch throughputMeasurement.Type { -					case series.SeriesMessageReserve: -						{ -							// We are no longer tracking stability, so reservation messages are useless! -							if *debugCliFlag { -								fmt.Printf( -									"%s: Discarding a throughput bucket with id %v when ascertaining responsiveness.\n", -									direction.DirectionLabel, throughputMeasurement.Bucket) +				case throughputMeasurement := <-lgStabilizationCommunicationChannel: +					{ +						switch throughputMeasurement.Type { +						case series.SeriesMessageReserve: +							{ +								// We are no longer tracking stability, so reservation messages are useless! +								if *debugCliFlag { +									fmt.Printf( +										"%s: Discarding a throughput bucket with id %v when ascertaining responsiveness.\n", +										direction.DirectionLabel, throughputMeasurement.Bucket) +								}  							} -						} -					case series.SeriesMessageMeasure: -						{ -							measurement := utilities.GetSome(throughputMeasurement.Measure) +						case series.SeriesMessageMeasure: +							{ +								measurement := utilities.GetSome(throughputMeasurement.Measure) -							if *debugCliFlag { -								fmt.Printf("Adding a throughput measurement (while ascertaining responsiveness).\n") -							} -							// There may be more than one round trip accumulated together. If that is the case, -							direction.ThroughputDataLogger.LogRecord(measurement) -							for _, v := range measurement.GranularThroughputDataPoints { -								v.Direction = direction.DirectionLabel -								direction.GranularThroughputDataLogger.LogRecord(v) -							} +								if *debugCliFlag { +									fmt.Printf("Adding a throughput measurement (while ascertaining responsiveness).\n") +								} +								// There may be more than one round trip accumulated together. If that is the case, +								direction.ThroughputDataLogger.LogRecord(measurement) +								for _, v := range measurement.GranularThroughputDataPoints { +									v.Direction = direction.DirectionLabel +									direction.GranularThroughputDataLogger.LogRecord(v) +								} -							lastThroughputRate = measurement.Throughput -							lastThroughputOpenConnectionCount = measurement.Connections +								lastThroughputRate = measurement.Throughput +								lastThroughputOpenConnectionCount = measurement.Connections +							}  						}  					} -				} -			case <-timeoutChannel: -				{ -					break responsiveness_timeout -				} -			case <-stabilityCheckTimeChannel: -				{ -					if *debugCliFlag { -						fmt.Printf( -							"%v responsiveness stability interval is complete.\n", direction.DirectionLabel) +				case <-timeoutChannel: +					{ +						break responsiveness_timeout  					} +				case <-stabilityCheckTimeChannel: +					{ +						if *debugCliFlag { +							fmt.Printf( +								"%v responsiveness stability interval is complete.\n", direction.DirectionLabel) +						} -					stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) -					stabilityCheckTimeChannel = timeoutat.TimeoutAt( -						operatingCtx, -						stabilityCheckTime, -						debugLevel, -					) +						stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) +						stabilityCheckTimeChannel = timeoutat.TimeoutAt( +							operatingCtx, +							stabilityCheckTime, +							debugLevel, +						) -					// Check stabilization immediately -- this could change if we wait. Not sure if the immediacy -					// is *actually* important, but it can't hurt? -					direction.StableResponsiveness = responsivenessStabilizer.IsStable() +						// Check stabilization immediately -- this could change if we wait. Not sure if the immediacy +						// is *actually* important, but it can't hurt? +						direction.StableResponsiveness = responsivenessStabilizer.IsStable() -					if *debugCliFlag { -						fmt.Printf( -							"Responsiveness is instantaneously %s.\n", -							utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) -					} +						if *debugCliFlag { +							fmt.Printf( +								"Responsiveness is instantaneously %s.\n", +								utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) +						} -					responsivenessStabilizer.Interval() +						responsivenessStabilizer.Interval() +					}  				}  			} -		} - -		// Did the test run to stability? -		testRanToStability := direction.StableThroughput && direction.StableResponsiveness - -		if *debugCliFlag { -			fmt.Printf("Stopping all the load generating data generators (stability: %s).\n", -				utilities.Conditional(testRanToStability, "success", "failure")) -		} -		/* At this point there are -		1. Load generators running -		-- uploadLoadGeneratorOperatorCtx -		-- downloadLoadGeneratorOperatorCtx -		2. Network connections opened by those load generators: -		-- lgNetworkActivityCtx -		3. Probes -		-- proberCtx -		*/ +			// Did the test run to stability? +			testRanToStability := direction.StableThroughput && direction.StableResponsiveness -		// First, stop the load generator and the probe operators (but *not* the network activity) -		proberOperatorCtxCancel() -		throughputCtxCancel() +			if *debugCliFlag { +				fmt.Printf("Stopping all the load generating data generators (stability: %s).\n", +					utilities.Conditional(testRanToStability, "success", "failure")) +			} -		// Second, calculate the extended stats (if the user requested and they are available for the direction) -		extendedStats := extendedstats.AggregateExtendedStats{} -		if *calculateExtendedStats && direction.ExtendedStatsEligible { -			if extendedstats.ExtendedStatsAvailable() { -				func() { -					// Put inside an IIFE so that we can use a defer! -					direction.Lgcc.Lock.Lock() -					defer direction.Lgcc.Lock.Unlock() +			/* At this point there are +			1. Load generators running +			-- uploadLoadGeneratorOperatorCtx +			-- downloadLoadGeneratorOperatorCtx +			2. Network connections opened by those load generators: +			-- lgNetworkActivityCtx +			3. Probes +			-- proberCtx +			*/ -					lgcCount, err := direction.Lgcc.Len() -					if err != nil { -						fmt.Fprintf( -							os.Stderr, -							"Warning: Could not calculate the number of %v load-generating connections; aborting extended stats preparation.\n", direction.DirectionLabel, -						) -						return -					} +			// First, stop the load generator and the probe operators (but *not* the network activity) +			proberOperatorCtxCancel() +			throughputOperatorCtxCancel() -					for i := 0; i < lgcCount; i++ { -						// Assume that extended statistics are available -- the check was done explicitly at -						// program startup if the calculateExtendedStats flag was set by the user on the command line. -						currentLgc, _ := direction.Lgcc.Get(i) +			// Second, calculate the extended stats (if the user requested and they are available for the direction) +			extendedStats := extendedstats.AggregateExtendedStats{} +			if *calculateExtendedStats && direction.ExtendedStatsEligible { +				if extendedstats.ExtendedStatsAvailable() { +					func() { +						// Put inside an IIFE so that we can use a defer! +						direction.Lgcc.Lock.Lock() +						defer direction.Lgcc.Lock.Unlock() -						if currentLgc == nil || (*currentLgc).Stats() == nil { +						lgcCount, err := direction.Lgcc.Len() +						if err != nil {  							fmt.Fprintf(  								os.Stderr, -								"Warning: Could not add extended stats for the connection: The LGC was nil or there were no stats available.\n", +								"Warning: Could not calculate the number of %v load-generating connections; aborting extended stats preparation.\n", direction.DirectionLabel,  							) -							continue +							return  						} -						if err := extendedStats.IncorporateConnectionStats( -							(*currentLgc).Stats().ConnInfo.Conn); err != nil { -							fmt.Fprintf( -								os.Stderr, -								"Warning: Could not add extended stats for the connection: %v.\n", -								err, -							) + +						for i := 0; i < lgcCount; i++ { +							// Assume that extended statistics are available -- the check was done explicitly at +							// program startup if the calculateExtendedStats flag was set by the user on the command line. +							currentLgc, _ := direction.Lgcc.Get(i) + +							if currentLgc == nil || (*currentLgc).Stats() == nil { +								fmt.Fprintf( +									os.Stderr, +									"Warning: Could not add extended stats for the connection: The LGC was nil or there were no stats available.\n", +								) +								continue +							} +							if err := extendedStats.IncorporateConnectionStats( +								(*currentLgc).Stats().ConnInfo.Conn); err != nil { +								fmt.Fprintf( +									os.Stderr, +									"Warning: Could not add extended stats for the connection: %v.\n", +									err, +								) +							}  						} -					} -				}() -			} else { -				// TODO: Should we just log here? -				panic("Extended stats are not available but the user requested their calculation.") +					}() +				} else { +					// TODO: Should we just log here? +					panic("Extended stats are not available but the user requested their calculation.") +				}  			} -		} -		// Third, stop the network connections opened by the load generators and probers. -		networkActivityCtxCancel() +			// *Always* stop the probers! But, conditionally stop the througput. +			probeNetworkActivityCtxCancel() +			if parallelTestExecutionPolicy != executor.Parallel { +				if direction.ThroughputActivityCtxCancel == nil { +					panic(fmt.Sprintf("The cancellation function for the %v direction's throughput is nil!", direction.DirectionLabel)) +				} +				(*direction.ThroughputActivityCtxCancel)() +			} -		fmt.Printf( -			"%v: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", -			direction.DirectionLabel, -			utilities.ToMbps(lastThroughputRate), -			utilities.ToMBps(lastThroughputRate), -			lastThroughputOpenConnectionCount, -		) +			direction.FormattedResults += fmt.Sprintf( +				"%v: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", +				direction.DirectionLabel, +				utilities.ToMbps(lastThroughputRate), +				utilities.ToMBps(lastThroughputRate), +				lastThroughputOpenConnectionCount, +			) -		if *calculateExtendedStats { -			fmt.Println(extendedStats.Repr()) -		} -		directionResult := rpm.CalculateRpm(perDirectionSelfRtts, perDirectionForeignRtts, -			specParameters.TrimmedMeanPct, specParameters.Percentile) -		if *debugCliFlag { -			fmt.Printf("(%s RPM Calculation stats): %v\n", direction.DirectionLabel, directionResult.ToString()) -		} -		if *printQualityAttenuation { -			fmt.Println("Quality Attenuation Statistics:") -			fmt.Printf( -				`Number of losses: %d -Number of samples: %d -Min: %.6f s -Max: %.6f s -Mean: %.6f s -Variance: %.6f s -Standard Deviation: %.6f s -PDV(90): %.6f s -PDV(99): %.6f s -P(90): %.6f s -P(99): %.6f s -RPM: %.0f -Gaming QoO: %.0f +			if *calculateExtendedStats { +				direction.FormattedResults += fmt.Sprintf("%v\n", extendedStats.Repr()) +			} +			directionResult := rpm.CalculateRpm(direction.SelfRtts, direction.ForeignRtts, +				specParameters.TrimmedMeanPct, specParameters.Percentile) +			if *debugCliFlag { +				direction.FormattedResults += fmt.Sprintf("(%s RPM Calculation stats): %v\n", +					direction.DirectionLabel, directionResult.ToString()) +			} +			if *printQualityAttenuation { +				direction.FormattedResults += "Quality Attenuation Statistics:\n" +				direction.FormattedResults += fmt.Sprintf( +					`	Number of losses: %d +	Number of samples: %d +	Min: %.6f s +	Max: %.6f s +	Mean: %.6f s +	Variance: %.6f s +	Standard Deviation: %.6f s +	PDV(90): %.6f s +	PDV(99): %.6f s +	P(90): %.6f s +	P(99): %.6f s +	RPM: %.0f +	Gaming QoO: %.0f  `, selfRttsQualityAttenuation.GetNumberOfLosses(), -				selfRttsQualityAttenuation.GetNumberOfSamples(), -				selfRttsQualityAttenuation.GetMinimum(), -				selfRttsQualityAttenuation.GetMaximum(), -				selfRttsQualityAttenuation.GetAverage(), -				selfRttsQualityAttenuation.GetVariance(), -				selfRttsQualityAttenuation.GetStandardDeviation(), -				selfRttsQualityAttenuation.GetPDV(90), -				selfRttsQualityAttenuation.GetPDV(99), -				selfRttsQualityAttenuation.GetPercentile(90), -				selfRttsQualityAttenuation.GetPercentile(99), -				selfRttsQualityAttenuation.GetRPM(), -				selfRttsQualityAttenuation.GetGamingQoO()) -		} +					selfRttsQualityAttenuation.GetNumberOfSamples(), +					selfRttsQualityAttenuation.GetMinimum(), +					selfRttsQualityAttenuation.GetMaximum(), +					selfRttsQualityAttenuation.GetAverage(), +					selfRttsQualityAttenuation.GetVariance(), +					selfRttsQualityAttenuation.GetStandardDeviation(), +					selfRttsQualityAttenuation.GetPDV(90), +					selfRttsQualityAttenuation.GetPDV(99), +					selfRttsQualityAttenuation.GetPercentile(90), +					selfRttsQualityAttenuation.GetPercentile(99), +					selfRttsQualityAttenuation.GetRPM(), +					selfRttsQualityAttenuation.GetGamingQoO()) +			} -		if !testRanToStability { -			fmt.Printf("Test did not run to stability, these results are estimates:\n") -		} +			if !testRanToStability { +				direction.FormattedResults += "Test did not run to stability, these results are estimates:\n" +			} + +			direction.FormattedResults += fmt.Sprintf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, +				directionResult.PNRpm, specParameters.Percentile) +			direction.FormattedResults += fmt.Sprintf( +				"%s RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, +				directionResult.MeanRpm, specParameters.TrimmedMeanPct) + +			if len(*prometheusStatsFilename) > 0 { +				var testStable int +				if testRanToStability { +					testStable = 1 +				} +				var buffer bytes.Buffer +				buffer.WriteString(fmt.Sprintf("networkquality_%v_test_stable %d\n", +					strings.ToLower(direction.DirectionLabel), testStable)) +				buffer.WriteString(fmt.Sprintf("networkquality_%v_p90_rpm_value %d\n", +					strings.ToLower(direction.DirectionLabel), int64(directionResult.PNRpm))) +				buffer.WriteString(fmt.Sprintf("networkquality_%v_trimmed_rpm_value %d\n", +					strings.ToLower(direction.DirectionLabel), +					int64(directionResult.MeanRpm))) -		fmt.Printf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, directionResult.PNRpm, specParameters.Percentile) -		fmt.Printf("%s RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, -			directionResult.MeanRpm, specParameters.TrimmedMeanPct) +				buffer.WriteString(fmt.Sprintf("networkquality_%v_bits_per_second %d\n", +					strings.ToLower(direction.DirectionLabel), int64(lastThroughputRate))) +				buffer.WriteString(fmt.Sprintf("networkquality_%v_connections %d\n", +					strings.ToLower(direction.DirectionLabel), +					int64(lastThroughputOpenConnectionCount))) -		if len(*prometheusStatsFilename) > 0 { -			var testStable int -			if testRanToStability { -				testStable = 1 +				if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil { +					fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err) +					os.Exit(1) +				}  			} -			var buffer bytes.Buffer -			buffer.WriteString(fmt.Sprintf("networkquality_%v_test_stable %d\n", -				strings.ToLower(direction.DirectionLabel), testStable)) -			buffer.WriteString(fmt.Sprintf("networkquality_%v_p90_rpm_value %d\n", -				strings.ToLower(direction.DirectionLabel), int64(directionResult.PNRpm))) -			buffer.WriteString(fmt.Sprintf("networkquality_%v_trimmed_rpm_value %d\n", -				strings.ToLower(direction.DirectionLabel), -				int64(directionResult.MeanRpm))) -			buffer.WriteString(fmt.Sprintf("networkquality_%v_bits_per_second %d\n", -				strings.ToLower(direction.DirectionLabel), int64(lastThroughputRate))) -			buffer.WriteString(fmt.Sprintf("networkquality_%v_connections %d\n", -				strings.ToLower(direction.DirectionLabel), -				int64(lastThroughputOpenConnectionCount))) +			direction.ThroughputDataLogger.Export() +			if *debugCliFlag { +				fmt.Printf("Closing the %v throughput data logger.\n", direction.DirectionLabel) +			} +			direction.ThroughputDataLogger.Close() -			if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil { -				fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err) -				os.Exit(1) +			direction.GranularThroughputDataLogger.Export() +			if *debugCliFlag { +				fmt.Printf("Closing the %v granular throughput data logger.\n", direction.DirectionLabel)  			} -		} +			direction.GranularThroughputDataLogger.Close() -		direction.ThroughputDataLogger.Export() -		if *debugCliFlag { -			fmt.Printf("Closing the %v throughput data logger.\n", direction.DirectionLabel) +			if *debugCliFlag { +				fmt.Printf("In debugging mode, we will cool down between tests.\n") +				time.Sleep(constants.CooldownPeriod) +				fmt.Printf("Done cooling down.\n") +			}  		} -		direction.ThroughputDataLogger.Close() +		directionExecutionUnits = append(directionExecutionUnits, directionExecutionUnit) +	} // End of direction testing. -		direction.GranularThroughputDataLogger.Export() -		if *debugCliFlag { -			fmt.Printf("Closing the %v granular throughput data logger.\n", direction.DirectionLabel) -		} -		direction.GranularThroughputDataLogger.Close() +	waiter := executor.Execute(parallelTestExecutionPolicy, directionExecutionUnits) +	waiter.Wait() -		if *debugCliFlag { -			fmt.Printf("In debugging mode, we will cool down between tests.\n") -			time.Sleep(constants.CooldownPeriod) -			fmt.Printf("Done cooling down.\n") +	// If we were testing in parallel mode, then the throughputs for each direction are still +	// running. We left them running in case one of the directions reached stability before the +	// other! +	if parallelTestExecutionPolicy == executor.Parallel { +		for _, direction := range directions { +			if *debugCliFlag { +				fmt.Printf("Stopping the throughput connections for the %v test.\n", direction.DirectionLabel) +			} +			if direction.ThroughputActivityCtxCancel == nil { +				panic(fmt.Sprintf("The cancellation function for the %v direction's throughput is nil!", direction.DirectionLabel)) +			} +			if (*direction.ThroughputActivityCtx).Err() != nil { +				fmt.Fprintf(os.Stderr, "Warning: The throughput for the %v direction was already cancelled but should have been ongoing.\n", direction.DirectionLabel) +				continue +			} +			(*direction.ThroughputActivityCtxCancel)() +		} +	} else { +		for _, direction := range directions { +			if direction.ThroughputActivityCtxCancel == nil { +				panic(fmt.Sprintf("The cancellation function for the %v direction's throughput is nil!", direction.DirectionLabel)) +			} +			if (*direction.ThroughputActivityCtx).Err() == nil { +				fmt.Fprintf(os.Stderr, "Warning: The throughput for the %v direction should have already been stopped but it was not.\n", direction.DirectionLabel) +			}  		}  	} -	result := rpm.CalculateRpm(selfRtts, foreignRtts, specParameters.TrimmedMeanPct, specParameters.Percentile) +	fmt.Printf("Results:\n") +	fmt.Printf("========\n") +	// Print out the formatted results from each of the directions. +	for _, direction := range directions { +		fmt.Print(direction.FormattedResults) +		fmt.Printf("========\n") +	} + +	allSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) +	allForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) + +	allSelfRtts.Append(&downloadDirection.SelfRtts) +	allSelfRtts.Append(&uploadDirection.SelfRtts) +	allForeignRtts.Append(&downloadDirection.ForeignRtts) +	allForeignRtts.Append(&uploadDirection.ForeignRtts) + +	result := rpm.CalculateRpm(allSelfRtts, allForeignRtts, specParameters.TrimmedMeanPct, specParameters.Percentile)  	if *debugCliFlag {  		fmt.Printf("(Final RPM Calculation stats): %v\n", result.ToString()) diff --git a/rpm/parameters.go b/rpm/parameters.go index 1d7e065..cc9fe7f 100644 --- a/rpm/parameters.go +++ b/rpm/parameters.go @@ -18,6 +18,7 @@ import (  	"fmt"  	"time" +	"github.com/network-quality/goresponsiveness/executor"  	"github.com/network-quality/goresponsiveness/utilities"  ) @@ -31,9 +32,12 @@ type SpecParameters struct {  	ProbeInterval    time.Duration  	ProbeCapacityPct float64  	Percentile       uint +	ExecutionPolicy  executor.ExecutionMethod  } -func SpecParametersFromArguments(timeout int, mad int, id int, tmp uint, sdt float64, mnp int, mps int, ptc float64, p int) (*SpecParameters, error) { +func SpecParametersFromArguments(timeout int, mad int, id int, tmp uint, sdt float64, mnp int, +	mps int, ptc float64, p int, executionPolicy executor.ExecutionMethod, +) (*SpecParameters, error) {  	if timeout <= 0 {  		return nil, fmt.Errorf("cannot specify a 0 or negative timeout for the test")  	} @@ -68,7 +72,8 @@ func SpecParametersFromArguments(timeout int, mad int, id int, tmp uint, sdt flo  	params := SpecParameters{  		TestTimeout: testTimeout, MovingAvgDist: mad,  		EvalInterval: evalInterval, TrimmedMeanPct: tmp, StdDevTolerance: sdt, -		MaxParallelConns: mnp, ProbeInterval: probeInterval, ProbeCapacityPct: ptc, Percentile: uint(p), +		MaxParallelConns: mnp, ProbeInterval: probeInterval, ProbeCapacityPct: ptc, +		Percentile: uint(p), ExecutionPolicy: executionPolicy,  	}  	return ¶ms, nil  } @@ -82,8 +87,10 @@ Trimmed-Mean Percentage:                     %v,  Standard-Deviation Tolerance:                %v,  Maximum number of parallel connections:      %v,  Probe Interval:                              %v (derived from given maximum-probes-per-second parameter), -Maximum Percentage Of Throughput For Probes: %v`, +Maximum Percentage Of Throughput For Probes: %v +Execution Policy:                            %v`,  		parameters.TestTimeout, parameters.MovingAvgDist, parameters.EvalInterval, parameters.TrimmedMeanPct, -		parameters.StdDevTolerance, parameters.MaxParallelConns, parameters.ProbeInterval, parameters.ProbeCapacityPct, +		parameters.StdDevTolerance, parameters.MaxParallelConns, parameters.ProbeInterval, +		parameters.ProbeCapacityPct, parameters.ExecutionPolicy.ToString(),  	)  } diff --git a/rpm/parameters_test.go b/rpm/parameters_test.go index 2035a99..0070e99 100644 --- a/rpm/parameters_test.go +++ b/rpm/parameters_test.go @@ -17,91 +17,93 @@ package rpm  import (  	"strings"  	"testing" + +	"github.com/network-quality/goresponsiveness/executor"  )  func TestSpecParametersFromArgumentsBadTimeout(t *testing.T) { -	_, err := SpecParametersFromArguments(0, 0, 0, 0, 0, 0, 0, 0, 0) +	_, err := SpecParametersFromArguments(0, 0, 0, 0, 0, 0, 0, 0, 0, executor.Parallel)  	if err == nil || !strings.Contains(err.Error(), "timeout") {  		t.Fatalf("0 timeout improperly allowed.")  	} -	_, err = SpecParametersFromArguments(-1, 0, 0, 0, 0, 0, 0, 0, 0) +	_, err = SpecParametersFromArguments(-1, 0, 0, 0, 0, 0, 0, 0, 0, executor.Parallel)  	if err == nil || !strings.Contains(err.Error(), "timeout") {  		t.Fatalf("negative timeout improperly allowed.")  	}  }  func TestSpecParametersFromArgumentsBadMad(t *testing.T) { -	_, err := SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0, 0) +	_, err := SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0, 0, executor.Parallel)  	if err == nil || !strings.Contains(err.Error(), "moving-average") {  		t.Fatalf("0 mad improperly allowed.")  	} -	_, err = SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0, 0) +	_, err = SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0, 0, executor.Parallel)  	if err == nil || !strings.Contains(err.Error(), "moving-average") {  		t.Fatalf("negative mad improperly allowed.")  	}  }  func TestSpecParametersFromArgumentsBadId(t *testing.T) { -	_, err := SpecParametersFromArguments(1, 1, 0, 0, 0, 0, 0, 0, 0) +	_, err := SpecParametersFromArguments(1, 1, 0, 0, 0, 0, 0, 0, 0, executor.Parallel)  	if err == nil || !strings.Contains(err.Error(), "reevaluation") {  		t.Fatalf("0 id improperly allowed.")  	} -	_, err = SpecParametersFromArguments(1, 1, -1, 0, 0, 0, 0, 0, 0) +	_, err = SpecParametersFromArguments(1, 1, -1, 0, 0, 0, 0, 0, 0, executor.Parallel)  	if err == nil || !strings.Contains(err.Error(), "reevaluation") {  		t.Fatalf("negative id improperly allowed.")  	}  }  func TestSpecParametersFromArgumentsBadSdt(t *testing.T) { -	_, err := SpecParametersFromArguments(1, 1, 1, 1, -1, 0, 0, 0, 0) +	_, err := SpecParametersFromArguments(1, 1, 1, 1, -1, 0, 0, 0, 0, executor.Parallel)  	if err == nil || !strings.Contains(err.Error(), "deviation") {  		t.Fatalf("0 sdt improperly allowed.")  	}  }  func TestSpecParametersFromArgumentsBadMnp(t *testing.T) { -	_, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 0, 0, 0, 0) +	_, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 0, 0, 0, 0, executor.Parallel)  	if err == nil || !strings.Contains(err.Error(), "parallel") {  		t.Fatalf("0 mnp improperly allowed.")  	} -	_, err = SpecParametersFromArguments(1, 1, 1, 1, 1, -1, 0, 0, 0) +	_, err = SpecParametersFromArguments(1, 1, 1, 1, 1, -1, 0, 0, 0, executor.Parallel)  	if err == nil || !strings.Contains(err.Error(), "parallel") {  		t.Fatalf("negative mnp improperly allowed.")  	}  }  func TestSpecParametersFromArgumentsBadMps(t *testing.T) { -	_, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 0, 0, 0) +	_, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 0, 0, 0, executor.Parallel)  	if err == nil || !strings.Contains(err.Error(), "probing interval") {  		t.Fatalf("0 mps improperly allowed.")  	} -	_, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, -1, 0, 0) +	_, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, -1, 0, 0, executor.Parallel)  	if err == nil || !strings.Contains(err.Error(), "probing interval") {  		t.Fatalf("negative mps improperly allowed.")  	}  }  func TestSpecParametersFromArgumentsBadPtc(t *testing.T) { -	_, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 0, 0) +	_, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 0, 0, executor.Parallel)  	if err == nil || !strings.Contains(err.Error(), "capacity") {  		t.Fatalf("0 ptc improperly allowed.")  	} -	_, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, -1, 0) +	_, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, -1, 0, executor.Parallel)  	if err == nil || !strings.Contains(err.Error(), "capacity") {  		t.Fatalf("negative ptc improperly allowed.")  	}  }  func TestSpecParametersFromArgumentsBadP(t *testing.T) { -	_, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, -1) +	_, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, -1, executor.Parallel)  	if err == nil || !strings.Contains(err.Error(), "percentile") {  		t.Fatalf("-1 percentile improperly allowed.")  	} -	_, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, 0) +	_, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, 0, executor.Parallel)  	if err == nil || !strings.Contains(err.Error(), "percentile") {  		t.Fatalf("0 percentile improperly allowed.")  	} -	_, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, 101) +	_, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 1, 101, executor.Parallel)  	if err == nil || !strings.Contains(err.Error(), "percentile") {  		t.Fatalf("percentile greater than 100 improperly allowed.")  	} @@ -148,9 +148,8 @@ func ResponsivenessProber[BucketType utilities.Number](  					)  				} -				currentBucketId := bucketGenerator.Generate() -  				dataPointsLock.Lock() +				currentBucketId := bucketGenerator.Generate()  				if dataPoints != nil {  					dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, BucketType]{  						Type: series.SeriesMessageReserve, Bucket: currentBucketId, diff --git a/series/series.go b/series/series.go index 6b9af1f..ef133d2 100644 --- a/series/series.go +++ b/series/series.go @@ -41,6 +41,8 @@ type WindowSeries[Data any, Bucket constraints.Ordered] interface {  	GetValues() []utilities.Optional[Data]  	Complete() bool  	GetType() WindowSeriesDuration + +	Append(appended *WindowSeries[Data, Bucket])  }  type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct { @@ -48,6 +50,7 @@ type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct {  	data        []utilities.Pair[Bucket, utilities.Optional[Data]]  	latestIndex int  	empty       bool +	lock        sync.RWMutex  }  /* @@ -58,6 +61,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error {  	if !wsi.empty && b <= wsi.data[wsi.latestIndex].First {  		return fmt.Errorf("reserving must be monotonically increasing")  	} +	wsi.lock.Lock() +	defer wsi.lock.Unlock()  	if wsi.empty {  		/* Special case if we are empty: The latestIndex is where we want this value to go! */ @@ -76,6 +81,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error {  }  func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) error { +	wsi.lock.Lock() +	defer wsi.lock.Unlock()  	iterator := wsi.latestIndex  	for {  		if wsi.data[iterator].First == b { @@ -91,6 +98,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) erro  }  func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Count() (some int, none int) { +	wsi.lock.Lock() +	defer wsi.lock.Unlock()  	some = 0  	none = 0  	for _, v := range wsi.data { @@ -104,6 +113,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Count() (some int, none int  }  func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Complete() bool { +	wsi.lock.Lock() +	defer wsi.lock.Unlock()  	for _, v := range wsi.data {  		if utilities.IsNone(v.Second) {  			return false @@ -113,10 +124,18 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Complete() bool {  }  func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) nextIndex(currentIndex int) int { +	// Internal functions should be called with the lock held! +	if wsi.lock.TryLock() { +		panic("windowSeriesWindowOnlyImpl nextIndex called without lock held.") +	}  	return (currentIndex + 1) % wsi.windowSize  }  func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) previousIndex(currentIndex int) int { +	// Internal functions should be called with the lock held! +	if wsi.lock.TryLock() { +		panic("windowSeriesWindowOnlyImpl nextIndex called without lock held.") +	}  	nextIndex := currentIndex - 1  	if nextIndex < 0 {  		nextIndex += wsi.windowSize @@ -125,6 +144,10 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) previousIndex(currentIndex  }  func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optional[Data] { +	// Internal functions should be called with the lock held! +	if wsi.lock.TryLock() { +		panic("windowSeriesWindowOnlyImpl nextIndex called without lock held.") +	}  	result := make([]utilities.Optional[Data], wsi.windowSize)  	if wsi.empty { @@ -144,6 +167,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio  }  func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] { +	wsi.lock.Lock() +	defer wsi.lock.Unlock()  	return wsi.toArray()  } @@ -152,12 +177,16 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetType() WindowSeriesDurat  }  func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) { +	wsi.lock.Lock() +	defer wsi.lock.Unlock()  	for _, v := range wsi.data {  		eacher(v.First, &v.Second)  	}  }  func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string { +	wsi.lock.Lock() +	defer wsi.lock.Unlock()  	result := fmt.Sprintf("Window series (window (%d) only, latest index: %v): ", wsi.windowSize, wsi.latestIndex)  	for _, v := range wsi.data {  		valueString := "None" @@ -169,6 +198,10 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string {  	return result  } +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) { +	panic("") +} +  func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered](  	windowSize int,  ) *windowSeriesWindowOnlyImpl[Data, Bucket] { @@ -186,10 +219,14 @@ func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered](  type windowSeriesForeverImpl[Data any, Bucket constraints.Ordered] struct {  	data  []utilities.Pair[Bucket, utilities.Optional[Data]]  	empty bool +	lock  sync.RWMutex  }  func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error { +	wsi.lock.Lock() +	defer wsi.lock.Unlock()  	if !wsi.empty && b <= wsi.data[len(wsi.data)-1].First { +		fmt.Printf("reserving must be monotonically increasing")  		return fmt.Errorf("reserving must be monotonically increasing")  	} @@ -199,6 +236,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error {  }  func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error { +	wsi.lock.Lock() +	defer wsi.lock.Unlock()  	for i := range wsi.data {  		if wsi.data[i].First == b {  			wsi.data[i].Second = utilities.Some[Data](d) @@ -209,6 +248,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error {  }  func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] { +	wsi.lock.Lock() +	defer wsi.lock.Unlock()  	result := make([]utilities.Optional[Data], len(wsi.data))  	for i, v := range utilities.Reverse(wsi.data) { @@ -219,6 +260,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Option  }  func (wsi *windowSeriesForeverImpl[Data, Bucket]) Count() (some int, none int) { +	wsi.lock.Lock() +	defer wsi.lock.Unlock()  	some = 0  	none = 0  	for _, v := range wsi.data { @@ -232,6 +275,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Count() (some int, none int) {  }  func (wsi *windowSeriesForeverImpl[Data, Bucket]) Complete() bool { +	wsi.lock.Lock() +	defer wsi.lock.Unlock()  	for _, v := range wsi.data {  		if utilities.IsNone(v.Second) {  			return false @@ -253,12 +298,16 @@ func newWindowSeriesForeverImpl[Data any, Bucket constraints.Ordered]() *windowS  }  func (wsi *windowSeriesForeverImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) { +	wsi.lock.Lock() +	defer wsi.lock.Unlock()  	for _, v := range wsi.data {  		eacher(v.First, &v.Second)  	}  }  func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string { +	wsi.lock.Lock() +	defer wsi.lock.Unlock()  	result := "Window series (forever): "  	for _, v := range wsi.data {  		valueString := "None" @@ -270,6 +319,19 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string {  	return result  } +func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) { +	result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket]) +	if !ok { +		panic("Cannot merge a forever window series with a non-forever window series.") +	} +	wsi.lock.Lock() +	defer wsi.lock.Unlock() +	result.lock.Lock() +	defer result.lock.Unlock() + +	wsi.data = append(wsi.data, result.data...) +} +  /*   * End of WindowSeries interface methods.   */ diff --git a/series/series_test.go b/series/series_test.go index 3ed752d..54ddfcc 100644 --- a/series/series_test.go +++ b/series/series_test.go @@ -15,14 +15,21 @@ package series  import (  	"reflect" +	"sync"  	"testing" +	"time" +	RPMTesting "github.com/network-quality/goresponsiveness/testing"  	"github.com/network-quality/goresponsiveness/utilities"  )  func TestNextIndex(t *testing.T) {  	wsi := newWindowSeriesWindowOnlyImpl[int, int](4) +	// Calling internal functions must be done with the lock held! +	wsi.lock.Lock() +	defer wsi.lock.Unlock() +  	idx := wsi.nextIndex(wsi.latestIndex)  	if idx != 1 {  		t.Fatalf("nextIndex is wrong (1)") @@ -54,6 +61,18 @@ func TestNextIndex(t *testing.T) {  	wsi.latestIndex = idx  } +func TestNextIndexUnlocked(t *testing.T) { +	wsi := newWindowSeriesWindowOnlyImpl[int, int](4) + +	panicingTest := func() { +		wsi.nextIndex(wsi.latestIndex) +	} + +	if !RPMTesting.DidPanic(panicingTest) { +		t.Fatalf("Expected a call to nextIndex (without the lock held) to panic but it did not") +	} +} +  func TestSimpleWindowComplete(t *testing.T) {  	wsi := newWindowSeriesWindowOnlyImpl[int, int](4)  	if wsi.Complete() { @@ -877,3 +896,48 @@ func Test_ForeverStandardDeviationCalculation2(test *testing.T) {  		test.Fatalf("Standard deviation(series) max calculation(series) failed: Expected: %v; Actual: %v.", expected, sd)  	}  } + +func Test_ForeverLocking(test *testing.T) { +	series := newWindowSeriesForeverImpl[float64, int]() +	testFail := false + +	series.Reserve(1) +	series.Reserve(2) + +	series.Fill(1, 8) +	series.Fill(2, 9) + +	wg := sync.WaitGroup{} + +	counter := 0 + +	wg.Add(2) +	go func() { +		series.ForEach(func(b int, d *utilities.Optional[float64]) { +			// All of these ++s should be done under a single lock of the lock and, therefore, +			// the ForEach below should not start until both buckets are ForEach'd over! +			counter++ +			// Make this a long wait so we know that there is no chance for a race and that +			// we are really testing what we mean to test! +			time.Sleep(time.Second * 5) +		}) +		wg.Done() +	}() + +	time.Sleep(1 * time.Second) + +	go func() { +		series.ForEach(func(b int, d *utilities.Optional[float64]) { +			if counter != 2 { +				testFail = true +			} +		}) +		wg.Done() +	}() + +	wg.Wait() + +	if testFail { +		test.Fatalf("Mutual exclusion checks did not properly lock out parallel ForEach operations.") +	} +} diff --git a/testing/utilities.go b/testing/utilities.go new file mode 100644 index 0000000..6272a6d --- /dev/null +++ b/testing/utilities.go @@ -0,0 +1,30 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package testing + +// Return true/false depending on whether the given function panics. +func DidPanic(doer func()) (didPanic bool) { +	didPanic = false + +	defer func() { +		if recovering := recover(); recovering != nil { +			didPanic = true +		} +	}() + +	doer() + +	return +} diff --git a/tools/graphing.py b/tools/graphing.py index eb1e6d7..63d5166 100644 --- a/tools/graphing.py +++ b/tools/graphing.py @@ -68,6 +68,7 @@ def find_earliest(dfs):      """      earliest = dfs[0]["CreationTime"].iloc[0]      for df in dfs: +        print(f"A data frame: {df['CreationTime']}")          if df["CreationTime"].iloc[0] < earliest:              earliest = df["CreationTime"].iloc[0]      return earliest @@ -89,7 +90,7 @@ def time_since_start(dfs, start, column_name="TimeSinceStart"):  def probeClean(df):      # ConnRTT and ConnCongestionWindow refer to Underlying Connection -    df.columns = ["CreationTime", "NumRTT", "Duration", "ConnRTT", "ConnCongestionWindow", "Type", "Empty"] +    df.columns = ["CreationTime", "NumRTT", "Duration", "ConnRTT", "ConnCongestionWindow", "Type", "Algorithm", "Empty"]      df = df.drop(columns=["Empty"])      df["CreationTime"] = pd.to_datetime(df["CreationTime"], format="%m-%d-%Y-%H-%M-%S.%f")      df["Type"] = df["Type"].apply(str.strip) @@ -99,7 +100,7 @@ def probeClean(df):  def throughputClean(df): -    df.columns = ["CreationTime", "Throughput", "NumberConnections", "Empty"] +    df.columns = ["CreationTime", "Throughput", "NumberActiveConnections", "NumberConnections", "Empty"]      df = df.drop(columns=["Empty"])      df["CreationTime"] = pd.to_datetime(df["CreationTime"], format="%m-%d-%Y-%H-%M-%S.%f")      df["ADJ_Throughput"] = df["Throughput"] / 1000000 @@ -306,6 +307,8 @@ def graph_normal(dfs, xcolumn, ax, title):  def stacked_area_throughput(throughput_df, granular, xcolumn, ycolumn, ax, title, label, linecolor="black"): + +    print(f"Stacked area throughput!")      ax.set_title(title)      ax.yaxis.tick_right() @@ -448,6 +451,7 @@ def make_graphs(files, save):              if not containsALL:                  continue +            print(f"About to call main()")              main(start + " - " + str(x), files[start][end])              if save:                  pdf = matplotlib.backends.backend_pdf.PdfPages(f"{start} - {x}.pdf")  | 
