From 78d574a74665c8bc062c26755c80a8b524bce347 Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Mon, 10 Jul 2023 13:45:50 -0400 Subject: [Feature] Major update: Track measurements that may be delayed Among other major feature additions, this version of the client tracks any measurements that may be long delayed and considers their presence or absence as part of a stability measurement. This version of the client also more closely tracks the spec. In particular, it performs a sinle-sided trimmed mean rather than a double-sided trimmed mean. Signed-off-by: Will Hawkins --- networkQuality.go | 233 ++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 166 insertions(+), 67 deletions(-) (limited to 'networkQuality.go') diff --git a/networkQuality.go b/networkQuality.go index 776c0e7..aa8d854 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -33,10 +33,10 @@ import ( "github.com/network-quality/goresponsiveness/direction" "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" - "github.com/network-quality/goresponsiveness/ms" "github.com/network-quality/goresponsiveness/probe" "github.com/network-quality/goresponsiveness/qualityattenuation" "github.com/network-quality/goresponsiveness/rpm" + "github.com/network-quality/goresponsiveness/series" "github.com/network-quality/goresponsiveness/stabilizer" "github.com/network-quality/goresponsiveness/timeoutat" "github.com/network-quality/goresponsiveness/utilities" @@ -471,8 +471,8 @@ func main() { } // All tests will accumulate data to these series because it will all matter for RPM calculation! - selfRtts := ms.NewInfiniteMathematicalSeries[float64]() - foreignRtts := ms.NewInfiniteMathematicalSeries[float64]() + selfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) + foreignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil if *printQualityAttenuation { @@ -521,8 +521,8 @@ func main() { if *debugCliFlag { downloadThroughputStabilizerDebugLevel = debug.Debug } - throughputStabilizer := stabilizer.NewStabilizer[float64]( - uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, 0, "bytes", + throughputStabilizer := stabilizer.NewStabilizer[float64, uint64]( + specParameters.MovingAvgDist, specParameters.StdDevTolerance, 0, "bytes", downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig) responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, @@ -531,8 +531,8 @@ func main() { if *debugCliFlag { responsivenessStabilizerDebugLevel = debug.Debug } - responsivenessStabilizer := stabilizer.NewStabilizer[int64]( - uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, + responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint]( + specParameters.MovingAvgDist, specParameters.StdDevTolerance, specParameters.TrimmedMeanPct, "milliseconds", responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig) @@ -541,32 +541,70 @@ func main() { lastThroughputRate := float64(0) lastThroughputOpenConnectionCount := int(0) + stabilityCheckTime := time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel := timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) + lg_timeout: for !direction.StableThroughput { select { case throughputMeasurement := <-lgStabilizationCommunicationChannel: { - throughputStabilizer.AddMeasurement( - throughputMeasurement.Throughput) + switch throughputMeasurement.Type { + case series.SeriesMessageReserve: + { + throughputStabilizer.Reserve(throughputMeasurement.Bucket) + if *debugCliFlag { + fmt.Printf( + "%s: Reserving a throughput bucket with id %v.\n", + direction.DirectionLabel, throughputMeasurement.Bucket) + } + } + case series.SeriesMessageMeasure: + { + bucket := throughputMeasurement.Bucket + measurement := utilities.GetSome(throughputMeasurement.Measure) + + throughputStabilizer.AddMeasurement(bucket, measurement.Throughput) + + direction.ThroughputDataLogger.LogRecord(measurement) + for _, v := range measurement.GranularThroughputDataPoints { + v.Direction = "Download" + direction.GranularThroughputDataLogger.LogRecord(v) + } + + lastThroughputRate = measurement.Throughput + lastThroughputOpenConnectionCount = measurement.Connections + } + } + } + case <-stabilityCheckTimeChannel: + { + if *debugCliFlag { + fmt.Printf( + "%v throughput stability interval is complete.\n", direction.DirectionLabel) + } + stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel = timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) + direction.StableThroughput = throughputStabilizer.IsStable() if *debugCliFlag { fmt.Printf( - "################# %v is instantaneously %s.\n", direction.DirectionLabel, + "%v is instantaneously %s.\n", direction.DirectionLabel, utilities.Conditional(direction.StableThroughput, "stable", "unstable")) } - direction.ThroughputDataLogger.LogRecord(throughputMeasurement) - for i := range throughputMeasurement.GranularThroughputDataPoints { - datapoint := throughputMeasurement.GranularThroughputDataPoints[i] - datapoint.Direction = "Download" - direction.GranularThroughputDataLogger.LogRecord(datapoint) - } - - lastThroughputRate = throughputMeasurement.Throughput - lastThroughputOpenConnectionCount = throughputMeasurement.Connections if direction.StableThroughput { throughputGeneratorCtxCancel() } + throughputStabilizer.Interval() } case <-timeoutChannel: { @@ -577,7 +615,7 @@ func main() { if direction.StableThroughput { if *debugCliFlag { - fmt.Printf("################# Throughput is stable; beginning responsiveness testing.\n") + fmt.Printf("Throughput is stable; beginning responsiveness testing.\n") } } else { fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Adding 15 seconds to calculate speculative RPM results.\n") @@ -590,8 +628,8 @@ func main() { ) } - perDirectionSelfRtts := ms.NewInfiniteMathematicalSeries[float64]() - perDirectionForeignRtts := ms.NewInfiniteMathematicalSeries[float64]() + perDirectionSelfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) + perDirectionForeignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber( proberOperatorCtx, @@ -611,11 +649,109 @@ func main() { select { case probeMeasurement := <-responsivenessStabilizationCommunicationChannel: { - foreignDataPoint := probeMeasurement.First - selfDataPoint := probeMeasurement.Second + switch probeMeasurement.Type { + case series.SeriesMessageReserve: + { + bucket := probeMeasurement.Bucket + if *debugCliFlag { + fmt.Printf( + "%s: Reserving a responsiveness bucket with id %v.\n", direction.DirectionLabel, bucket) + } + responsivenessStabilizer.Reserve(bucket) + selfRtts.Reserve(bucket) + foreignRtts.Reserve(bucket) + perDirectionForeignRtts.Reserve(bucket) + perDirectionSelfRtts.Reserve(bucket) + } + case series.SeriesMessageMeasure: + { + bucket := probeMeasurement.Bucket + measurement := utilities.GetSome(probeMeasurement.Measure) + foreignDataPoint := measurement.Foreign + selfDataPoint := measurement.Self + + if *debugCliFlag { + fmt.Printf( + "%s: Filling a responsiveness bucket with id %v with value %v.\n", + direction.DirectionLabel, bucket, measurement) + } + responsivenessStabilizer.AddMeasurement(bucket, + (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) + + if err := selfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (selfRtts)\n", bucket) + } + if perDirectionSelfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionSelfRtts)\n", bucket) + } + + if foreignRtts.Fill(bucket, foreignDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (foreignRtts)\n", bucket) + } + + if perDirectionForeignRtts.Fill(bucket, + foreignDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionForeignRtts)\n", bucket) + } + + if selfRttsQualityAttenuation != nil { + selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) + } + + direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) + direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) + + } + } + } + case throughputMeasurement := <-lgStabilizationCommunicationChannel: + { + switch throughputMeasurement.Type { + case series.SeriesMessageReserve: + { + // We are no longer tracking stability, so reservation messages are useless! + if *debugCliFlag { + fmt.Printf( + "%s: Discarding a throughput bucket with id %v when ascertaining responsiveness.\n", + direction.DirectionLabel, throughputMeasurement.Bucket) + } + } + case series.SeriesMessageMeasure: + { + measurement := utilities.GetSome(throughputMeasurement.Measure) + + if *debugCliFlag { + fmt.Printf("Adding a throughput measurement (while ascertaining responsiveness).\n") + } + // There may be more than one round trip accumulated together. If that is the case, + direction.ThroughputDataLogger.LogRecord(measurement) + for _, v := range measurement.GranularThroughputDataPoints { + v.Direction = direction.DirectionLabel + direction.GranularThroughputDataLogger.LogRecord(v) + } + + lastThroughputRate = measurement.Throughput + lastThroughputOpenConnectionCount = measurement.Connections + } + } + } + case <-timeoutChannel: + { + break responsiveness_timeout + } + case <-stabilityCheckTimeChannel: + { + if *debugCliFlag { + fmt.Printf( + "%v responsiveness stability interval is complete.\n", direction.DirectionLabel) + } - responsivenessStabilizer.AddMeasurement( - (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) + stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel = timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy // is *actually* important, but it can't hurt? @@ -623,48 +759,11 @@ func main() { if *debugCliFlag { fmt.Printf( - "################# Responsiveness is instantaneously %s.\n", + "Responsiveness is instantaneously %s.\n", utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) } - // There may be more than one round trip accumulated together. If that is the case, - // we will blow them apart in to three separate measurements and each one will just - // be 1 / measurement.RoundTripCount of the total length. - for range utilities.Iota(0, int(foreignDataPoint.RoundTripCount)) { - foreignRtts.AddElement(foreignDataPoint.Duration.Seconds() / - float64(foreignDataPoint.RoundTripCount)) - perDirectionForeignRtts.AddElement(foreignDataPoint.Duration.Seconds() / - float64(foreignDataPoint.RoundTripCount)) - } - selfRtts.AddElement(selfDataPoint.Duration.Seconds()) - perDirectionSelfRtts.AddElement(selfDataPoint.Duration.Seconds()) - - if selfRttsQualityAttenuation != nil { - selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) - } - direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) - direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) - } - case throughputMeasurement := <-lgStabilizationCommunicationChannel: - { - if *debugCliFlag { - fmt.Printf("Adding a throughput measurement.\n") - } - // There may be more than one round trip accumulated together. If that is the case, - direction.ThroughputDataLogger.LogRecord(throughputMeasurement) - for i := range throughputMeasurement.GranularThroughputDataPoints { - datapoint := throughputMeasurement.GranularThroughputDataPoints[i] - datapoint.Direction = direction.DirectionLabel - direction.GranularThroughputDataLogger.LogRecord(datapoint) - } - - lastThroughputRate = throughputMeasurement.Throughput - lastThroughputOpenConnectionCount = throughputMeasurement.Connections - - } - case <-timeoutChannel: - { - break responsiveness_timeout + responsivenessStabilizer.Interval() } } } @@ -753,7 +852,7 @@ func main() { } fmt.Printf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, directionResult.PNRpm, 90) - fmt.Printf("%s RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, + fmt.Printf("%s RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, directionResult.MeanRpm, specParameters.TrimmedMeanPct) if len(*prometheusStatsFilename) > 0 { @@ -808,7 +907,7 @@ func main() { } fmt.Printf("Final RPM: %5.0f (P%d)\n", result.PNRpm, 90) - fmt.Printf("Final RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", + fmt.Printf("Final RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", result.MeanRpm, specParameters.TrimmedMeanPct) // Stop the world. -- cgit v1.2.3