summaryrefslogtreecommitdiff
path: root/stabilizer/algorithm.go
blob: 42dc0c61cb98fdb09c20d596977f43ce4e5f37ea (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
/*
 * This file is part of Go Responsiveness.
 *
 * Go Responsiveness is free software: you can redistribute it and/or modify it under
 * the terms of the GNU General Public License as published by the Free Software Foundation,
 * either version 2 of the License, or (at your option) any later version.
 * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
 * PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
 */

package stabilizer

import (
	"fmt"
	"sync"

	"github.com/network-quality/goresponsiveness/debug"
	"github.com/network-quality/goresponsiveness/series"
	"github.com/network-quality/goresponsiveness/utilities"
	"golang.org/x/exp/constraints"
)

type MeasurementStablizer[Data constraints.Float | constraints.Integer, Bucket utilities.Number] struct {
	// The number of instantaneous measurements in the current interval could be infinite (Forever).
	instantaneousses series.WindowSeries[Data, Bucket]
	// There are a fixed, finite number of aggregates (WindowOnly).
	aggregates                 series.WindowSeries[series.WindowSeries[Data, Bucket], int]
	stabilityStandardDeviation float64
	trimmingLevel              uint
	m                          sync.Mutex
	dbgLevel                   debug.DebugLevel
	dbgConfig                  *debug.DebugWithPrefix
	units                      string
	currentInterval            int
}

// Stabilizer parameters:
// 1. MAD: An all-purpose value that determines the hysteresis of various calculations
//         that will affect saturation (of either throughput or responsiveness).
// 2: SDT: The standard deviation cutoff used to determine stability among the K preceding
//         moving averages of a measurement.
// 3: TMP: The percentage by which to trim the values before calculating the standard deviation
//         to determine whether the value is within acceptable range for stability (SDT).

// Stabilizer Algorithm:
// Throughput stabilization is achieved when the standard deviation of the MAD number of the most
// recent moving averages of instantaneous measurements is within an upper bound.
//
// Yes, that *is* a little confusing:
// The user will deliver us a steady diet of measurements of the number of bytes transmitted during the immediately
// previous interval. We will keep the MAD most recent of those measurements. Every time that we get a new
// measurement, we will recalculate the moving average of the MAD most instantaneous measurements. We will call that
// the moving average aggregate throughput at interval p. We keep the MAD most recent of those values.
// If the calculated standard deviation of *those* values is less than SDT, we declare
// stability.

func NewStabilizer[Data constraints.Float | constraints.Integer, Bucket utilities.Number](
	mad int,
	sdt float64,
	trimmingLevel uint,
	units string,
	debugLevel debug.DebugLevel,
	debug *debug.DebugWithPrefix,
) MeasurementStablizer[Data, Bucket] {
	return MeasurementStablizer[Data, Bucket]{
		instantaneousses: series.NewWindowSeries[Data, Bucket](series.Forever, 0),
		aggregates: series.NewWindowSeries[
			series.WindowSeries[Data, Bucket], int](series.WindowOnly, mad),
		stabilityStandardDeviation: sdt,
		trimmingLevel:              trimmingLevel,
		units:                      units,
		currentInterval:            0,
		dbgConfig:                  debug,
		dbgLevel:                   debugLevel,
	}
}

func (r3 *MeasurementStablizer[Data, Bucket]) Reserve(bucket Bucket) {
	r3.m.Lock()
	defer r3.m.Unlock()
	r3.instantaneousses.Reserve(bucket)
}

func (r3 *MeasurementStablizer[Data, Bucket]) AddMeasurement(bucket Bucket, measurement Data) {
	r3.m.Lock()
	defer r3.m.Unlock()

	// Fill in the bucket in the current interval.
	if err := r3.instantaneousses.Fill(bucket, measurement); err != nil {
		if debug.IsDebug(r3.dbgLevel) {
			fmt.Printf("%s: A bucket (with id %v) does not exist in the isntantaneousses.\n",
				r3.dbgConfig.String(),
				bucket)
		}
	}

	// The result may have been retired from the current interval. Look in the older series
	// to fill it in there if it is.
	r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) {
		if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) {
			md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md)
			if err := md.Fill(bucket, measurement); err != nil {
				if debug.IsDebug(r3.dbgLevel) {
					fmt.Printf("%s: A bucket (with id %v) does not exist in a historical window.\n",
						r3.dbgConfig.String(),
						bucket)
				}
			}
		}
	})

	/*
		// Add this instantaneous measurement to the mix of the MAD previous instantaneous measurements.
		r3.instantaneousses.Fill(bucket, measurement)
		// Calculate the moving average of the MAD previous instantaneous measurements (what the
		// algorithm calls moving average aggregate throughput at interval p) and add it to
		// the mix of MAD previous moving averages.

		r3.aggregates.AutoFill(r3.instantaneousses.CalculateAverage())

		if debug.IsDebug(r3.dbgLevel) {
			fmt.Printf(
				"%s: MA: %f Mbps (previous %d intervals).\n",
				r3.dbgConfig.String(),
				r3.aggregates.CalculateAverage(),
				r3.aggregates.Len(),
			)
		}
	*/
}

