diff options
Diffstat (limited to 'stabilizer/algorithm.go')
| -rw-r--r-- | stabilizer/algorithm.go | 176 |
1 files changed, 131 insertions, 45 deletions
diff --git a/stabilizer/algorithm.go b/stabilizer/algorithm.go index 45c34d9..86b4bb6 100644 --- a/stabilizer/algorithm.go +++ b/stabilizer/algorithm.go @@ -19,19 +19,23 @@ import ( "sync" "github.com/network-quality/goresponsiveness/debug" - "github.com/network-quality/goresponsiveness/ms" + "github.com/network-quality/goresponsiveness/series" + "github.com/network-quality/goresponsiveness/utilities" "golang.org/x/exp/constraints" ) -type MeasurementStablizer[T constraints.Float | constraints.Integer] struct { - instantaneousses ms.MathematicalSeries[T] - aggregates ms.MathematicalSeries[float64] +type MeasurementStablizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered] struct { + // The number of instantaneous measurements in the current interval could be infinite (Forever). + instantaneousses series.WindowSeries[Data, Bucket] + // There are a fixed, finite number of aggregates (WindowOnly). + aggregates series.WindowSeries[series.WindowSeries[Data, Bucket], int] stabilityStandardDeviation float64 trimmingLevel uint m sync.Mutex dbgLevel debug.DebugLevel dbgConfig *debug.DebugWithPrefix units string + currentInterval int } // Stabilizer parameters: @@ -54,77 +58,159 @@ type MeasurementStablizer[T constraints.Float | constraints.Integer] struct { // If the calculated standard deviation of *those* values is less than SDT, we declare // stability. -func NewStabilizer[T constraints.Float | constraints.Integer]( - mad uint, +func NewStabilizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered]( + mad int, sdt float64, trimmingLevel uint, units string, debugLevel debug.DebugLevel, debug *debug.DebugWithPrefix, -) MeasurementStablizer[T] { - return MeasurementStablizer[T]{ - instantaneousses: ms.NewCappedMathematicalSeries[T](mad), - aggregates: ms.NewCappedMathematicalSeries[float64](mad), +) MeasurementStablizer[Data, Bucket] { + return MeasurementStablizer[Data, Bucket]{ + instantaneousses: series.NewWindowSeries[Data, Bucket](series.Forever, 0), + aggregates: series.NewWindowSeries[ + series.WindowSeries[Data, Bucket], int](series.WindowOnly, mad), stabilityStandardDeviation: sdt, trimmingLevel: trimmingLevel, units: units, + currentInterval: 0, dbgConfig: debug, dbgLevel: debugLevel, } } -func (r3 *MeasurementStablizer[T]) AddMeasurement(measurement T) { +func (r3 *MeasurementStablizer[Data, Bucket]) Reserve(bucket Bucket) { + r3.m.Lock() + defer r3.m.Unlock() + r3.instantaneousses.Reserve(bucket) +} + +func (r3 *MeasurementStablizer[Data, Bucket]) AddMeasurement(bucket Bucket, measurement Data) { + r3.m.Lock() + defer r3.m.Unlock() + + // Fill in the bucket in the current interval. + if err := r3.instantaneousses.Fill(bucket, measurement); err != nil { + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf("%s: A bucket (with id %v) does not exist in the isntantaneousses.\n", + r3.dbgConfig.String(), + bucket) + } + } + + // The result may have been retired from the current interval. Look in the older series + // to fill it in there if it is. + r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) { + if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) { + md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md) + if err := md.Fill(bucket, measurement); err != nil { + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf("%s: A bucket (with id %v) does not exist in a historical window.\n", + r3.dbgConfig.String(), + bucket) + } + } + } + }) + + /* + // Add this instantaneous measurement to the mix of the MAD previous instantaneous measurements. + r3.instantaneousses.Fill(bucket, measurement) + // Calculate the moving average of the MAD previous instantaneous measurements (what the + // algorithm calls moving average aggregate throughput at interval p) and add it to + // the mix of MAD previous moving averages. + + r3.aggregates.AutoFill(r3.instantaneousses.CalculateAverage()) + + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf( + "%s: MA: %f Mbps (previous %d intervals).\n", + r3.dbgConfig.String(), + r3.aggregates.CalculateAverage(), + r3.aggregates.Len(), + ) + } + */ +} + +func (r3 *MeasurementStablizer[Data, Bucket]) Interval() { r3.m.Lock() defer r3.m.Unlock() - // Add this instantaneous measurement to the mix of the MAD previous instantaneous measurements. - r3.instantaneousses.AddElement(measurement) - // Calculate the moving average of the MAD previous instantaneous measurements (what the - // algorithm calls moving average aggregate throughput at interval p) and add it to - // the mix of MAD previous moving averages. - r3.aggregates.AddElement(r3.instantaneousses.CalculateAverage()) if debug.IsDebug(r3.dbgLevel) { fmt.Printf( - "%s: MA: %f Mbps (previous %d intervals).\n", + "%s: stability interval marked (transitioning from %d to %d).\n", r3.dbgConfig.String(), - r3.aggregates.CalculateAverage(), - r3.aggregates.Len(), + r3.currentInterval, + r3.currentInterval+1, ) } -} -func (r3 *MeasurementStablizer[T]) IsStable() bool { - // There are MAD number of measurements of the _moving average aggregate throughput - // at interval p_ in movingAverages. - isvalid, stddev := r3.aggregates.StandardDeviation() + // At the interval boundary, move the instantaneous series to + // the aggregates and start a new instantaneous series. + r3.aggregates.Reserve(r3.currentInterval) + r3.aggregates.Fill(r3.currentInterval, r3.instantaneousses) - if !isvalid { - // If there are not enough values in the series to be able to calculate a - // standard deviation, then we know that we are not yet stable. Vamoose. - return false - } + r3.instantaneousses = series.NewWindowSeries[Data, Bucket](series.Forever, 0) + r3.currentInterval++ +} - // Stability is determined by whether or not the standard deviation of the values - // is within some percentage of the average. - stabilityCutoff := r3.aggregates.CalculateAverage() * (r3.stabilityStandardDeviation / 100.0) - isStable := stddev <= stabilityCutoff +func (r3 *MeasurementStablizer[Data, Bucket]) IsStable() bool { + r3.m.Lock() + defer r3.m.Unlock() if debug.IsDebug(r3.dbgLevel) { fmt.Printf( - "%s: Is Stable? %v; Standard Deviation: %f %s; Is Normally Distributed? %v; Standard Deviation Cutoff: %v %s).\n", + "%s: Determining stability in the %d th interval.\n", r3.dbgConfig.String(), - isStable, - stddev, - r3.units, - r3.aggregates.IsNormallyDistributed(), - stabilityCutoff, - r3.units, + r3.currentInterval, ) - fmt.Printf("%s: Values: ", r3.dbgConfig.String()) - for _, v := range r3.aggregates.Values() { - fmt.Printf("%v, ", v) + } + // Determine if + // a) All the aggregates have values, + // b) All the aggregates are complete. + allComplete := true + r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) { + if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) { + md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md) + allComplete = md.Complete() + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf("%s\n", md.String()) + } + } else { + allComplete = false } - fmt.Printf("\n") + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf( + "%s: The aggregate for the %d th interval was %s.\n", + r3.dbgConfig.String(), + b, + utilities.Conditional(allComplete, "complete", "incomplete"), + ) + } + }) + + if !allComplete { + return false } + + // Calculate the averages of each of the aggregates. + averages := make([]float64, 0) + r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) { + if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) { + md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md) + _, average := series.CalculateAverage(md) + averages = append(averages, average) + } + }) + + // Calculate the standard deviation of the averages of the aggregates. + sd := utilities.CalculateStandardDeviation(averages) + + // Take a percentage of the average of the averages of the aggregates ... + stabilityCutoff := utilities.CalculateAverage(averages) * (r3.stabilityStandardDeviation / 100.0) + // and compare that to the standard deviation to determine stability. + isStable := sd <= stabilityCutoff + return isStable } |
