diff options
| author | Will Hawkins <[email protected]> | 2023-07-10 13:45:50 -0400 |
|---|---|---|
| committer | Will Hawkins <[email protected]> | 2023-07-10 13:45:50 -0400 |
| commit | 78d574a74665c8bc062c26755c80a8b524bce347 (patch) | |
| tree | 7ad65f0052defaea63acb2f3445be00ef97e24d6 /networkQuality.go | |
| parent | fe17152a507bbf94a11cca7f49a51cbae9c0d67b (diff) | |
[Feature] Major update: Track measurements that may be delayed
Among other major feature additions, this version of the client tracks
any measurements that may be long delayed and considers their presence
or absence as part of a stability measurement.
This version of the client also more closely tracks the spec. In
particular, it performs a sinle-sided trimmed mean rather than a
double-sided trimmed mean.
Signed-off-by: Will Hawkins <[email protected]>
Diffstat (limited to 'networkQuality.go')
| -rw-r--r-- | networkQuality.go | 227 |
1 files changed, 163 insertions, 64 deletions
diff --git a/networkQuality.go b/networkQuality.go index 776c0e7..aa8d854 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -33,10 +33,10 @@ import ( "github.com/network-quality/goresponsiveness/direction" "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" - "github.com/network-quality/goresponsiveness/ms" "github.com/network-quality/goresponsiveness/probe" "github.com/network-quality/goresponsiveness/qualityattenuation" "github.com/network-quality/goresponsiveness/rpm" + "github.com/network-quality/goresponsiveness/series" "github.com/network-quality/goresponsiveness/stabilizer" "github.com/network-quality/goresponsiveness/timeoutat" "github.com/network-quality/goresponsiveness/utilities" @@ -471,8 +471,8 @@ func main() { } // All tests will accumulate data to these series because it will all matter for RPM calculation! - selfRtts := ms.NewInfiniteMathematicalSeries[float64]() - foreignRtts := ms.NewInfiniteMathematicalSeries[float64]() + selfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) + foreignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil if *printQualityAttenuation { @@ -521,8 +521,8 @@ func main() { if *debugCliFlag { downloadThroughputStabilizerDebugLevel = debug.Debug } - throughputStabilizer := stabilizer.NewStabilizer[float64]( - uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, 0, "bytes", + throughputStabilizer := stabilizer.NewStabilizer[float64, uint64]( + specParameters.MovingAvgDist, specParameters.StdDevTolerance, 0, "bytes", downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig) responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, @@ -531,8 +531,8 @@ func main() { if *debugCliFlag { responsivenessStabilizerDebugLevel = debug.Debug } - responsivenessStabilizer := stabilizer.NewStabilizer[int64]( - uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, + responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint]( + specParameters.MovingAvgDist, specParameters.StdDevTolerance, specParameters.TrimmedMeanPct, "milliseconds", responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig) @@ -541,32 +541,70 @@ func main() { lastThroughputRate := float64(0) lastThroughputOpenConnectionCount := int(0) + stabilityCheckTime := time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel := timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) + lg_timeout: for !direction.StableThroughput { select { case throughputMeasurement := <-lgStabilizationCommunicationChannel: { - throughputStabilizer.AddMeasurement( - throughputMeasurement.Throughput) + switch throughputMeasurement.Type { + case series.SeriesMessageReserve: + { + throughputStabilizer.Reserve(throughputMeasurement.Bucket) + if *debugCliFlag { + fmt.Printf( + "%s: Reserving a throughput bucket with id %v.\n", + direction.DirectionLabel, throughputMeasurement.Bucket) + } + } + case series.SeriesMessageMeasure: + { + bucket := throughputMeasurement.Bucket + measurement := utilities.GetSome(throughputMeasurement.Measure) + + throughputStabilizer.AddMeasurement(bucket, measurement.Throughput) + + direction.ThroughputDataLogger.LogRecord(measurement) + for _, v := range measurement.GranularThroughputDataPoints { + v.Direction = "Download" + direction.GranularThroughputDataLogger.LogRecord(v) + } + + lastThroughputRate = measurement.Throughput + lastThroughputOpenConnectionCount = measurement.Connections + } + } + } + case <-stabilityCheckTimeChannel: + { + if *debugCliFlag { + fmt.Printf( + "%v throughput stability interval is complete.\n", direction.DirectionLabel) + } + stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel = timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) + direction.StableThroughput = throughputStabilizer.IsStable() if *debugCliFlag { fmt.Printf( - "################# %v is instantaneously %s.\n", direction.DirectionLabel, + "%v is instantaneously %s.\n", direction.DirectionLabel, utilities.Conditional(direction.StableThroughput, "stable", "unstable")) } - direction.ThroughputDataLogger.LogRecord(throughputMeasurement) - for i := range throughputMeasurement.GranularThroughputDataPoints { - datapoint := throughputMeasurement.GranularThroughputDataPoints[i] - datapoint.Direction = "Download" - direction.GranularThroughputDataLogger.LogRecord(datapoint) - } - - lastThroughputRate = throughputMeasurement.Throughput - lastThroughputOpenConnectionCount = throughputMeasurement.Connections if direction.StableThroughput { throughputGeneratorCtxCancel() } + throughputStabilizer.Interval() } case <-timeoutChannel: { @@ -577,7 +615,7 @@ func main() { if direction.StableThroughput { if *debugCliFlag { - fmt.Printf("################# Throughput is stable; beginning responsiveness testing.\n") + fmt.Printf("Throughput is stable; beginning responsiveness testing.\n") } } else { fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Adding 15 seconds to calculate speculative RPM results.\n") @@ -590,8 +628,8 @@ func main() { ) } - perDirectionSelfRtts := ms.NewInfiniteMathematicalSeries[float64]() - perDirectionForeignRtts := ms.NewInfiniteMathematicalSeries[float64]() + perDirectionSelfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) + perDirectionForeignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber( proberOperatorCtx, @@ -611,61 +649,122 @@ func main() { select { case probeMeasurement := <-responsivenessStabilizationCommunicationChannel: { - foreignDataPoint := probeMeasurement.First - selfDataPoint := probeMeasurement.Second + switch probeMeasurement.Type { + case series.SeriesMessageReserve: + { + bucket := probeMeasurement.Bucket + if *debugCliFlag { + fmt.Printf( + "%s: Reserving a responsiveness bucket with id %v.\n", direction.DirectionLabel, bucket) + } + responsivenessStabilizer.Reserve(bucket) + selfRtts.Reserve(bucket) + foreignRtts.Reserve(bucket) + perDirectionForeignRtts.Reserve(bucket) + perDirectionSelfRtts.Reserve(bucket) + } + case series.SeriesMessageMeasure: + { + bucket := probeMeasurement.Bucket + measurement := utilities.GetSome(probeMeasurement.Measure) + foreignDataPoint := measurement.Foreign + selfDataPoint := measurement.Self - responsivenessStabilizer.AddMeasurement( - (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) + if *debugCliFlag { + fmt.Printf( + "%s: Filling a responsiveness bucket with id %v with value %v.\n", + direction.DirectionLabel, bucket, measurement) + } + responsivenessStabilizer.AddMeasurement(bucket, + (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) - // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy - // is *actually* important, but it can't hurt? - direction.StableResponsiveness = responsivenessStabilizer.IsStable() + if err := selfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (selfRtts)\n", bucket) + } + if perDirectionSelfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionSelfRtts)\n", bucket) + } - if *debugCliFlag { - fmt.Printf( - "################# Responsiveness is instantaneously %s.\n", - utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) - } - // There may be more than one round trip accumulated together. If that is the case, - // we will blow them apart in to three separate measurements and each one will just - // be 1 / measurement.RoundTripCount of the total length. - for range utilities.Iota(0, int(foreignDataPoint.RoundTripCount)) { - foreignRtts.AddElement(foreignDataPoint.Duration.Seconds() / - float64(foreignDataPoint.RoundTripCount)) - perDirectionForeignRtts.AddElement(foreignDataPoint.Duration.Seconds() / - float64(foreignDataPoint.RoundTripCount)) - } - selfRtts.AddElement(selfDataPoint.Duration.Seconds()) - perDirectionSelfRtts.AddElement(selfDataPoint.Duration.Seconds()) + if foreignRtts.Fill(bucket, foreignDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (foreignRtts)\n", bucket) + } - if selfRttsQualityAttenuation != nil { - selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) - } + if perDirectionForeignRtts.Fill(bucket, + foreignDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionForeignRtts)\n", bucket) + } + + if selfRttsQualityAttenuation != nil { + selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) + } - direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) - direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) + direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) + direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) + + } + } } case throughputMeasurement := <-lgStabilizationCommunicationChannel: { - if *debugCliFlag { - fmt.Printf("Adding a throughput measurement.\n") - } - // There may be more than one round trip accumulated together. If that is the case, - direction.ThroughputDataLogger.LogRecord(throughputMeasurement) - for i := range throughputMeasurement.GranularThroughputDataPoints { - datapoint := throughputMeasurement.GranularThroughputDataPoints[i] - datapoint.Direction = direction.DirectionLabel - direction.GranularThroughputDataLogger.LogRecord(datapoint) - } + switch throughputMeasurement.Type { + case series.SeriesMessageReserve: + { + // We are no longer tracking stability, so reservation messages are useless! + if *debugCliFlag { + fmt.Printf( + "%s: Discarding a throughput bucket with id %v when ascertaining responsiveness.\n", + direction.DirectionLabel, throughputMeasurement.Bucket) + } + } + case series.SeriesMessageMeasure: + { + measurement := utilities.GetSome(throughputMeasurement.Measure) - lastThroughputRate = throughputMeasurement.Throughput - lastThroughputOpenConnectionCount = throughputMeasurement.Connections + if *debugCliFlag { + fmt.Printf("Adding a throughput measurement (while ascertaining responsiveness).\n") + } + // There may be more than one round trip accumulated together. If that is the case, + direction.ThroughputDataLogger.LogRecord(measurement) + for _, v := range measurement.GranularThroughputDataPoints { + v.Direction = direction.DirectionLabel + direction.GranularThroughputDataLogger.LogRecord(v) + } + lastThroughputRate = measurement.Throughput + lastThroughputOpenConnectionCount = measurement.Connections + } + } } case <-timeoutChannel: { break responsiveness_timeout } + case <-stabilityCheckTimeChannel: + { + if *debugCliFlag { + fmt.Printf( + "%v responsiveness stability interval is complete.\n", direction.DirectionLabel) + } + + stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel = timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) + + // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy + // is *actually* important, but it can't hurt? + direction.StableResponsiveness = responsivenessStabilizer.IsStable() + + if *debugCliFlag { + fmt.Printf( + "Responsiveness is instantaneously %s.\n", + utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) + } + + responsivenessStabilizer.Interval() + } } } @@ -753,7 +852,7 @@ func main() { } fmt.Printf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, directionResult.PNRpm, 90) - fmt.Printf("%s RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, + fmt.Printf("%s RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, directionResult.MeanRpm, specParameters.TrimmedMeanPct) if len(*prometheusStatsFilename) > 0 { @@ -808,7 +907,7 @@ func main() { } fmt.Printf("Final RPM: %5.0f (P%d)\n", result.PNRpm, 90) - fmt.Printf("Final RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", + fmt.Printf("Final RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", result.MeanRpm, specParameters.TrimmedMeanPct) // Stop the world. |
