From 399eda676f7889accf72231c6696b89de7ea3fae Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Wed, 12 Jul 2023 11:11:10 -0400 Subject: [Bugfix] Duplicate bucket IDs caused incorrect results The upload direction reused bucket IDs used during the test in the download direction which caused an incorrect grand-total RPM calculation. To solve the problem, this patch adds a global bucket ID generator and passes that to everyone that needs it. TODO: Make the bucket generator type more generic. Signed-off-by: Will Hawkins --- networkQuality.go | 14 +++++++++----- rpm/rpm.go | 36 +++++++++++++++++++++--------------- series/series.go | 18 ++++++++++++++++++ 3 files changed, 48 insertions(+), 20 deletions(-) diff --git a/networkQuality.go b/networkQuality.go index 0fff8a6..9671eb3 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -471,14 +471,16 @@ func main() { } // All tests will accumulate data to these series because it will all matter for RPM calculation! - selfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) - foreignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) + selfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) + foreignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil if *printQualityAttenuation { selfRttsQualityAttenuation = qualityattenuation.NewSimpleQualityAttenuation() } + globalNumericBucketGenerator := series.NewNumericBucketGenerator[uint64](0) + for _, direction := range directions { timeoutDuration := specParameters.TestTimeout @@ -510,6 +512,7 @@ func main() { specParameters.EvalInterval, direction.CreateLgdc, &direction.Lgcc, + &globalNumericBucketGenerator, specParameters.MaxParallelConns, *calculateExtendedStats, direction.DirectionDebugging, @@ -531,7 +534,7 @@ func main() { if *debugCliFlag { responsivenessStabilizerDebugLevel = debug.Debug } - responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint]( + responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint64]( specParameters.MovingAvgDist, specParameters.StdDevTolerance, specParameters.TrimmedMeanPct, "milliseconds", responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig) @@ -628,8 +631,8 @@ func main() { ) } - perDirectionSelfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) - perDirectionForeignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) + perDirectionSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) + perDirectionForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber( proberOperatorCtx, @@ -637,6 +640,7 @@ func main() { generateForeignProbeConfiguration, generateSelfProbeConfiguration, &direction.Lgcc, + &globalNumericBucketGenerator, direction.CreateLgdc().Direction(), // TODO: This could be better! specParameters.ProbeInterval, sslKeyFileConcurrentWriter, diff --git a/rpm/rpm.go b/rpm/rpm.go index b1b8ba4..0f51461 100644 --- a/rpm/rpm.go +++ b/rpm/rpm.go @@ -90,18 +90,19 @@ type ResponsivenessProbeResult struct { Self *probe.ProbeDataPoint } -func ResponsivenessProber( +func ResponsivenessProber[BucketType utilities.Number]( proberCtx context.Context, networkActivityCtx context.Context, foreignProbeConfigurationGenerator func() probe.ProbeConfiguration, selfProbeConfigurationGenerator func() probe.ProbeConfiguration, selfProbeConnectionCollection *lgc.LoadGeneratingConnectionCollection, + bucketGenerator *series.NumericBucketGenerator[BucketType], probeDirection lgc.LgcDirection, probeInterval time.Duration, keyLogger io.Writer, captureExtendedStats bool, debugging *debug.DebugWithPrefix, -) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, uint]) { +) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, BucketType]) { if debug.IsDebug(debugging.Level) { fmt.Printf( "(%s) Starting to collect responsiveness information at an interval of %v!\n", @@ -112,7 +113,7 @@ func ResponsivenessProber( // Make a channel to send back all the generated data points // when we are probing. - dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, uint]) + dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, BucketType]) go func() { wg := sync.WaitGroup{} @@ -147,8 +148,10 @@ func ResponsivenessProber( ) } - dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{ - Type: series.SeriesMessageReserve, Bucket: probeCount, + currentBucketId := bucketGenerator.Generate() + + dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, BucketType]{ + Type: series.SeriesMessageReserve, Bucket: currentBucketId, Measure: utilities.None[ResponsivenessProbeResult](), } @@ -277,8 +280,8 @@ func ResponsivenessProber( Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint, } - dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{ - Type: series.SeriesMessageMeasure, Bucket: probeCount, + dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, BucketType]{ + Type: series.SeriesMessageMeasure, Bucket: currentBucketId, Measure: utilities.Some[ResponsivenessProbeResult](measurement), } } @@ -306,18 +309,19 @@ func ResponsivenessProber( return } -func LoadGenerator( +func LoadGenerator[BucketType utilities.Number]( throughputCtx context.Context, // Stop our activity when we no longer need any throughput networkActivityCtx context.Context, // Create all network connections in this context. generateLoadCtx context.Context, // Stop adding additional throughput when we are stable. rampupInterval time.Duration, lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection. loadGeneratingConnectionsCollection *lgc.LoadGeneratingConnectionCollection, + bucketGenerator *series.NumericBucketGenerator[BucketType], mnp int, captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections? debugging *debug.DebugWithPrefix, // How can we forget debugging? -) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, uint64]) { // Send back all the instantaneous throughputs that we generate. - seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, uint64]) +) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, BucketType]) { // Send back all the instantaneous throughputs that we generate. + seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, BucketType]) go func() { flowsCreated := uint64(0) @@ -332,7 +336,7 @@ func LoadGenerator( nextSampleStartTime := time.Now().Add(rampupInterval) - for currentInterval := uint64(0); true; currentInterval++ { + for currentIntervalId := uint64(0); true; currentIntervalId++ { // If the throughputCtx is canceled, then that means our work here is done ... if throughputCtx.Err() != nil { @@ -471,11 +475,13 @@ func LoadGenerator( granularThroughputDatapoints, } - seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{ - Type: series.SeriesMessageReserve, Bucket: currentInterval, + currentBucketId := bucketGenerator.Generate() + + seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, BucketType]{ + Type: series.SeriesMessageReserve, Bucket: currentBucketId, } - seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{ - Type: series.SeriesMessageMeasure, Bucket: currentInterval, + seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, BucketType]{ + Type: series.SeriesMessageMeasure, Bucket: currentBucketId, Measure: utilities.Some[ThroughputDataPoint](throughputDataPoint), } diff --git a/series/series.go b/series/series.go index 0084007..43d9809 100644 --- a/series/series.go +++ b/series/series.go @@ -15,6 +15,7 @@ package series import ( "fmt" + "sync" "github.com/network-quality/goresponsiveness/utilities" "golang.org/x/exp/constraints" @@ -278,3 +279,20 @@ func NewWindowSeries[Data any, Bucket constraints.Ordered](tipe WindowSeriesDura } panic("") } + +type NumericBucketGenerator[T utilities.Number] struct { + mt sync.Mutex + currentValue T +} + +func (bg *NumericBucketGenerator[T]) Generate() T { + bg.mt.Lock() + defer bg.mt.Unlock() + + bg.currentValue++ + return bg.currentValue +} + +func NewNumericBucketGenerator[T utilities.Number](initialValue T) NumericBucketGenerator[T] { + return NumericBucketGenerator[T]{currentValue: initialValue} +} -- cgit v1.2.3