func (r3 *MeasurementStablizer[Data, Bucket]) Interval() {
	r3.m.Lock()
	defer r3.m.Unlock()

	if debug.IsDebug(r3.dbgLevel) {
		fmt.Printf(
			"%s: stability interval marked (transitioning from %d to %d).\n",
			r3.dbgConfig.String(),
			r3.currentInterval,
			r3.currentInterval+1,
		)
	}

	// At the interval boundary, move the instantaneous series to
	// the aggregates and start a new instantaneous series.
	r3.aggregates.Reserve(r3.currentInterval)
	r3.aggregates.Fill(r3.currentInterval, r3.instantaneousses)

	r3.instantaneousses = series.NewWindowSeries[Data, Bucket](series.Forever, 0)
	r3.currentInterval++
}

func (r3 *MeasurementStablizer[Data, Bucket]) IsStable() bool {
	r3.m.Lock()
	defer r3.m.Unlock()

	if debug.IsDebug(r3.dbgLevel) {
		fmt.Printf(
			"%s: Determining stability in the %d th interval.\n",
			r3.dbgConfig.String(),
			r3.currentInterval,
		)
	}
	// Determine if
	// a) All the aggregates have values,
	// b) All the aggregates are complete.
	allComplete := true
	r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) {
		if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) {
			md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md)
			allComplete = md.Complete()
			if debug.IsDebug(r3.dbgLevel) {
				fmt.Printf("%s\n", md.String())
			}
		} else {
			allComplete = false
		}
		if debug.IsDebug(r3.dbgLevel) {
			fmt.Printf(
				"%s: The aggregate for the %d th interval was %s.\n",
				r3.dbgConfig.String(),
				b,
				utilities.Conditional(allComplete, "complete", "incomplete"),
			)
		}
	})

	if !allComplete {
		return false
	}

	// Calculate the averages of each of the aggregates.
	averages := make([]float64, 0)
	r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) {
		if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) {
			md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md)

			_, average := series.CalculateAverage(md)
			averages = append(averages, average)
		}
	})

	// Calculate the standard deviation of the averages of the aggregates.
	sd := utilities.CalculateStandardDeviation(averages)

	// Take a percentage of the average of the averages of the aggregates ...
	stabilityCutoff := utilities.CalculateAverage(averages) * (r3.stabilityStandardDeviation / 100.0)
	// and compare that to the standard deviation to determine stability.
	isStable := sd <= stabilityCutoff

	return isStable
}

func (r3 *MeasurementStablizer[Data, Bucket]) GetBounds() (Bucket, Bucket) {
	r3.m.Lock()
	defer r3.m.Unlock()

	haveMinimum := false

	lowerBound := Bucket(0)
	upperBound := Bucket(0)

	r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) {
		if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) {
			md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md)
			currentAggregateLowerBound, currentAggregateUpperBound := md.GetBucketBounds()

			if !haveMinimum {
				lowerBound = currentAggregateLowerBound
				haveMinimum = true
			} else {
				lowerBound = min(lowerBound, currentAggregateLowerBound)
			}
			upperBound = max(upperBound, currentAggregateUpperBound)
		}
	})

	return lowerBound, upperBound
}