| | | |
|---|---|---|
| author | Will Hawkins <[email protected]> | 2023-07-12 11:11:10 -0400 |
| committer | Will Hawkins <[email protected]> | 2023-07-12 11:11:10 -0400 |
| commit | 399eda676f7889accf72231c6696b89de7ea3fae (patch) | |
| tree | ddbe2afa4ef2a1e28b9a8b36577832b12a74b138 | |
| parent | 06fd8c3b39979316ec8917d471416114a5b7c581 (diff) | |
[Bugfix] Duplicate bucket IDs caused incorrect results
The upload direction reused the bucket IDs that had already been used
by the test in the download direction, which made the grand-total RPM
calculation incorrect. To solve the problem, this patch adds a global
bucket-ID generator and passes it to every caller that needs one.
TODO: Make the bucket generator type more generic.
Signed-off-by: Will Hawkins <[email protected]>
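
To make the failure mode concrete, here is a minimal, self-contained sketch rather than the project's real code: a plain map stands in for the merged, bucket-keyed data that feeds the grand-total RPM calculation, and it shows one plausible consequence of both directions numbering their buckets from zero.

```go
package main

import "fmt"

func main() {
	// Toy stand-in for the merged, bucket-keyed data consumed by the
	// grand-total RPM calculation; the real code uses series.WindowSeries,
	// not a map.
	merged := map[uint64]float64{}

	// Before the fix: each direction restarts its bucket counter at zero,
	// so the upload test hands out the same IDs the download test used.
	downloadBuckets := []uint64{0, 1, 2}
	uploadBuckets := []uint64{0, 1, 2}

	for _, b := range downloadBuckets {
		merged[b] = 10.0 // download samples
	}
	for _, b := range uploadBuckets {
		merged[b] = 20.0 // upload samples silently replace the download ones
	}

	// Only 3 buckets survive instead of 6, so anything computed over the
	// merged data misrepresents the test.
	fmt.Println("buckets in merged data:", len(merged))
}
```

With a single shared generator, as this patch adds, the upload direction would have received IDs 3, 4, and 5 in this toy scenario, and both directions' samples would coexist in the merged data.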
| | | |
|---|---|---|
| -rw-r--r-- | networkQuality.go | 14 |
| -rw-r--r-- | rpm/rpm.go | 36 |
| -rw-r--r-- | series/series.go | 18 |
3 files changed, 48 insertions, 20 deletions
```diff
diff --git a/networkQuality.go b/networkQuality.go
index 0fff8a6..9671eb3 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -471,14 +471,16 @@ func main() {
 	}
 
 	// All tests will accumulate data to these series because it will all matter for RPM calculation!
-	selfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0)
-	foreignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0)
+	selfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+	foreignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
 
 	var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil
 	if *printQualityAttenuation {
 		selfRttsQualityAttenuation = qualityattenuation.NewSimpleQualityAttenuation()
 	}
 
+	globalNumericBucketGenerator := series.NewNumericBucketGenerator[uint64](0)
+
 	for _, direction := range directions {
 		timeoutDuration := specParameters.TestTimeout
@@ -510,6 +512,7 @@ func main() {
 			specParameters.EvalInterval,
 			direction.CreateLgdc,
 			&direction.Lgcc,
+			&globalNumericBucketGenerator,
 			specParameters.MaxParallelConns,
 			*calculateExtendedStats,
 			direction.DirectionDebugging,
@@ -531,7 +534,7 @@ func main() {
 		if *debugCliFlag {
 			responsivenessStabilizerDebugLevel = debug.Debug
 		}
-		responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint](
+		responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint64](
 			specParameters.MovingAvgDist, specParameters.StdDevTolerance,
 			specParameters.TrimmedMeanPct, "milliseconds",
 			responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig)
@@ -628,8 +631,8 @@ func main() {
 			)
 		}
 
-		perDirectionSelfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0)
-		perDirectionForeignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0)
+		perDirectionSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+		perDirectionForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
 
 		responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber(
 			proberOperatorCtx,
@@ -637,6 +640,7 @@ func main() {
 			generateForeignProbeConfiguration,
 			generateSelfProbeConfiguration,
 			&direction.Lgcc,
+			&globalNumericBucketGenerator,
 			direction.CreateLgdc().Direction(), // TODO: This could be better!
 			specParameters.ProbeInterval,
 			sslKeyFileConcurrentWriter,
diff --git a/rpm/rpm.go b/rpm/rpm.go
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -90,18 +90,19 @@ type ResponsivenessProbeResult struct {
 	Self    *probe.ProbeDataPoint
 }
 
-func ResponsivenessProber(
+func ResponsivenessProber[BucketType utilities.Number](
 	proberCtx context.Context,
 	networkActivityCtx context.Context,
 	foreignProbeConfigurationGenerator func() probe.ProbeConfiguration,
 	selfProbeConfigurationGenerator func() probe.ProbeConfiguration,
 	selfProbeConnectionCollection *lgc.LoadGeneratingConnectionCollection,
+	bucketGenerator *series.NumericBucketGenerator[BucketType],
 	probeDirection lgc.LgcDirection,
 	probeInterval time.Duration,
 	keyLogger io.Writer,
 	captureExtendedStats bool,
 	debugging *debug.DebugWithPrefix,
-) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, uint]) {
+) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, BucketType]) {
 	if debug.IsDebug(debugging.Level) {
 		fmt.Printf(
 			"(%s) Starting to collect responsiveness information at an interval of %v!\n",
@@ -112,7 +113,7 @@ func ResponsivenessProber(
 	// Make a channel to send back all the generated data points
 	// when we are probing.
-	dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, uint])
+	dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, BucketType])
 
 	go func() {
 		wg := sync.WaitGroup{}
@@ -147,8 +148,10 @@ func ResponsivenessProber(
 					)
 				}
 
-				dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{
-					Type: series.SeriesMessageReserve, Bucket: probeCount,
+				currentBucketId := bucketGenerator.Generate()
+
+				dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, BucketType]{
+					Type: series.SeriesMessageReserve, Bucket: currentBucketId,
 					Measure: utilities.None[ResponsivenessProbeResult](),
 				}
@@ -277,8 +280,8 @@ func ResponsivenessProber(
 						Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint,
 					}
 
-					dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{
-						Type: series.SeriesMessageMeasure, Bucket: probeCount,
+					dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, BucketType]{
+						Type: series.SeriesMessageMeasure, Bucket: currentBucketId,
 						Measure: utilities.Some[ResponsivenessProbeResult](measurement),
 					}
 				}
@@ -306,18 +309,19 @@ func ResponsivenessProber(
 	return
 }
 
-func LoadGenerator(
+func LoadGenerator[BucketType utilities.Number](
 	throughputCtx context.Context, // Stop our activity when we no longer need any throughput
 	networkActivityCtx context.Context, // Create all network connections in this context.
 	generateLoadCtx context.Context, // Stop adding additional throughput when we are stable.
 	rampupInterval time.Duration,
 	lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection.
 	loadGeneratingConnectionsCollection *lgc.LoadGeneratingConnectionCollection,
+	bucketGenerator *series.NumericBucketGenerator[BucketType],
 	mnp int,
 	captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections?
 	debugging *debug.DebugWithPrefix, // How can we forget debugging?
-) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, uint64]) { // Send back all the instantaneous throughputs that we generate.
-	seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, uint64])
+) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, BucketType]) { // Send back all the instantaneous throughputs that we generate.
+	seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, BucketType])
 
 	go func() {
 		flowsCreated := uint64(0)
@@ -332,7 +336,7 @@ func LoadGenerator(
 
 		nextSampleStartTime := time.Now().Add(rampupInterval)
 
-		for currentInterval := uint64(0); true; currentInterval++ {
+		for currentIntervalId := uint64(0); true; currentIntervalId++ {
 			// If the throughputCtx is canceled, then that means our work here is done ...
 			if throughputCtx.Err() != nil {
@@ -471,11 +475,13 @@ func LoadGenerator(
 				granularThroughputDatapoints,
 			}
 
-			seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{
-				Type: series.SeriesMessageReserve, Bucket: currentInterval,
+			currentBucketId := bucketGenerator.Generate()
+
+			seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, BucketType]{
+				Type: series.SeriesMessageReserve, Bucket: currentBucketId,
 			}
-			seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{
-				Type: series.SeriesMessageMeasure, Bucket: currentInterval,
+			seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, BucketType]{
+				Type: series.SeriesMessageMeasure, Bucket: currentBucketId,
 				Measure: utilities.Some[ThroughputDataPoint](throughputDataPoint),
 			}
diff --git a/series/series.go b/series/series.go
index 0084007..43d9809 100644
--- a/series/series.go
+++ b/series/series.go
@@ -15,6 +15,7 @@ package series
 
 import (
 	"fmt"
+	"sync"
 
 	"github.com/network-quality/goresponsiveness/utilities"
 	"golang.org/x/exp/constraints"
@@ -278,3 +279,20 @@ func NewWindowSeries[Data any, Bucket constraints.Ordered](tipe WindowSeriesDura
 	}
 	panic("")
 }
+
+type NumericBucketGenerator[T utilities.Number] struct {
+	mt           sync.Mutex
+	currentValue T
+}
+
+func (bg *NumericBucketGenerator[T]) Generate() T {
+	bg.mt.Lock()
+	defer bg.mt.Unlock()
+
+	bg.currentValue++
+	return bg.currentValue
+}
+
+func NewNumericBucketGenerator[T utilities.Number](initialValue T) NumericBucketGenerator[T] {
+	return NumericBucketGenerator[T]{currentValue: initialValue}
+}
```
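
For reference, here is a small usage sketch of the generator added in series/series.go, assuming it is built against this repository's series package. Two goroutines stand in for the upload and download directions and draw IDs from the one shared generator, so no bucket ID is handed out twice; the main program wires this up via globalNumericBucketGenerator, as shown in the networkQuality.go hunks above.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/network-quality/goresponsiveness/series"
)

func main() {
	// One generator shared by both directions, mirroring the
	// globalNumericBucketGenerator created in main().
	generator := series.NewNumericBucketGenerator[uint64](0)

	var wg sync.WaitGroup
	ids := make(chan uint64, 10)

	// Two stand-ins for the upload and download directions.
	for direction := 0; direction < 2; direction++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 5; i++ {
				// Generate is safe to call concurrently because it
				// serializes access with an internal mutex.
				ids <- generator.Generate()
			}
		}()
	}
	wg.Wait()
	close(ids)

	// Every ID is unique across both directions.
	seen := map[uint64]bool{}
	for id := range ids {
		if seen[id] {
			fmt.Println("duplicate bucket ID:", id)
		}
		seen[id] = true
	}
	fmt.Println("unique bucket IDs handed out:", len(seen))
}
```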
