diff options
| author | Will Hawkins <[email protected]> | 2023-07-10 13:45:50 -0400 |
|---|---|---|
| committer | Will Hawkins <[email protected]> | 2023-07-10 13:45:50 -0400 |
| commit | 78d574a74665c8bc062c26755c80a8b524bce347 (patch) | |
| tree | 7ad65f0052defaea63acb2f3445be00ef97e24d6 /stabilizer/algorithm.go | |
| parent | fe17152a507bbf94a11cca7f49a51cbae9c0d67b (diff) | |
[Feature] Major update: Track measurements that may be delayed
Among other major feature additions, this version of the client tracks
any measurements that may be long delayed and considers their presence
or absence as part of a stability measurement.
This version of the client also more closely tracks the spec. In
particular, it performs a sinle-sided trimmed mean rather than a
double-sided trimmed mean.
Signed-off-by: Will Hawkins <[email protected]>
Diffstat (limited to 'stabilizer/algorithm.go')
| -rw-r--r-- | stabilizer/algorithm.go | 176 |
1 files changed, 131 insertions, 45 deletions
diff --git a/stabilizer/algorithm.go b/stabilizer/algorithm.go index 45c34d9..86b4bb6 100644 --- a/stabilizer/algorithm.go +++ b/stabilizer/algorithm.go @@ -19,19 +19,23 @@ import ( "sync" "github.com/network-quality/goresponsiveness/debug" - "github.com/network-quality/goresponsiveness/ms" + "github.com/network-quality/goresponsiveness/series" + "github.com/network-quality/goresponsiveness/utilities" "golang.org/x/exp/constraints" ) -type MeasurementStablizer[T constraints.Float | constraints.Integer] struct { - instantaneousses ms.MathematicalSeries[T] - aggregates ms.MathematicalSeries[float64] +type MeasurementStablizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered] struct { + // The number of instantaneous measurements in the current interval could be infinite (Forever). + instantaneousses series.WindowSeries[Data, Bucket] + // There are a fixed, finite number of aggregates (WindowOnly). + aggregates series.WindowSeries[series.WindowSeries[Data, Bucket], int] stabilityStandardDeviation float64 trimmingLevel uint m sync.Mutex dbgLevel debug.DebugLevel dbgConfig *debug.DebugWithPrefix units string + currentInterval int } // Stabilizer parameters: @@ -54,77 +58,159 @@ type MeasurementStablizer[T constraints.Float | constraints.Integer] struct { // If the calculated standard deviation of *those* values is less than SDT, we declare // stability. -func NewStabilizer[T constraints.Float | constraints.Integer]( - mad uint, +func NewStabilizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered]( + mad int, sdt float64, trimmingLevel uint, units string, debugLevel debug.DebugLevel, debug *debug.DebugWithPrefix, -) MeasurementStablizer[T] { - return MeasurementStablizer[T]{ - instantaneousses: ms.NewCappedMathematicalSeries[T](mad), - aggregates: ms.NewCappedMathematicalSeries[float64](mad), +) MeasurementStablizer[Data, Bucket] { + return MeasurementStablizer[Data, Bucket]{ + instantaneousses: series.NewWindowSeries[Data, Bucket](series.Forever, 0), + aggregates: series.NewWindowSeries[ + series.WindowSeries[Data, Bucket], int](series.WindowOnly, mad), stabilityStandardDeviation: sdt, trimmingLevel: trimmingLevel, units: units, + currentInterval: 0, dbgConfig: debug, dbgLevel: debugLevel, } } -func (r3 *MeasurementStablizer[T]) AddMeasurement(measurement T) { +func (r3 *MeasurementStablizer[Data, Bucket]) Reserve(bucket Bucket) { + r3.m.Lock() + defer r3.m.Unlock() + r3.instantaneousses.Reserve(bucket) +} + +func (r3 *MeasurementStablizer[Data, Bucket]) AddMeasurement(bucket Bucket, measurement Data) { + r3.m.Lock() + defer r3.m.Unlock() + + // Fill in the bucket in the current interval. + if err := r3.instantaneousses.Fill(bucket, measurement); err != nil { + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf("%s: A bucket (with id %v) does not exist in the isntantaneousses.\n", + r3.dbgConfig.String(), + bucket) + } + } + + // The result may have been retired from the current interval. Look in the older series + // to fill it in there if it is. + r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) { + if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) { + md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md) + if err := md.Fill(bucket, measurement); err != nil { + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf("%s: A bucket (with id %v) does not exist in a historical window.\n", + r3.dbgConfig.String(), + bucket) + } + } + } + }) + + /* + // Add this instantaneous measurement to the mix of the MAD previous instantaneous measurements. + r3.instantaneousses.Fill(bucket, measurement) + // Calculate the moving average of the MAD previous instantaneous measurements (what the + // algorithm calls moving average aggregate throughput at interval p) and add it to + // the mix of MAD previous moving averages. + + r3.aggregates.AutoFill(r3.instantaneousses.CalculateAverage()) + + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf( + "%s: MA: %f Mbps (previous %d intervals).\n", + r3.dbgConfig.String(), + r3.aggregates.CalculateAverage(), + r3.aggregates.Len(), + ) + } + */ +} + +func (r3 *MeasurementStablizer[Data, Bucket]) Interval() { r3.m.Lock() defer r3.m.Unlock() - // Add this instantaneous measurement to the mix of the MAD previous instantaneous measurements. - r3.instantaneousses.AddElement(measurement) - // Calculate the moving average of the MAD previous instantaneous measurements (what the - // algorithm calls moving average aggregate throughput at interval p) and add it to - // the mix of MAD previous moving averages. - r3.aggregates.AddElement(r3.instantaneousses.CalculateAverage()) if debug.IsDebug(r3.dbgLevel) { fmt.Printf( - "%s: MA: %f Mbps (previous %d intervals).\n", + "%s: stability interval marked (transitioning from %d to %d).\n", r3.dbgConfig.String(), - r3.aggregates.CalculateAverage(), - r3.aggregates.Len(), + r3.currentInterval, + r3.currentInterval+1, ) } -} -func (r3 *MeasurementStablizer[T]) IsStable() bool { - // There are MAD number of measurements of the _moving average aggregate throughput - // at interval p_ in movingAverages. - isvalid, stddev := r3.aggregates.StandardDeviation() + // At the interval boundary, move the instantaneous series to + // the aggregates and start a new instantaneous series. + r3.aggregates.Reserve(r3.currentInterval) + r3.aggregates.Fill(r3.currentInterval, r3.instantaneousses) - if !isvalid { - // If there are not enough values in the series to be able to calculate a - // standard deviation, then we know that we are not yet stable. Vamoose. - return false - } + r3.instantaneousses = series.NewWindowSeries[Data, Bucket](series.Forever, 0) + r3.currentInterval++ +} - // Stability is determined by whether or not the standard deviation of the values - // is within some percentage of the average. - stabilityCutoff := r3.aggregates.CalculateAverage() * (r3.stabilityStandardDeviation / 100.0) - isStable := stddev <= stabilityCutoff +func (r3 *MeasurementStablizer[Data, Bucket]) IsStable() bool { + r3.m.Lock() + defer r3.m.Unlock() if debug.IsDebug(r3.dbgLevel) { fmt.Printf( - "%s: Is Stable? %v; Standard Deviation: %f %s; Is Normally Distributed? %v; Standard Deviation Cutoff: %v %s).\n", + "%s: Determining stability in the %d th interval.\n", r3.dbgConfig.String(), - isStable, - stddev, - r3.units, - r3.aggregates.IsNormallyDistributed(), - stabilityCutoff, - r3.units, + r3.currentInterval, ) - fmt.Printf("%s: Values: ", r3.dbgConfig.String()) - for _, v := range r3.aggregates.Values() { - fmt.Printf("%v, ", v) + } + // Determine if + // a) All the aggregates have values, + // b) All the aggregates are complete. + allComplete := true + r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) { + if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) { + md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md) + allComplete = md.Complete() + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf("%s\n", md.String()) + } + } else { + allComplete = false } - fmt.Printf("\n") + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf( + "%s: The aggregate for the %d th interval was %s.\n", + r3.dbgConfig.String(), + b, + utilities.Conditional(allComplete, "complete", "incomplete"), + ) + } + }) + + if !allComplete { + return false } + + // Calculate the averages of each of the aggregates. + averages := make([]float64, 0) + r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) { + if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) { + md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md) + _, average := series.CalculateAverage(md) + averages = append(averages, average) + } + }) + + // Calculate the standard deviation of the averages of the aggregates. + sd := utilities.CalculateStandardDeviation(averages) + + // Take a percentage of the average of the averages of the aggregates ... + stabilityCutoff := utilities.CalculateAverage(averages) * (r3.stabilityStandardDeviation / 100.0) + // and compare that to the standard deviation to determine stability. + isStable := sd <= stabilityCutoff + return isStable } |
