summaryrefslogtreecommitdiff
path: root/stabilizer/algorithm.go
diff options
context:
space:
mode:
Diffstat (limited to 'stabilizer/algorithm.go')
-rw-r--r--stabilizer/algorithm.go176
1 files changed, 131 insertions, 45 deletions
diff --git a/stabilizer/algorithm.go b/stabilizer/algorithm.go
index 45c34d9..86b4bb6 100644
--- a/stabilizer/algorithm.go
+++ b/stabilizer/algorithm.go
@@ -19,19 +19,23 @@ import (
"sync"
"github.com/network-quality/goresponsiveness/debug"
- "github.com/network-quality/goresponsiveness/ms"
+ "github.com/network-quality/goresponsiveness/series"
+ "github.com/network-quality/goresponsiveness/utilities"
"golang.org/x/exp/constraints"
)
-type MeasurementStablizer[T constraints.Float | constraints.Integer] struct {
- instantaneousses ms.MathematicalSeries[T]
- aggregates ms.MathematicalSeries[float64]
+type MeasurementStablizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered] struct {
+ // The number of instantaneous measurements in the current interval could be infinite (Forever).
+ instantaneousses series.WindowSeries[Data, Bucket]
+ // There are a fixed, finite number of aggregates (WindowOnly).
+ aggregates series.WindowSeries[series.WindowSeries[Data, Bucket], int]
stabilityStandardDeviation float64
trimmingLevel uint
m sync.Mutex
dbgLevel debug.DebugLevel
dbgConfig *debug.DebugWithPrefix
units string
+ currentInterval int
}
// Stabilizer parameters:
@@ -54,77 +58,159 @@ type MeasurementStablizer[T constraints.Float | constraints.Integer] struct {
// If the calculated standard deviation of *those* values is less than SDT, we declare
// stability.
-func NewStabilizer[T constraints.Float | constraints.Integer](
- mad uint,
+func NewStabilizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered](
+ mad int,
sdt float64,
trimmingLevel uint,
units string,
debugLevel debug.DebugLevel,
debug *debug.DebugWithPrefix,
-) MeasurementStablizer[T] {
- return MeasurementStablizer[T]{
- instantaneousses: ms.NewCappedMathematicalSeries[T](mad),
- aggregates: ms.NewCappedMathematicalSeries[float64](mad),
+) MeasurementStablizer[Data, Bucket] {
+ return MeasurementStablizer[Data, Bucket]{
+ instantaneousses: series.NewWindowSeries[Data, Bucket](series.Forever, 0),
+ aggregates: series.NewWindowSeries[
+ series.WindowSeries[Data, Bucket], int](series.WindowOnly, mad),
stabilityStandardDeviation: sdt,
trimmingLevel: trimmingLevel,
units: units,
+ currentInterval: 0,
dbgConfig: debug,
dbgLevel: debugLevel,
}
}
-func (r3 *MeasurementStablizer[T]) AddMeasurement(measurement T) {
+func (r3 *MeasurementStablizer[Data, Bucket]) Reserve(bucket Bucket) {
+ r3.m.Lock()
+ defer r3.m.Unlock()
+ r3.instantaneousses.Reserve(bucket)
+}
+
+func (r3 *MeasurementStablizer[Data, Bucket]) AddMeasurement(bucket Bucket, measurement Data) {
+ r3.m.Lock()
+ defer r3.m.Unlock()
+
+ // Fill in the bucket in the current interval.
+ if err := r3.instantaneousses.Fill(bucket, measurement); err != nil {
+ if debug.IsDebug(r3.dbgLevel) {
+ fmt.Printf("%s: A bucket (with id %v) does not exist in the isntantaneousses.\n",
+ r3.dbgConfig.String(),
+ bucket)
+ }
+ }
+
+ // The result may have been retired from the current interval. Look in the older series
+ // to fill it in there if it is.
+ r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) {
+ if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) {
+ md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md)
+ if err := md.Fill(bucket, measurement); err != nil {
+ if debug.IsDebug(r3.dbgLevel) {
+ fmt.Printf("%s: A bucket (with id %v) does not exist in a historical window.\n",
+ r3.dbgConfig.String(),
+ bucket)
+ }
+ }
+ }
+ })
+
+ /*
+ // Add this instantaneous measurement to the mix of the MAD previous instantaneous measurements.
+ r3.instantaneousses.Fill(bucket, measurement)
+ // Calculate the moving average of the MAD previous instantaneous measurements (what the
+ // algorithm calls moving average aggregate throughput at interval p) and add it to
+ // the mix of MAD previous moving averages.
+
+ r3.aggregates.AutoFill(r3.instantaneousses.CalculateAverage())
+
+ if debug.IsDebug(r3.dbgLevel) {
+ fmt.Printf(
+ "%s: MA: %f Mbps (previous %d intervals).\n",
+ r3.dbgConfig.String(),
+ r3.aggregates.CalculateAverage(),
+ r3.aggregates.Len(),
+ )
+ }
+ */
+}
+
+func (r3 *MeasurementStablizer[Data, Bucket]) Interval() {
r3.m.Lock()
defer r3.m.Unlock()
- // Add this instantaneous measurement to the mix of the MAD previous instantaneous measurements.
- r3.instantaneousses.AddElement(measurement)
- // Calculate the moving average of the MAD previous instantaneous measurements (what the
- // algorithm calls moving average aggregate throughput at interval p) and add it to
- // the mix of MAD previous moving averages.
- r3.aggregates.AddElement(r3.instantaneousses.CalculateAverage())
if debug.IsDebug(r3.dbgLevel) {
fmt.Printf(
- "%s: MA: %f Mbps (previous %d intervals).\n",
+ "%s: stability interval marked (transitioning from %d to %d).\n",
r3.dbgConfig.String(),
- r3.aggregates.CalculateAverage(),
- r3.aggregates.Len(),
+ r3.currentInterval,
+ r3.currentInterval+1,
)
}
-}
-func (r3 *MeasurementStablizer[T]) IsStable() bool {
- // There are MAD number of measurements of the _moving average aggregate throughput
- // at interval p_ in movingAverages.
- isvalid, stddev := r3.aggregates.StandardDeviation()
+ // At the interval boundary, move the instantaneous series to
+ // the aggregates and start a new instantaneous series.
+ r3.aggregates.Reserve(r3.currentInterval)
+ r3.aggregates.Fill(r3.currentInterval, r3.instantaneousses)
- if !isvalid {
- // If there are not enough values in the series to be able to calculate a
- // standard deviation, then we know that we are not yet stable. Vamoose.
- return false
- }
+ r3.instantaneousses = series.NewWindowSeries[Data, Bucket](series.Forever, 0)
+ r3.currentInterval++
+}
- // Stability is determined by whether or not the standard deviation of the values
- // is within some percentage of the average.
- stabilityCutoff := r3.aggregates.CalculateAverage() * (r3.stabilityStandardDeviation / 100.0)
- isStable := stddev <= stabilityCutoff
+func (r3 *MeasurementStablizer[Data, Bucket]) IsStable() bool {
+ r3.m.Lock()
+ defer r3.m.Unlock()
if debug.IsDebug(r3.dbgLevel) {
fmt.Printf(
- "%s: Is Stable? %v; Standard Deviation: %f %s; Is Normally Distributed? %v; Standard Deviation Cutoff: %v %s).\n",
+ "%s: Determining stability in the %d th interval.\n",
r3.dbgConfig.String(),
- isStable,
- stddev,
- r3.units,
- r3.aggregates.IsNormallyDistributed(),
- stabilityCutoff,
- r3.units,
+ r3.currentInterval,
)
- fmt.Printf("%s: Values: ", r3.dbgConfig.String())
- for _, v := range r3.aggregates.Values() {
- fmt.Printf("%v, ", v)
+ }
+ // Determine if
+ // a) All the aggregates have values,
+ // b) All the aggregates are complete.
+ allComplete := true
+ r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) {
+ if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) {
+ md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md)
+ allComplete = md.Complete()
+ if debug.IsDebug(r3.dbgLevel) {
+ fmt.Printf("%s\n", md.String())
+ }
+ } else {
+ allComplete = false
}
- fmt.Printf("\n")
+ if debug.IsDebug(r3.dbgLevel) {
+ fmt.Printf(
+ "%s: The aggregate for the %d th interval was %s.\n",
+ r3.dbgConfig.String(),
+ b,
+ utilities.Conditional(allComplete, "complete", "incomplete"),
+ )
+ }
+ })
+
+ if !allComplete {
+ return false
}
+
+ // Calculate the averages of each of the aggregates.
+ averages := make([]float64, 0)
+ r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) {
+ if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) {
+ md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md)
+ _, average := series.CalculateAverage(md)
+ averages = append(averages, average)
+ }
+ })
+
+ // Calculate the standard deviation of the averages of the aggregates.
+ sd := utilities.CalculateStandardDeviation(averages)
+
+ // Take a percentage of the average of the averages of the aggregates ...
+ stabilityCutoff := utilities.CalculateAverage(averages) * (r3.stabilityStandardDeviation / 100.0)
+ // and compare that to the standard deviation to determine stability.
+ isStable := sd <= stabilityCutoff
+
return isStable
}