diff options
Diffstat (limited to 'networkQuality.go')
| -rw-r--r-- | networkQuality.go | 227 |
1 files changed, 163 insertions, 64 deletions
diff --git a/networkQuality.go b/networkQuality.go index 776c0e7..aa8d854 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -33,10 +33,10 @@ import ( "github.com/network-quality/goresponsiveness/direction" "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" - "github.com/network-quality/goresponsiveness/ms" "github.com/network-quality/goresponsiveness/probe" "github.com/network-quality/goresponsiveness/qualityattenuation" "github.com/network-quality/goresponsiveness/rpm" + "github.com/network-quality/goresponsiveness/series" "github.com/network-quality/goresponsiveness/stabilizer" "github.com/network-quality/goresponsiveness/timeoutat" "github.com/network-quality/goresponsiveness/utilities" @@ -471,8 +471,8 @@ func main() { } // All tests will accumulate data to these series because it will all matter for RPM calculation! - selfRtts := ms.NewInfiniteMathematicalSeries[float64]() - foreignRtts := ms.NewInfiniteMathematicalSeries[float64]() + selfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) + foreignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil if *printQualityAttenuation { @@ -521,8 +521,8 @@ func main() { if *debugCliFlag { downloadThroughputStabilizerDebugLevel = debug.Debug } - throughputStabilizer := stabilizer.NewStabilizer[float64]( - uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, 0, "bytes", + throughputStabilizer := stabilizer.NewStabilizer[float64, uint64]( + specParameters.MovingAvgDist, specParameters.StdDevTolerance, 0, "bytes", downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig) responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, @@ -531,8 +531,8 @@ func main() { if *debugCliFlag { responsivenessStabilizerDebugLevel = debug.Debug } - responsivenessStabilizer := stabilizer.NewStabilizer[int64]( - uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, + responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint]( + specParameters.MovingAvgDist, specParameters.StdDevTolerance, specParameters.TrimmedMeanPct, "milliseconds", responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig) @@ -541,32 +541,70 @@ func main() { lastThroughputRate := float64(0) lastThroughputOpenConnectionCount := int(0) + stabilityCheckTime := time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel := timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) + lg_timeout: for !direction.StableThroughput { select { case throughputMeasurement := <-lgStabilizationCommunicationChannel: { - throughputStabilizer.AddMeasurement( - throughputMeasurement.Throughput) + switch throughputMeasurement.Type { + case series.SeriesMessageReserve: + { + throughputStabilizer.Reserve(throughputMeasurement.Bucket) + if *debugCliFlag { + fmt.Printf( + "%s: Reserving a throughput bucket with id %v.\n", + direction.DirectionLabel, throughputMeasurement.Bucket) + } + } + case series.SeriesMessageMeasure: + { + bucket := throughputMeasurement.Bucket + measurement := utilities.GetSome(throughputMeasurement.Measure) + + throughputStabilizer.AddMeasurement(bucket, measurement.Throughput) + + direction.ThroughputDataLogger.LogRecord(measurement) + for _, v := range measurement.GranularThroughputDataPoints { + v.Direction = "Download" + direction.GranularThroughputDataLogger.LogRecord(v) + } + + lastThroughputRate = measurement.Throughput + lastThroughputOpenConnectionCount = measurement.Connections + } + } + } + case <-stabilityCheckTimeChannel: + { + if *debugCliFlag { + fmt.Printf( + "%v throughput stability interval is complete.\n", direction.DirectionLabel) + } + stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel = timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) + direction.StableThroughput = throughputStabilizer.IsStable() if *debugCliFlag { fmt.Printf( - "################# %v is instantaneously %s.\n", direction.DirectionLabel, + "%v is instantaneously %s.\n", direction.DirectionLabel, utilities.Conditional(direction.StableThroughput, "stable", "unstable")) } - direction.ThroughputDataLogger.LogRecord(throughputMeasurement) - for i := range throughputMeasurement.GranularThroughputDataPoints { - datapoint := throughputMeasurement.GranularThroughputDataPoints[i] - datapoint.Direction = "Download" - direction.GranularThroughputDataLogger.LogRecord(datapoint) - } - - lastThroughputRate = throughputMeasurement.Throughput - lastThroughputOpenConnectionCount = throughputMeasurement.Connections if direction.StableThroughput { throughputGeneratorCtxCancel() } + throughputStabilizer.Interval() } case <-timeoutChannel: { @@ -577,7 +615,7 @@ func main() { if direction.StableThroughput { if *debugCliFlag { - fmt.Printf("################# Throughput is stable; beginning responsiveness testing.\n") + fmt.Printf("Throughput is stable; beginning responsiveness testing.\n") } } else { fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Adding 15 seconds to calculate speculative RPM results.\n") @@ -590,8 +628,8 @@ func main() { ) } - perDirectionSelfRtts := ms.NewInfiniteMathematicalSeries[float64]() - perDirectionForeignRtts := ms.NewInfiniteMathematicalSeries[float64]() + perDirectionSelfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) + perDirectionForeignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber( proberOperatorCtx, @@ -611,61 +649,122 @@ func main() { select { case probeMeasurement := <-responsivenessStabilizationCommunicationChannel: { - foreignDataPoint := probeMeasurement.First - selfDataPoint := probeMeasurement.Second + switch probeMeasurement.Type { + case series.SeriesMessageReserve: + { + bucket := probeMeasurement.Bucket + if *debugCliFlag { + fmt.Printf( + "%s: Reserving a responsiveness bucket with id %v.\n", direction.DirectionLabel, bucket) + } + responsivenessStabilizer.Reserve(bucket) + selfRtts.Reserve(bucket) + foreignRtts.Reserve(bucket) + perDirectionForeignRtts.Reserve(bucket) + perDirectionSelfRtts.Reserve(bucket) + } + case series.SeriesMessageMeasure: + { + bucket := probeMeasurement.Bucket + measurement := utilities.GetSome(probeMeasurement.Measure) + foreignDataPoint := measurement.Foreign + selfDataPoint := measurement.Self - responsivenessStabilizer.AddMeasurement( - (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) + if *debugCliFlag { + fmt.Printf( + "%s: Filling a responsiveness bucket with id %v with value %v.\n", + direction.DirectionLabel, bucket, measurement) + } + responsivenessStabilizer.AddMeasurement(bucket, + (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) - // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy - // is *actually* important, but it can't hurt? - direction.StableResponsiveness = responsivenessStabilizer.IsStable() + if err := selfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (selfRtts)\n", bucket) + } + if perDirectionSelfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionSelfRtts)\n", bucket) + } - if *debugCliFlag { - fmt.Printf( - "################# Responsiveness is instantaneously %s.\n", - utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) - } - // There may be more than one round trip accumulated together. If that is the case, - // we will blow them apart in to three separate measurements and each one will just - // be 1 / measurement.RoundTripCount of the total length. - for range utilities.Iota(0, int(foreignDataPoint.RoundTripCount)) { - foreignRtts.AddElement(foreignDataPoint.Duration.Seconds() / - float64(foreignDataPoint.RoundTripCount)) - perDirectionForeignRtts.AddElement(foreignDataPoint.Duration.Seconds() / - float64(foreignDataPoint.RoundTripCount)) - } - selfRtts.AddElement(selfDataPoint.Duration.Seconds()) - perDirectionSelfRtts.AddElement(selfDataPoint.Duration.Seconds()) + if foreignRtts.Fill(bucket, foreignDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (foreignRtts)\n", bucket) + } - if selfRttsQualityAttenuation != nil { - selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) - } + if perDirectionForeignRtts.Fill(bucket, + foreignDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionForeignRtts)\n", bucket) + } + + if selfRttsQualityAttenuation != nil { + selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) + } - direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) - direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) + direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) + direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) + + } + } } case throughputMeasurement := <-lgStabilizationCommunicationChannel: { - if *debugCliFlag { - fmt.Printf("Adding a throughput measurement.\n") - } - // There may be more than one round trip accumulated together. If that is the case, - direction.ThroughputDataLogger.LogRecord(throughputMeasurement) - for i := range throughputMeasurement.GranularThroughputDataPoints { - datapoint := throughputMeasurement.GranularThroughputDataPoints[i] - datapoint.Direction = direction.DirectionLabel - direction.GranularThroughputDataLogger.LogRecord(datapoint) - } + switch throughputMeasurement.Type { + case series.SeriesMessageReserve: + { + // We are no longer tracking stability, so reservation messages are useless! + if *debugCliFlag { + fmt.Printf( + "%s: Discarding a throughput bucket with id %v when ascertaining responsiveness.\n", + direction.DirectionLabel, throughputMeasurement.Bucket) + } + } + case series.SeriesMessageMeasure: + { + measurement := utilities.GetSome(throughputMeasurement.Measure) - lastThroughputRate = throughputMeasurement.Throughput - lastThroughputOpenConnectionCount = throughputMeasurement.Connections + if *debugCliFlag { + fmt.Printf("Adding a throughput measurement (while ascertaining responsiveness).\n") + } + // There may be more than one round trip accumulated together. If that is the case, + direction.ThroughputDataLogger.LogRecord(measurement) + for _, v := range measurement.GranularThroughputDataPoints { + v.Direction = direction.DirectionLabel + direction.GranularThroughputDataLogger.LogRecord(v) + } + lastThroughputRate = measurement.Throughput + lastThroughputOpenConnectionCount = measurement.Connections + } + } } case <-timeoutChannel: { break responsiveness_timeout } + case <-stabilityCheckTimeChannel: + { + if *debugCliFlag { + fmt.Printf( + "%v responsiveness stability interval is complete.\n", direction.DirectionLabel) + } + + stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel = timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) + + // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy + // is *actually* important, but it can't hurt? + direction.StableResponsiveness = responsivenessStabilizer.IsStable() + + if *debugCliFlag { + fmt.Printf( + "Responsiveness is instantaneously %s.\n", + utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) + } + + responsivenessStabilizer.Interval() + } } } @@ -753,7 +852,7 @@ func main() { } fmt.Printf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, directionResult.PNRpm, 90) - fmt.Printf("%s RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, + fmt.Printf("%s RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, directionResult.MeanRpm, specParameters.TrimmedMeanPct) if len(*prometheusStatsFilename) > 0 { @@ -808,7 +907,7 @@ func main() { } fmt.Printf("Final RPM: %5.0f (P%d)\n", result.PNRpm, 90) - fmt.Printf("Final RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", + fmt.Printf("Final RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", result.MeanRpm, specParameters.TrimmedMeanPct) // Stop the world. |
