summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWill Hawkins <[email protected]>2023-07-12 11:11:10 -0400
committerWill Hawkins <[email protected]>2023-07-12 11:11:10 -0400
commit399eda676f7889accf72231c6696b89de7ea3fae (patch)
treeddbe2afa4ef2a1e28b9a8b36577832b12a74b138
parent06fd8c3b39979316ec8917d471416114a5b7c581 (diff)
[Bugfix] Duplicate bucket IDs caused incorrect results
The upload direction reused bucket IDs used during the test in the download direction which caused an incorrect grand-total RPM calculation. To solve the problem, this patch adds a global bucket ID generator and passes that to everyone that needs it. TODO: Make the bucket generator type more generic. Signed-off-by: Will Hawkins <[email protected]>
-rw-r--r--networkQuality.go14
-rw-r--r--rpm/rpm.go36
-rw-r--r--series/series.go18
3 files changed, 48 insertions, 20 deletions
diff --git a/networkQuality.go b/networkQuality.go
index 0fff8a6..9671eb3 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -471,14 +471,16 @@ func main() {
}
// All tests will accumulate data to these series because it will all matter for RPM calculation!
- selfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0)
- foreignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0)
+ selfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+ foreignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil
if *printQualityAttenuation {
selfRttsQualityAttenuation = qualityattenuation.NewSimpleQualityAttenuation()
}
+ globalNumericBucketGenerator := series.NewNumericBucketGenerator[uint64](0)
+
for _, direction := range directions {
timeoutDuration := specParameters.TestTimeout
@@ -510,6 +512,7 @@ func main() {
specParameters.EvalInterval,
direction.CreateLgdc,
&direction.Lgcc,
+ &globalNumericBucketGenerator,
specParameters.MaxParallelConns,
*calculateExtendedStats,
direction.DirectionDebugging,
@@ -531,7 +534,7 @@ func main() {
if *debugCliFlag {
responsivenessStabilizerDebugLevel = debug.Debug
}
- responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint](
+ responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint64](
specParameters.MovingAvgDist, specParameters.StdDevTolerance,
specParameters.TrimmedMeanPct, "milliseconds",
responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig)
@@ -628,8 +631,8 @@ func main() {
)
}
- perDirectionSelfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0)
- perDirectionForeignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0)
+ perDirectionSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+ perDirectionForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber(
proberOperatorCtx,
@@ -637,6 +640,7 @@ func main() {
generateForeignProbeConfiguration,
generateSelfProbeConfiguration,
&direction.Lgcc,
+ &globalNumericBucketGenerator,
direction.CreateLgdc().Direction(), // TODO: This could be better!
specParameters.ProbeInterval,
sslKeyFileConcurrentWriter,
diff --git a/rpm/rpm.go b/rpm/rpm.go
index b1b8ba4..0f51461 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -90,18 +90,19 @@ type ResponsivenessProbeResult struct {
Self *probe.ProbeDataPoint
}
-func ResponsivenessProber(
+func ResponsivenessProber[BucketType utilities.Number](
proberCtx context.Context,
networkActivityCtx context.Context,
foreignProbeConfigurationGenerator func() probe.ProbeConfiguration,
selfProbeConfigurationGenerator func() probe.ProbeConfiguration,
selfProbeConnectionCollection *lgc.LoadGeneratingConnectionCollection,
+ bucketGenerator *series.NumericBucketGenerator[BucketType],
probeDirection lgc.LgcDirection,
probeInterval time.Duration,
keyLogger io.Writer,
captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
-) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, uint]) {
+) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, BucketType]) {
if debug.IsDebug(debugging.Level) {
fmt.Printf(
"(%s) Starting to collect responsiveness information at an interval of %v!\n",
@@ -112,7 +113,7 @@ func ResponsivenessProber(
// Make a channel to send back all the generated data points
// when we are probing.
- dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, uint])
+ dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, BucketType])
go func() {
wg := sync.WaitGroup{}
@@ -147,8 +148,10 @@ func ResponsivenessProber(
)
}
- dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{
- Type: series.SeriesMessageReserve, Bucket: probeCount,
+ currentBucketId := bucketGenerator.Generate()
+
+ dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, BucketType]{
+ Type: series.SeriesMessageReserve, Bucket: currentBucketId,
Measure: utilities.None[ResponsivenessProbeResult](),
}
@@ -277,8 +280,8 @@ func ResponsivenessProber(
Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint,
}
- dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{
- Type: series.SeriesMessageMeasure, Bucket: probeCount,
+ dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, BucketType]{
+ Type: series.SeriesMessageMeasure, Bucket: currentBucketId,
Measure: utilities.Some[ResponsivenessProbeResult](measurement),
}
}
@@ -306,18 +309,19 @@ func ResponsivenessProber(
return
}
-func LoadGenerator(
+func LoadGenerator[BucketType utilities.Number](
throughputCtx context.Context, // Stop our activity when we no longer need any throughput
networkActivityCtx context.Context, // Create all network connections in this context.
generateLoadCtx context.Context, // Stop adding additional throughput when we are stable.
rampupInterval time.Duration,
lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection.
loadGeneratingConnectionsCollection *lgc.LoadGeneratingConnectionCollection,
+ bucketGenerator *series.NumericBucketGenerator[BucketType],
mnp int,
captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections?
debugging *debug.DebugWithPrefix, // How can we forget debugging?
-) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, uint64]) { // Send back all the instantaneous throughputs that we generate.
- seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, uint64])
+) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, BucketType]) { // Send back all the instantaneous throughputs that we generate.
+ seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, BucketType])
go func() {
flowsCreated := uint64(0)
@@ -332,7 +336,7 @@ func LoadGenerator(
nextSampleStartTime := time.Now().Add(rampupInterval)
- for currentInterval := uint64(0); true; currentInterval++ {
+ for currentIntervalId := uint64(0); true; currentIntervalId++ {
// If the throughputCtx is canceled, then that means our work here is done ...
if throughputCtx.Err() != nil {
@@ -471,11 +475,13 @@ func LoadGenerator(
granularThroughputDatapoints,
}
- seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{
- Type: series.SeriesMessageReserve, Bucket: currentInterval,
+ currentBucketId := bucketGenerator.Generate()
+
+ seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, BucketType]{
+ Type: series.SeriesMessageReserve, Bucket: currentBucketId,
}
- seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{
- Type: series.SeriesMessageMeasure, Bucket: currentInterval,
+ seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, BucketType]{
+ Type: series.SeriesMessageMeasure, Bucket: currentBucketId,
Measure: utilities.Some[ThroughputDataPoint](throughputDataPoint),
}
diff --git a/series/series.go b/series/series.go
index 0084007..43d9809 100644
--- a/series/series.go
+++ b/series/series.go
@@ -15,6 +15,7 @@ package series
import (
"fmt"
+ "sync"
"github.com/network-quality/goresponsiveness/utilities"
"golang.org/x/exp/constraints"
@@ -278,3 +279,20 @@ func NewWindowSeries[Data any, Bucket constraints.Ordered](tipe WindowSeriesDura
}
panic("")
}
+
+type NumericBucketGenerator[T utilities.Number] struct {
+ mt sync.Mutex
+ currentValue T
+}
+
+func (bg *NumericBucketGenerator[T]) Generate() T {
+ bg.mt.Lock()
+ defer bg.mt.Unlock()
+
+ bg.currentValue++
+ return bg.currentValue
+}
+
+func NewNumericBucketGenerator[T utilities.Number](initialValue T) NumericBucketGenerator[T] {
+ return NumericBucketGenerator[T]{currentValue: initialValue}
+}