summaryrefslogtreecommitdiff
path: root/networkQuality.go
diff options
context:
space:
mode:
Diffstat (limited to 'networkQuality.go')
-rw-r--r--networkQuality.go227
1 files changed, 163 insertions, 64 deletions
diff --git a/networkQuality.go b/networkQuality.go
index 776c0e7..aa8d854 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -33,10 +33,10 @@ import (
"github.com/network-quality/goresponsiveness/direction"
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
- "github.com/network-quality/goresponsiveness/ms"
"github.com/network-quality/goresponsiveness/probe"
"github.com/network-quality/goresponsiveness/qualityattenuation"
"github.com/network-quality/goresponsiveness/rpm"
+ "github.com/network-quality/goresponsiveness/series"
"github.com/network-quality/goresponsiveness/stabilizer"
"github.com/network-quality/goresponsiveness/timeoutat"
"github.com/network-quality/goresponsiveness/utilities"
@@ -471,8 +471,8 @@ func main() {
}
// All tests will accumulate data to these series because it will all matter for RPM calculation!
- selfRtts := ms.NewInfiniteMathematicalSeries[float64]()
- foreignRtts := ms.NewInfiniteMathematicalSeries[float64]()
+ selfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0)
+ foreignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0)
var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil
if *printQualityAttenuation {
@@ -521,8 +521,8 @@ func main() {
if *debugCliFlag {
downloadThroughputStabilizerDebugLevel = debug.Debug
}
- throughputStabilizer := stabilizer.NewStabilizer[float64](
- uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, 0, "bytes",
+ throughputStabilizer := stabilizer.NewStabilizer[float64, uint64](
+ specParameters.MovingAvgDist, specParameters.StdDevTolerance, 0, "bytes",
downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig)
responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug,
@@ -531,8 +531,8 @@ func main() {
if *debugCliFlag {
responsivenessStabilizerDebugLevel = debug.Debug
}
- responsivenessStabilizer := stabilizer.NewStabilizer[int64](
- uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance,
+ responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint](
+ specParameters.MovingAvgDist, specParameters.StdDevTolerance,
specParameters.TrimmedMeanPct, "milliseconds",
responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig)
@@ -541,32 +541,70 @@ func main() {
lastThroughputRate := float64(0)
lastThroughputOpenConnectionCount := int(0)
+ stabilityCheckTime := time.Now().Add(specParameters.EvalInterval)
+ stabilityCheckTimeChannel := timeoutat.TimeoutAt(
+ operatingCtx,
+ stabilityCheckTime,
+ debugLevel,
+ )
+
lg_timeout:
for !direction.StableThroughput {
select {
case throughputMeasurement := <-lgStabilizationCommunicationChannel:
{
- throughputStabilizer.AddMeasurement(
- throughputMeasurement.Throughput)
+ switch throughputMeasurement.Type {
+ case series.SeriesMessageReserve:
+ {
+ throughputStabilizer.Reserve(throughputMeasurement.Bucket)
+ if *debugCliFlag {
+ fmt.Printf(
+ "%s: Reserving a throughput bucket with id %v.\n",
+ direction.DirectionLabel, throughputMeasurement.Bucket)
+ }
+ }
+ case series.SeriesMessageMeasure:
+ {
+ bucket := throughputMeasurement.Bucket
+ measurement := utilities.GetSome(throughputMeasurement.Measure)
+
+ throughputStabilizer.AddMeasurement(bucket, measurement.Throughput)
+
+ direction.ThroughputDataLogger.LogRecord(measurement)
+ for _, v := range measurement.GranularThroughputDataPoints {
+ v.Direction = "Download"
+ direction.GranularThroughputDataLogger.LogRecord(v)
+ }
+
+ lastThroughputRate = measurement.Throughput
+ lastThroughputOpenConnectionCount = measurement.Connections
+ }
+ }
+ }
+ case <-stabilityCheckTimeChannel:
+ {
+ if *debugCliFlag {
+ fmt.Printf(
+ "%v throughput stability interval is complete.\n", direction.DirectionLabel)
+ }
+ stabilityCheckTime = time.Now().Add(specParameters.EvalInterval)
+ stabilityCheckTimeChannel = timeoutat.TimeoutAt(
+ operatingCtx,
+ stabilityCheckTime,
+ debugLevel,
+ )
+
direction.StableThroughput = throughputStabilizer.IsStable()
if *debugCliFlag {
fmt.Printf(
- "################# %v is instantaneously %s.\n", direction.DirectionLabel,
+ "%v is instantaneously %s.\n", direction.DirectionLabel,
utilities.Conditional(direction.StableThroughput, "stable", "unstable"))
}
- direction.ThroughputDataLogger.LogRecord(throughputMeasurement)
- for i := range throughputMeasurement.GranularThroughputDataPoints {
- datapoint := throughputMeasurement.GranularThroughputDataPoints[i]
- datapoint.Direction = "Download"
- direction.GranularThroughputDataLogger.LogRecord(datapoint)
- }
-
- lastThroughputRate = throughputMeasurement.Throughput
- lastThroughputOpenConnectionCount = throughputMeasurement.Connections
if direction.StableThroughput {
throughputGeneratorCtxCancel()
}
+ throughputStabilizer.Interval()
}
case <-timeoutChannel:
{
@@ -577,7 +615,7 @@ func main() {
if direction.StableThroughput {
if *debugCliFlag {
- fmt.Printf("################# Throughput is stable; beginning responsiveness testing.\n")
+ fmt.Printf("Throughput is stable; beginning responsiveness testing.\n")
}
} else {
fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Adding 15 seconds to calculate speculative RPM results.\n")
@@ -590,8 +628,8 @@ func main() {
)
}
- perDirectionSelfRtts := ms.NewInfiniteMathematicalSeries[float64]()
- perDirectionForeignRtts := ms.NewInfiniteMathematicalSeries[float64]()
+ perDirectionSelfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0)
+ perDirectionForeignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0)
responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber(
proberOperatorCtx,
@@ -611,61 +649,122 @@ func main() {
select {
case probeMeasurement := <-responsivenessStabilizationCommunicationChannel:
{
- foreignDataPoint := probeMeasurement.First
- selfDataPoint := probeMeasurement.Second
+ switch probeMeasurement.Type {
+ case series.SeriesMessageReserve:
+ {
+ bucket := probeMeasurement.Bucket
+ if *debugCliFlag {
+ fmt.Printf(
+ "%s: Reserving a responsiveness bucket with id %v.\n", direction.DirectionLabel, bucket)
+ }
+ responsivenessStabilizer.Reserve(bucket)
+ selfRtts.Reserve(bucket)
+ foreignRtts.Reserve(bucket)
+ perDirectionForeignRtts.Reserve(bucket)
+ perDirectionSelfRtts.Reserve(bucket)
+ }
+ case series.SeriesMessageMeasure:
+ {
+ bucket := probeMeasurement.Bucket
+ measurement := utilities.GetSome(probeMeasurement.Measure)
+ foreignDataPoint := measurement.Foreign
+ selfDataPoint := measurement.Self
- responsivenessStabilizer.AddMeasurement(
- (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds())
+ if *debugCliFlag {
+ fmt.Printf(
+ "%s: Filling a responsiveness bucket with id %v with value %v.\n",
+ direction.DirectionLabel, bucket, measurement)
+ }
+ responsivenessStabilizer.AddMeasurement(bucket,
+ (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds())
- // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy
- // is *actually* important, but it can't hurt?
- direction.StableResponsiveness = responsivenessStabilizer.IsStable()
+ if err := selfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (selfRtts)\n", bucket)
+ }
+ if perDirectionSelfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionSelfRtts)\n", bucket)
+ }
- if *debugCliFlag {
- fmt.Printf(
- "################# Responsiveness is instantaneously %s.\n",
- utilities.Conditional(direction.StableResponsiveness, "stable", "unstable"))
- }
- // There may be more than one round trip accumulated together. If that is the case,
- // we will blow them apart in to three separate measurements and each one will just
- // be 1 / measurement.RoundTripCount of the total length.
- for range utilities.Iota(0, int(foreignDataPoint.RoundTripCount)) {
- foreignRtts.AddElement(foreignDataPoint.Duration.Seconds() /
- float64(foreignDataPoint.RoundTripCount))
- perDirectionForeignRtts.AddElement(foreignDataPoint.Duration.Seconds() /
- float64(foreignDataPoint.RoundTripCount))
- }
- selfRtts.AddElement(selfDataPoint.Duration.Seconds())
- perDirectionSelfRtts.AddElement(selfDataPoint.Duration.Seconds())
+ if foreignRtts.Fill(bucket, foreignDataPoint.Duration.Seconds()); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (foreignRtts)\n", bucket)
+ }
- if selfRttsQualityAttenuation != nil {
- selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds())
- }
+ if perDirectionForeignRtts.Fill(bucket,
+ foreignDataPoint.Duration.Seconds()); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionForeignRtts)\n", bucket)
+ }
+
+ if selfRttsQualityAttenuation != nil {
+ selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds())
+ }
- direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint)
- direction.SelfProbeDataLogger.LogRecord(*selfDataPoint)
+ direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint)
+ direction.SelfProbeDataLogger.LogRecord(*selfDataPoint)
+
+ }
+ }
}
case throughputMeasurement := <-lgStabilizationCommunicationChannel:
{
- if *debugCliFlag {
- fmt.Printf("Adding a throughput measurement.\n")
- }
- // There may be more than one round trip accumulated together. If that is the case,
- direction.ThroughputDataLogger.LogRecord(throughputMeasurement)
- for i := range throughputMeasurement.GranularThroughputDataPoints {
- datapoint := throughputMeasurement.GranularThroughputDataPoints[i]
- datapoint.Direction = direction.DirectionLabel
- direction.GranularThroughputDataLogger.LogRecord(datapoint)
- }
+ switch throughputMeasurement.Type {
+ case series.SeriesMessageReserve:
+ {
+ // We are no longer tracking stability, so reservation messages are useless!
+ if *debugCliFlag {
+ fmt.Printf(
+ "%s: Discarding a throughput bucket with id %v when ascertaining responsiveness.\n",
+ direction.DirectionLabel, throughputMeasurement.Bucket)
+ }
+ }
+ case series.SeriesMessageMeasure:
+ {
+ measurement := utilities.GetSome(throughputMeasurement.Measure)
- lastThroughputRate = throughputMeasurement.Throughput
- lastThroughputOpenConnectionCount = throughputMeasurement.Connections
+ if *debugCliFlag {
+ fmt.Printf("Adding a throughput measurement (while ascertaining responsiveness).\n")
+ }
+ // There may be more than one round trip accumulated together. If that is the case,
+ direction.ThroughputDataLogger.LogRecord(measurement)
+ for _, v := range measurement.GranularThroughputDataPoints {
+ v.Direction = direction.DirectionLabel
+ direction.GranularThroughputDataLogger.LogRecord(v)
+ }
+ lastThroughputRate = measurement.Throughput
+ lastThroughputOpenConnectionCount = measurement.Connections
+ }
+ }
}
case <-timeoutChannel:
{
break responsiveness_timeout
}
+ case <-stabilityCheckTimeChannel:
+ {
+ if *debugCliFlag {
+ fmt.Printf(
+ "%v responsiveness stability interval is complete.\n", direction.DirectionLabel)
+ }
+
+ stabilityCheckTime = time.Now().Add(specParameters.EvalInterval)
+ stabilityCheckTimeChannel = timeoutat.TimeoutAt(
+ operatingCtx,
+ stabilityCheckTime,
+ debugLevel,
+ )
+
+ // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy
+ // is *actually* important, but it can't hurt?
+ direction.StableResponsiveness = responsivenessStabilizer.IsStable()
+
+ if *debugCliFlag {
+ fmt.Printf(
+ "Responsiveness is instantaneously %s.\n",
+ utilities.Conditional(direction.StableResponsiveness, "stable", "unstable"))
+ }
+
+ responsivenessStabilizer.Interval()
+ }
}
}
@@ -753,7 +852,7 @@ func main() {
}
fmt.Printf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, directionResult.PNRpm, 90)
- fmt.Printf("%s RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel,
+ fmt.Printf("%s RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel,
directionResult.MeanRpm, specParameters.TrimmedMeanPct)
if len(*prometheusStatsFilename) > 0 {
@@ -808,7 +907,7 @@ func main() {
}
fmt.Printf("Final RPM: %5.0f (P%d)\n", result.PNRpm, 90)
- fmt.Printf("Final RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n",
+ fmt.Printf("Final RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n",
result.MeanRpm, specParameters.TrimmedMeanPct)
// Stop the world.