diff options
| author | Will Hawkins <[email protected]> | 2023-07-10 13:45:50 -0400 |
|---|---|---|
| committer | Will Hawkins <[email protected]> | 2023-07-10 13:45:50 -0400 |
| commit | 78d574a74665c8bc062c26755c80a8b524bce347 (patch) | |
| tree | 7ad65f0052defaea63acb2f3445be00ef97e24d6 | |
| parent | fe17152a507bbf94a11cca7f49a51cbae9c0d67b (diff) | |
[Feature] Major update: Track measurements that may be delayed
Among other major feature additions, this version of the client tracks
any measurements that may be long delayed and considers their presence
or absence as part of a stability measurement.
This version of the client also more closely tracks the spec. In
particular, it performs a sinle-sided trimmed mean rather than a
double-sided trimmed mean.
Signed-off-by: Will Hawkins <[email protected]>
| -rw-r--r-- | Makefile | 2 | ||||
| -rw-r--r-- | direction/direction.go | 29 | ||||
| -rw-r--r-- | ms/ms.go | 371 | ||||
| -rw-r--r-- | ms/ms_test.go | 431 | ||||
| -rw-r--r-- | networkQuality.go | 227 | ||||
| -rw-r--r-- | rpm/calculations.go | 62 | ||||
| -rw-r--r-- | rpm/rpm.go | 37 | ||||
| -rw-r--r-- | series/message.go | 19 | ||||
| -rw-r--r-- | series/series.go | 280 | ||||
| -rw-r--r-- | series/series_test.go | 846 | ||||
| -rw-r--r-- | series/statistics.go | 74 | ||||
| -rw-r--r-- | stabilizer/algorithm.go | 176 | ||||
| -rw-r--r-- | stabilizer/stabilizer.go | 9 | ||||
| -rw-r--r-- | utilities/math.go | 144 | ||||
| -rw-r--r-- | utilities/utilities.go | 61 | ||||
| -rw-r--r-- | utilities/utilities_test.go | 62 |
16 files changed, 1835 insertions, 995 deletions
@@ -6,7 +6,7 @@ all: build test build: go build $(LDFLAGS) networkQuality.go test: - go test ./timeoutat/ ./traceable/ ./ms/ ./utilities/ ./lgc ./qualityattenuation ./rpm + go test ./timeoutat/ ./traceable/ ./utilities/ ./lgc ./qualityattenuation ./rpm ./series golines: find . -name '*.go' -exec ~/go/bin/golines -w {} \; clean: diff --git a/direction/direction.go b/direction/direction.go index 3ef6585..865a273 100644 --- a/direction/direction.go +++ b/direction/direction.go @@ -23,19 +23,18 @@ import ( ) type Direction struct { - DirectionLabel string - SelfProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] - ForeignProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] - ThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] - GranularThroughputDataLogger datalogger.DataLogger[rpm.GranularThroughputDataPoint] - CreateLgdc func() lgc.LoadGeneratingConnection - Lgcc lgc.LoadGeneratingConnectionCollection - DirectionDebugging *debug.DebugWithPrefix - ProbeDebugging *debug.DebugWithPrefix - ThroughputStabilizerDebugging *debug.DebugWithPrefix - ResponsivenessStabilizerDebugging *debug.DebugWithPrefix - LgStabilizationCommunicationChannel chan rpm.ThroughputDataPoint - ExtendedStatsEligible bool - StableThroughput bool - StableResponsiveness bool + DirectionLabel string + SelfProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] + ForeignProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] + ThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] + GranularThroughputDataLogger datalogger.DataLogger[rpm.GranularThroughputDataPoint] + CreateLgdc func() lgc.LoadGeneratingConnection + Lgcc lgc.LoadGeneratingConnectionCollection + DirectionDebugging *debug.DebugWithPrefix + ProbeDebugging *debug.DebugWithPrefix + ThroughputStabilizerDebugging *debug.DebugWithPrefix + ResponsivenessStabilizerDebugging *debug.DebugWithPrefix + ExtendedStatsEligible bool + StableThroughput bool + StableResponsiveness bool } diff --git a/ms/ms.go b/ms/ms.go deleted file mode 100644 index 06c7340..0000000 --- a/ms/ms.go +++ /dev/null @@ -1,371 +0,0 @@ -/* - * This file is part of Go Responsiveness. - * - * Go Responsiveness is free software: you can redistribute it and/or modify it under - * the terms of the GNU General Public License as published by the Free Software Foundation, - * either version 2 of the License, or (at your option) any later version. - * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY - * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A - * PARTICULAR PURPOSE. See the GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. - */ - -package ms - -import ( - "fmt" - "math" - "sort" - - "github.com/network-quality/goresponsiveness/saturating" - "github.com/network-quality/goresponsiveness/utilities" - "golang.org/x/exp/constraints" -) - -type MathematicalSeries[T constraints.Float | constraints.Integer] interface { - AddElement(T) - CalculateAverage() float64 - AllSequentialIncreasesLessThan(float64) (bool, float64) - StandardDeviation() (bool, T) - IsNormallyDistributed() bool - Len() int - Values() []T - Percentile(int) T - DoubleSidedTrim(uint) MathematicalSeries[T] - Less(int, int) bool - Swap(int, int) -} - -func calculateAverage[T constraints.Integer | constraints.Float](elements []T) float64 { - total := T(0) - for i := 0; i < len(elements); i++ { - total += elements[i] - } - return float64(total) / float64(len(elements)) -} - -func calculatePercentile[T constraints.Integer | constraints.Float]( - elements []T, - p int, -) (result T) { - result = T(0) - if p < 0 || p > 100 { - return - } - - sort.Slice(elements, func(l int, r int) bool { return elements[l] < elements[r] }) - pindex := int64((float64(p) / float64(100)) * float64(len(elements))) - result = elements[pindex] - return -} - -type InfiniteMathematicalSeries[T constraints.Float | constraints.Integer] struct { - elements []T -} - -func NewInfiniteMathematicalSeries[T constraints.Float | constraints.Integer]() MathematicalSeries[T] { - return &InfiniteMathematicalSeries[T]{} -} - -func (ims *InfiniteMathematicalSeries[T]) Swap(i, j int) { - ims.elements[i], ims.elements[j] = ims.elements[j], ims.elements[i] -} - -func (ims *InfiniteMathematicalSeries[T]) Less(i, j int) bool { - return ims.elements[i] < ims.elements[j] -} - -func (ims *InfiniteMathematicalSeries[T]) DoubleSidedTrim(percent uint) MathematicalSeries[T] { - if percent >= 100 { - panic( - fmt.Sprintf("Cannot perform double-sided trim for an invalid percentage: %d", percent), - ) - } - - trimmed := &InfiniteMathematicalSeries[T]{} - trimmed.elements = make([]T, ims.Len()) - copy(trimmed.elements, ims.elements) - - sort.Sort(trimmed) - - elementsToTrim := uint64(float32(ims.Len()) * ((float32(percent)) / float32(100.0))) - trimmed.elements = trimmed.elements[elementsToTrim : len(trimmed.elements)-int(elementsToTrim)] - - return trimmed -} - -func (ims *InfiniteMathematicalSeries[T]) Copy() MathematicalSeries[T] { - newIms := InfiniteMathematicalSeries[T]{} - newIms.elements = make([]T, ims.Len()) - copy(newIms.elements, ims.elements) - return &newIms -} - -func (ims *InfiniteMathematicalSeries[T]) AddElement(element T) { - ims.elements = append(ims.elements, element) -} - -func (ims *InfiniteMathematicalSeries[T]) CalculateAverage() float64 { - return calculateAverage(ims.elements) -} - -func (ims *InfiniteMathematicalSeries[T]) AllSequentialIncreasesLessThan( - limit float64, -) (bool, float64) { - if len(ims.elements) < 2 { - return false, 0.0 - } - - maximumSequentialIncrease := float64(0) - for i := 1; i < len(ims.elements); i++ { - current := ims.elements[i] - previous := ims.elements[i-1] - percentChange := utilities.SignedPercentDifference(current, previous) - if percentChange > limit { - return false, percentChange - } - if percentChange > float64(maximumSequentialIncrease) { - maximumSequentialIncrease = percentChange - } - } - return true, maximumSequentialIncrease -} - -/* - * N.B.: Overflow is possible -- use at your discretion! - */ -func (ims *InfiniteMathematicalSeries[T]) StandardDeviation() (bool, T) { - // From https://www.mathsisfun.com/data/standard-deviation-calculator.html - // Yes, for real! - - // Calculate the average of the numbers ... - average := ims.CalculateAverage() - - // Calculate the square of each of the elements' differences from the mean. - differences_squared := make([]float64, len(ims.elements)) - for index, value := range ims.elements { - differences_squared[index] = math.Pow(float64(value-T(average)), 2) - } - - // The variance is the average of the squared differences. - // So, we need to ... - - // Accumulate all those squared differences. - sds := float64(0) - for _, dss := range differences_squared { - sds += dss - } - - // And then divide that total by the number of elements - variance := sds / float64(len(ims.elements)) - - // Finally, the standard deviation is the square root - // of the variance. - sd := T(math.Sqrt(variance)) - // sd := T(variance) - - return true, sd -} - -func (ims *InfiniteMathematicalSeries[T]) IsNormallyDistributed() bool { - return false -} - -func (ims *InfiniteMathematicalSeries[T]) Len() int { - return len(ims.elements) -} - -func (ims *InfiniteMathematicalSeries[T]) Values() []T { - return ims.elements -} - -func (ims *InfiniteMathematicalSeries[T]) Percentile(p int) T { - return calculatePercentile(ims.elements, p) -} - -type CappedMathematicalSeries[T constraints.Float | constraints.Integer] struct { - elements_count uint - elements []T - index uint - divisor *saturating.Saturating[uint] -} - -func NewCappedMathematicalSeries[T constraints.Float | constraints.Integer]( - instants_count uint, -) MathematicalSeries[T] { - return &CappedMathematicalSeries[T]{ - elements: make([]T, instants_count), - elements_count: instants_count, - divisor: saturating.NewSaturating(instants_count), - index: 0, - } -} - -func (ma *CappedMathematicalSeries[T]) AddElement(measurement T) { - ma.elements[ma.index] = measurement - ma.divisor.Add(1) - // Invariant: ma.index always points to the oldest measurement - ma.index = (ma.index + 1) % ma.elements_count -} - -func (ma *CappedMathematicalSeries[T]) CalculateAverage() float64 { - // If we do not yet have all the values, then we know that the values - // exist between 0 and ma.divisor.Value(). If we do have all the values, - // we know that they, too, exist between 0 and ma.divisor.Value(). - return calculateAverage(ma.elements[0:ma.divisor.Value()]) -} - -func (ma *CappedMathematicalSeries[T]) AllSequentialIncreasesLessThan( - limit float64, -) (_ bool, maximumSequentialIncrease float64) { - // If we have not yet accumulated a complete set of intervals, - // this is false. - if ma.divisor.Value() != ma.elements_count { - return false, 0 - } - - // Invariant: ma.index always points to the oldest (see AddMeasurement - // above) - oldestIndex := ma.index - previous := ma.elements[oldestIndex] - maximumSequentialIncrease = 0 - for i := uint(1); i < ma.elements_count; i++ { - currentIndex := (oldestIndex + i) % ma.elements_count - current := ma.elements[currentIndex] - percentChange := utilities.SignedPercentDifference(current, previous) - previous = current - if percentChange > limit { - return false, percentChange - } - } - return true, maximumSequentialIncrease -} - -/* - * N.B.: Overflow is possible -- use at your discretion! - */ -func (ma *CappedMathematicalSeries[T]) StandardDeviation() (bool, T) { - // If we have not yet accumulated a complete set of intervals, - // we are always false. - if ma.divisor.Value() != ma.elements_count { - return false, T(0) - } - - // From https://www.mathsisfun.com/data/standard-deviation-calculator.html - // Yes, for real! - - // Calculate the average of the numbers ... - average := ma.CalculateAverage() - - // Calculate the square of each of the elements' differences from the mean. - differences_squared := make([]float64, ma.elements_count) - for index, value := range ma.elements { - differences_squared[index] = math.Pow(float64(value-T(average)), 2) - } - - // The variance is the average of the squared differences. - // So, we need to ... - - // Accumulate all those squared differences. - sds := float64(0) - for _, dss := range differences_squared { - sds += dss - } - - // And then divide that total by the number of elements - variance := sds / float64(ma.divisor.Value()) - - // Finally, the standard deviation is the square root - // of the variance. - sd := T(math.Sqrt(variance)) - // sd := T(variance) - - return true, sd -} - -func (ma *CappedMathematicalSeries[T]) IsNormallyDistributed() bool { - valid, stddev := ma.StandardDeviation() - // If there are not enough values in our series to generate a standard - // deviation, then we cannot do this calculation either. - if !valid { - return false - } - avg := float64(ma.CalculateAverage()) - - fstddev := float64(stddev) - within := float64(0) - for _, v := range ma.Values() { - if (avg-fstddev) <= float64(v) && float64(v) <= (avg+fstddev) { - within++ - } - } - return within/float64(ma.divisor.Value()) >= 0.68 -} - -func (ma *CappedMathematicalSeries[T]) Values() []T { - return ma.elements -} - -func (ma *CappedMathematicalSeries[T]) Len() int { - if uint(len(ma.elements)) != ma.elements_count { - panic( - fmt.Sprintf( - "Error: A capped mathematical series' metadata is invalid: the length of its element array/slice does not match element_count! (%v vs %v)", - ma.elements_count, - len(ma.elements), - ), - ) - } - return len(ma.elements) -} - -func (ma *CappedMathematicalSeries[T]) Percentile(p int) T { - if p < 0 || p > 100 { - return 0 - } - - // Because we need to sort the list to perform the percentile calculation, - // we have to make a copy of the list so that we don't disturb - // the time-relative ordering of the elements. - - kopy := make([]T, len(ma.elements)) - copy(kopy, ma.elements) - return calculatePercentile(kopy, p) -} - -func (ims *CappedMathematicalSeries[T]) Swap(i, j int) { - ims.elements[i], ims.elements[j] = ims.elements[j], ims.elements[i] -} - -func (ims *CappedMathematicalSeries[T]) Less(i, j int) bool { - return ims.elements[i] < ims.elements[j] -} - -func (ims *CappedMathematicalSeries[T]) DoubleSidedTrim(percent uint) MathematicalSeries[T] { - if percent >= 100 { - panic( - fmt.Sprintf("Cannot perform double-sided trim for an invalid percentage: %d", percent), - ) - } - - trimmed := &CappedMathematicalSeries[T]{elements_count: uint(ims.Len())} - trimmed.elements = make([]T, ims.Len()) - copy(trimmed.elements, ims.elements) - sort.Sort(trimmed) - - elementsToTrim := uint(float32(ims.Len()) * ((float32(percent)) / float32(100.0))) - trimmed.elements = trimmed.elements[elementsToTrim : len(trimmed.elements)-int(elementsToTrim)] - - trimmed.elements_count -= (elementsToTrim * 2) - - return trimmed -} - -func (ims *CappedMathematicalSeries[T]) Copy() MathematicalSeries[T] { - newCms := CappedMathematicalSeries[T]{} - newCms.elements = make([]T, ims.Len()) - copy(newCms.elements, ims.elements) - return &newCms -} diff --git a/ms/ms_test.go b/ms/ms_test.go deleted file mode 100644 index 34817d0..0000000 --- a/ms/ms_test.go +++ /dev/null @@ -1,431 +0,0 @@ -package ms - -import ( - "reflect" - "testing" - - "github.com/network-quality/goresponsiveness/utilities" -) - -func Test_InfiniteValues(test *testing.T) { - series := NewInfiniteMathematicalSeries[float64]() - shouldMatch := make([]float64, 0) - previous := float64(1.0) - for range utilities.Iota(1, 80) { - previous *= 1.059 - series.AddElement(float64(previous)) - shouldMatch = append(shouldMatch, previous) - } - - if !reflect.DeepEqual(shouldMatch, series.Values()) { - test.Fatalf("Values() on infinite mathematical series does not work.") - } -} - -func Test_InfiniteSequentialIncreasesAlwaysLessThan(test *testing.T) { - series := NewInfiniteMathematicalSeries[float64]() - previous := float64(1.0) - for range utilities.Iota(1, 80) { - previous *= 1.059 - series.AddElement(float64(previous)) - } - if islt, maxSeqIncrease := series.AllSequentialIncreasesLessThan(6.0); !islt { - test.Fatalf( - "(infinite) Sequential increases are not always less than 6.0 (%f).", - maxSeqIncrease, - ) - } -} - -func Test_CappedTooFewInstantsSequentialIncreasesLessThanAlwaysFalse(test *testing.T) { - series := NewCappedMathematicalSeries[float64](500) - series.AddElement(0.0) - if islt, _ := series.AllSequentialIncreasesLessThan(6.0); islt { - test.Fatalf( - "(infinite) 0 elements in a series should always yield false when asking if sequential increases are less than a value.", - ) - } -} - -func Test_Infinite_degenerate_percentile_too_high(test *testing.T) { - series := NewInfiniteMathematicalSeries[int]() - if series.Percentile(101) != 0 { - test.Fatalf("(infinite) Series percentile of 101 failed.") - } -} - -func Test_Infinite_degenerate_percentile_too_low(test *testing.T) { - series := NewInfiniteMathematicalSeries[int]() - if series.Percentile(-1) != 0 { - test.Fatalf("(infinite) Series percentile of -1 failed.") - } -} - -func Test_Infinite90_percentile(test *testing.T) { - var expected int64 = 10 - series := NewInfiniteMathematicalSeries[int64]() - series.AddElement(10) - series.AddElement(9) - series.AddElement(8) - series.AddElement(7) - series.AddElement(6) - series.AddElement(5) - series.AddElement(4) - series.AddElement(3) - series.AddElement(2) - series.AddElement(1) - - if series.Percentile(90) != expected { - test.Fatalf( - "(infinite) Series 90th percentile of 0 ... 10 failed: Expected: %v; Actual: %v.", expected, - series.Percentile(90), - ) - } -} - -func Test_Infinite90_percentile_reversed(test *testing.T) { - var expected int64 = 10 - series := NewInfiniteMathematicalSeries[int64]() - series.AddElement(1) - series.AddElement(2) - series.AddElement(3) - series.AddElement(4) - series.AddElement(5) - series.AddElement(6) - series.AddElement(7) - series.AddElement(8) - series.AddElement(9) - series.AddElement(10) - - if series.Percentile(90) != expected { - test.Fatalf( - "(infinite) Series 90th percentile of 0 ... 10 failed: Expected %v; Actual: %v.", expected, - series.Percentile(90), - ) - } -} - -func Test_Infinite50_percentile_jumbled(test *testing.T) { - var expected int64 = 15 - series := NewInfiniteMathematicalSeries[int64]() - series.AddElement(7) - series.AddElement(2) - series.AddElement(15) - series.AddElement(27) - series.AddElement(5) - series.AddElement(52) - series.AddElement(18) - series.AddElement(23) - series.AddElement(11) - series.AddElement(12) - - if series.Percentile(50) != expected { - test.Fatalf( - "(infinite) Series 50 percentile of a jumble of numbers failed: Expected %v; Actual: %v.", expected, - series.Percentile(50), - ) - } -} - -func Test_InfiniteDoubleSidedTrimmedMean_jumbled(test *testing.T) { - expected := 16 - series := NewInfiniteMathematicalSeries[int64]() - series.AddElement(7) - series.AddElement(2) - series.AddElement(15) - series.AddElement(27) - series.AddElement(5) - series.AddElement(5) - series.AddElement(52) - series.AddElement(18) - series.AddElement(23) - series.AddElement(11) - series.AddElement(22) - series.AddElement(17) - series.AddElement(14) - series.AddElement(9) - series.AddElement(100) - series.AddElement(72) - series.AddElement(91) - series.AddElement(43) - series.AddElement(37) - series.AddElement(62) - - trimmed := series.DoubleSidedTrim(10) - - if trimmed.Len() != expected { - test.Fatalf( - "Capped series is not of the proper size. Expected %v and got %v", - expected, - trimmed.Len(), - ) - } - - prev := int64(0) - for _, v := range trimmed.Values() { - if !(prev <= v) { - test.Fatalf("Not sorted: %v is not less than or equal to %v\n", prev, v) - } - prev = v - } -} - -func Test_CappedSequentialIncreasesAlwaysLessThan(test *testing.T) { - series := NewCappedMathematicalSeries[float64](40) - previous := float64(1.0) - for range utilities.Iota(1, 80) { - previous *= 1.059 - series.AddElement(float64(previous)) - } - if islt, maxSeqIncrease := series.AllSequentialIncreasesLessThan(6.0); !islt { - test.Fatalf("Sequential increases are not always less than 6.0 (%f).", maxSeqIncrease) - } -} - -func Test_CappedSequentialIncreasesAlwaysLessThanWithWraparound(test *testing.T) { - series := NewCappedMathematicalSeries[float64](20) - previous := float64(1.0) - for range utilities.Iota(1, 20) { - previous *= 1.15 - series.AddElement(float64(previous)) - } - - // All those measurements should be ejected by the following - // loop! - for range utilities.Iota(1, 20) { - previous *= 1.10 - series.AddElement(float64(previous)) - } - - if islt, maxSeqIncrease := series.AllSequentialIncreasesLessThan(11.0); !islt { - test.Fatalf( - "Sequential increases are not always less than 11.0 in wraparound situation (%f v 11.0).", - maxSeqIncrease, - ) - } -} - -func Test_CappedSequentialIncreasesAlwaysLessThanWithWraparoundInverse(test *testing.T) { - series := NewCappedMathematicalSeries[float64](20) - previous := float64(1.0) - for range utilities.Iota(1, 20) { - previous *= 1.15 - series.AddElement(float64(previous)) - } - - // *Not* all those measurements should be ejected by the following - // loop! - for range utilities.Iota(1, 15) { - previous *= 1.10 - series.AddElement(float64(previous)) - } - - if islt, maxSeqIncrease := series.AllSequentialIncreasesLessThan(11.0); islt { - test.Fatalf( - "Sequential increases are (unexpectedly) always less than 11.0 in wraparound situation: %f v 11.0.", - maxSeqIncrease, - ) - } -} - -func Test_CappedStandardDeviationCalculation(test *testing.T) { - expected := 2.93 - series := NewCappedMathematicalSeries[float64](5) - // 5.7, 1.0, 8.6, 7.4, 2.2 - series.AddElement(5.7) - series.AddElement(5.7) - series.AddElement(5.7) - series.AddElement(5.7) - series.AddElement(5.7) - series.AddElement(5.7) - series.AddElement(5.7) - series.AddElement(5.7) - series.AddElement(5.7) - series.AddElement(1.0) - series.AddElement(8.6) - series.AddElement(7.4) - series.AddElement(2.2) - - if _, sd := series.StandardDeviation(); !utilities.ApproximatelyEqual(sd, expected, 0.01) { - test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd) - } else { - test.Logf("Standard deviation calculation result: %v", sd) - } -} - -func Test_CappedStandardDeviationCalculation2(test *testing.T) { - expected := 1.41 - series := NewCappedMathematicalSeries[float64](5) - series.AddElement(8) - series.AddElement(9) - series.AddElement(10) - series.AddElement(11) - series.AddElement(12) - - if _, sd := series.StandardDeviation(); !utilities.ApproximatelyEqual(sd, expected, 0.01) { - test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd) - } else { - test.Logf("Standard deviation calculation result: %v", sd) - } -} - -func Test_CappedRotatingValues(test *testing.T) { - series := NewCappedMathematicalSeries[int](5) - - series.AddElement(1) - series.AddElement(2) - series.AddElement(3) - series.AddElement(4) - series.AddElement(5) - - series.AddElement(6) - series.AddElement(7) - - if !reflect.DeepEqual([]int{6, 7, 3, 4, 5}, series.Values()) { - test.Fatalf("Adding values does not properly erase earlier values.") - } -} - -func Test_CappedLen(test *testing.T) { - series := NewCappedMathematicalSeries[int](5) - - series.AddElement(1) - series.AddElement(2) - series.AddElement(3) - series.AddElement(4) - series.AddElement(5) - - series.AddElement(6) - series.AddElement(7) - - if series.Len() != 5 { - test.Fatalf("Series size calculations failed.") - } -} - -func Test_Capped_degenerate_percentile_too_high(test *testing.T) { - series := NewCappedMathematicalSeries[int](21) - if series.Percentile(101) != 0 { - test.Fatalf("Series percentile of 101 failed.") - } -} - -func Test_Capped_degenerate_percentile_too_low(test *testing.T) { - series := NewCappedMathematicalSeries[int](21) - if series.Percentile(-1) != 0 { - test.Fatalf("Series percentile of -1 failed.") - } -} - -func Test_Capped90_percentile(test *testing.T) { - var expected int = 10 - series := NewCappedMathematicalSeries[int](10) - series.AddElement(10) - series.AddElement(9) - series.AddElement(8) - series.AddElement(7) - series.AddElement(6) - series.AddElement(5) - series.AddElement(4) - series.AddElement(3) - series.AddElement(2) - series.AddElement(1) - - if series.Percentile(90) != expected { - test.Fatalf( - "Series 90th percentile of 0 ... 10 failed: Expected %v got %v.", expected, - series.Percentile(90), - ) - } -} - -func Test_Capped90_percentile_reversed(test *testing.T) { - series := NewCappedMathematicalSeries[int64](10) - series.AddElement(1) - series.AddElement(2) - series.AddElement(3) - series.AddElement(4) - series.AddElement(5) - series.AddElement(6) - series.AddElement(7) - series.AddElement(8) - series.AddElement(9) - series.AddElement(10) - - if series.Percentile(90) != 10 { - test.Fatalf( - "Series 90th percentile of 0 ... 10 failed: Expected 10 got %v.", - series.Percentile(90), - ) - } -} - -func Test_Capped50_percentile_jumbled(test *testing.T) { - var expected int64 = 15 - series := NewCappedMathematicalSeries[int64](10) - series.AddElement(7) - series.AddElement(2) - series.AddElement(15) - series.AddElement(27) - series.AddElement(5) - series.AddElement(52) - series.AddElement(18) - series.AddElement(23) - series.AddElement(11) - series.AddElement(12) - - if series.Percentile(50) != expected { - test.Fatalf( - "Series 50 percentile of a jumble of numbers failed: Expected %v got %v.", expected, - series.Percentile(50), - ) - } -} - -func Test_CappedDoubleSidedTrimmedMean_jumbled(test *testing.T) { - expected := 8 - series := NewCappedMathematicalSeries[int64](10) - series.AddElement(7) - series.AddElement(2) - series.AddElement(15) - series.AddElement(27) - series.AddElement(5) - series.AddElement(5) - series.AddElement(52) - series.AddElement(18) - series.AddElement(23) - series.AddElement(11) - series.AddElement(12) - - trimmed := series.DoubleSidedTrim(10) - - if trimmed.Len() != expected { - test.Fatalf( - "Capped series is not of the proper size. Expected %v and got %v", - expected, - trimmed.Len(), - ) - } - - prev := int64(0) - for _, v := range trimmed.Values() { - if !(prev <= v) { - test.Fatalf("Not sorted: %v is not less than or equal to %v\n", prev, v) - } - prev = v - } -} - -func Test_CappedAverage(test *testing.T) { - expected := 1.0082230220488836e+08 - series := NewCappedMathematicalSeries[float64](4) - series.AddElement(9.94747772516195e+07) - series.AddElement(9.991286984703423e+07) - series.AddElement(1.0285437111086299e+08) - series.AddElement(1.0104719061003672e+08) - if average := series.CalculateAverage(); !utilities.ApproximatelyEqual(average, 0.01, expected) { - test.Fatalf( - "Expected: %v; Actual: %v.", average, expected, - ) - } -} diff --git a/networkQuality.go b/networkQuality.go index 776c0e7..aa8d854 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -33,10 +33,10 @@ import ( "github.com/network-quality/goresponsiveness/direction" "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" - "github.com/network-quality/goresponsiveness/ms" "github.com/network-quality/goresponsiveness/probe" "github.com/network-quality/goresponsiveness/qualityattenuation" "github.com/network-quality/goresponsiveness/rpm" + "github.com/network-quality/goresponsiveness/series" "github.com/network-quality/goresponsiveness/stabilizer" "github.com/network-quality/goresponsiveness/timeoutat" "github.com/network-quality/goresponsiveness/utilities" @@ -471,8 +471,8 @@ func main() { } // All tests will accumulate data to these series because it will all matter for RPM calculation! - selfRtts := ms.NewInfiniteMathematicalSeries[float64]() - foreignRtts := ms.NewInfiniteMathematicalSeries[float64]() + selfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) + foreignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil if *printQualityAttenuation { @@ -521,8 +521,8 @@ func main() { if *debugCliFlag { downloadThroughputStabilizerDebugLevel = debug.Debug } - throughputStabilizer := stabilizer.NewStabilizer[float64]( - uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, 0, "bytes", + throughputStabilizer := stabilizer.NewStabilizer[float64, uint64]( + specParameters.MovingAvgDist, specParameters.StdDevTolerance, 0, "bytes", downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig) responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, @@ -531,8 +531,8 @@ func main() { if *debugCliFlag { responsivenessStabilizerDebugLevel = debug.Debug } - responsivenessStabilizer := stabilizer.NewStabilizer[int64]( - uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, + responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint]( + specParameters.MovingAvgDist, specParameters.StdDevTolerance, specParameters.TrimmedMeanPct, "milliseconds", responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig) @@ -541,32 +541,70 @@ func main() { lastThroughputRate := float64(0) lastThroughputOpenConnectionCount := int(0) + stabilityCheckTime := time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel := timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) + lg_timeout: for !direction.StableThroughput { select { case throughputMeasurement := <-lgStabilizationCommunicationChannel: { - throughputStabilizer.AddMeasurement( - throughputMeasurement.Throughput) + switch throughputMeasurement.Type { + case series.SeriesMessageReserve: + { + throughputStabilizer.Reserve(throughputMeasurement.Bucket) + if *debugCliFlag { + fmt.Printf( + "%s: Reserving a throughput bucket with id %v.\n", + direction.DirectionLabel, throughputMeasurement.Bucket) + } + } + case series.SeriesMessageMeasure: + { + bucket := throughputMeasurement.Bucket + measurement := utilities.GetSome(throughputMeasurement.Measure) + + throughputStabilizer.AddMeasurement(bucket, measurement.Throughput) + + direction.ThroughputDataLogger.LogRecord(measurement) + for _, v := range measurement.GranularThroughputDataPoints { + v.Direction = "Download" + direction.GranularThroughputDataLogger.LogRecord(v) + } + + lastThroughputRate = measurement.Throughput + lastThroughputOpenConnectionCount = measurement.Connections + } + } + } + case <-stabilityCheckTimeChannel: + { + if *debugCliFlag { + fmt.Printf( + "%v throughput stability interval is complete.\n", direction.DirectionLabel) + } + stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel = timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) + direction.StableThroughput = throughputStabilizer.IsStable() if *debugCliFlag { fmt.Printf( - "################# %v is instantaneously %s.\n", direction.DirectionLabel, + "%v is instantaneously %s.\n", direction.DirectionLabel, utilities.Conditional(direction.StableThroughput, "stable", "unstable")) } - direction.ThroughputDataLogger.LogRecord(throughputMeasurement) - for i := range throughputMeasurement.GranularThroughputDataPoints { - datapoint := throughputMeasurement.GranularThroughputDataPoints[i] - datapoint.Direction = "Download" - direction.GranularThroughputDataLogger.LogRecord(datapoint) - } - - lastThroughputRate = throughputMeasurement.Throughput - lastThroughputOpenConnectionCount = throughputMeasurement.Connections if direction.StableThroughput { throughputGeneratorCtxCancel() } + throughputStabilizer.Interval() } case <-timeoutChannel: { @@ -577,7 +615,7 @@ func main() { if direction.StableThroughput { if *debugCliFlag { - fmt.Printf("################# Throughput is stable; beginning responsiveness testing.\n") + fmt.Printf("Throughput is stable; beginning responsiveness testing.\n") } } else { fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Adding 15 seconds to calculate speculative RPM results.\n") @@ -590,8 +628,8 @@ func main() { ) } - perDirectionSelfRtts := ms.NewInfiniteMathematicalSeries[float64]() - perDirectionForeignRtts := ms.NewInfiniteMathematicalSeries[float64]() + perDirectionSelfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) + perDirectionForeignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber( proberOperatorCtx, @@ -611,61 +649,122 @@ func main() { select { case probeMeasurement := <-responsivenessStabilizationCommunicationChannel: { - foreignDataPoint := probeMeasurement.First - selfDataPoint := probeMeasurement.Second + switch probeMeasurement.Type { + case series.SeriesMessageReserve: + { + bucket := probeMeasurement.Bucket + if *debugCliFlag { + fmt.Printf( + "%s: Reserving a responsiveness bucket with id %v.\n", direction.DirectionLabel, bucket) + } + responsivenessStabilizer.Reserve(bucket) + selfRtts.Reserve(bucket) + foreignRtts.Reserve(bucket) + perDirectionForeignRtts.Reserve(bucket) + perDirectionSelfRtts.Reserve(bucket) + } + case series.SeriesMessageMeasure: + { + bucket := probeMeasurement.Bucket + measurement := utilities.GetSome(probeMeasurement.Measure) + foreignDataPoint := measurement.Foreign + selfDataPoint := measurement.Self - responsivenessStabilizer.AddMeasurement( - (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) + if *debugCliFlag { + fmt.Printf( + "%s: Filling a responsiveness bucket with id %v with value %v.\n", + direction.DirectionLabel, bucket, measurement) + } + responsivenessStabilizer.AddMeasurement(bucket, + (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) - // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy - // is *actually* important, but it can't hurt? - direction.StableResponsiveness = responsivenessStabilizer.IsStable() + if err := selfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (selfRtts)\n", bucket) + } + if perDirectionSelfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionSelfRtts)\n", bucket) + } - if *debugCliFlag { - fmt.Printf( - "################# Responsiveness is instantaneously %s.\n", - utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) - } - // There may be more than one round trip accumulated together. If that is the case, - // we will blow them apart in to three separate measurements and each one will just - // be 1 / measurement.RoundTripCount of the total length. - for range utilities.Iota(0, int(foreignDataPoint.RoundTripCount)) { - foreignRtts.AddElement(foreignDataPoint.Duration.Seconds() / - float64(foreignDataPoint.RoundTripCount)) - perDirectionForeignRtts.AddElement(foreignDataPoint.Duration.Seconds() / - float64(foreignDataPoint.RoundTripCount)) - } - selfRtts.AddElement(selfDataPoint.Duration.Seconds()) - perDirectionSelfRtts.AddElement(selfDataPoint.Duration.Seconds()) + if foreignRtts.Fill(bucket, foreignDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (foreignRtts)\n", bucket) + } - if selfRttsQualityAttenuation != nil { - selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) - } + if perDirectionForeignRtts.Fill(bucket, + foreignDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionForeignRtts)\n", bucket) + } + + if selfRttsQualityAttenuation != nil { + selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) + } - direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) - direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) + direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) + direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) + + } + } } case throughputMeasurement := <-lgStabilizationCommunicationChannel: { - if *debugCliFlag { - fmt.Printf("Adding a throughput measurement.\n") - } - // There may be more than one round trip accumulated together. If that is the case, - direction.ThroughputDataLogger.LogRecord(throughputMeasurement) - for i := range throughputMeasurement.GranularThroughputDataPoints { - datapoint := throughputMeasurement.GranularThroughputDataPoints[i] - datapoint.Direction = direction.DirectionLabel - direction.GranularThroughputDataLogger.LogRecord(datapoint) - } + switch throughputMeasurement.Type { + case series.SeriesMessageReserve: + { + // We are no longer tracking stability, so reservation messages are useless! + if *debugCliFlag { + fmt.Printf( + "%s: Discarding a throughput bucket with id %v when ascertaining responsiveness.\n", + direction.DirectionLabel, throughputMeasurement.Bucket) + } + } + case series.SeriesMessageMeasure: + { + measurement := utilities.GetSome(throughputMeasurement.Measure) - lastThroughputRate = throughputMeasurement.Throughput - lastThroughputOpenConnectionCount = throughputMeasurement.Connections + if *debugCliFlag { + fmt.Printf("Adding a throughput measurement (while ascertaining responsiveness).\n") + } + // There may be more than one round trip accumulated together. If that is the case, + direction.ThroughputDataLogger.LogRecord(measurement) + for _, v := range measurement.GranularThroughputDataPoints { + v.Direction = direction.DirectionLabel + direction.GranularThroughputDataLogger.LogRecord(v) + } + lastThroughputRate = measurement.Throughput + lastThroughputOpenConnectionCount = measurement.Connections + } + } } case <-timeoutChannel: { break responsiveness_timeout } + case <-stabilityCheckTimeChannel: + { + if *debugCliFlag { + fmt.Printf( + "%v responsiveness stability interval is complete.\n", direction.DirectionLabel) + } + + stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel = timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) + + // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy + // is *actually* important, but it can't hurt? + direction.StableResponsiveness = responsivenessStabilizer.IsStable() + + if *debugCliFlag { + fmt.Printf( + "Responsiveness is instantaneously %s.\n", + utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) + } + + responsivenessStabilizer.Interval() + } } } @@ -753,7 +852,7 @@ func main() { } fmt.Printf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, directionResult.PNRpm, 90) - fmt.Printf("%s RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, + fmt.Printf("%s RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, directionResult.MeanRpm, specParameters.TrimmedMeanPct) if len(*prometheusStatsFilename) > 0 { @@ -808,7 +907,7 @@ func main() { } fmt.Printf("Final RPM: %5.0f (P%d)\n", result.PNRpm, 90) - fmt.Printf("Final RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", + fmt.Printf("Final RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", result.MeanRpm, specParameters.TrimmedMeanPct) // Stop the world. diff --git a/rpm/calculations.go b/rpm/calculations.go index 5387aa7..4b61413 100644 --- a/rpm/calculations.go +++ b/rpm/calculations.go @@ -17,40 +17,60 @@ package rpm import ( "fmt" - "github.com/network-quality/goresponsiveness/ms" + "github.com/network-quality/goresponsiveness/series" + "github.com/network-quality/goresponsiveness/utilities" + "golang.org/x/exp/constraints" ) -type Rpm struct { +type Rpm[Data utilities.Number] struct { SelfRttsTotal int ForeignRttsTotal int SelfRttsTrimmed int ForeignRttsTrimmed int - SelfProbeRttPN float64 - ForeignProbeRttPN float64 + SelfProbeRttPN Data + ForeignProbeRttPN Data SelfProbeRttMean float64 ForeignProbeRttMean float64 PNRpm float64 MeanRpm float64 } -func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.MathematicalSeries[float64], trimming uint, percentile int) Rpm { - // First, let's do a double-sided trim of the top/bottom 10% of our measurements. - selfRttsTotalCount := selfRtts.Len() - foreignRttsTotalCount := foreignRtts.Len() +func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered]( + selfRtts series.WindowSeries[Data, Bucket], aggregatedForeignRtts series.WindowSeries[Data, Bucket], trimming uint, percentile int, +) Rpm[Data] { + // There may be more than one round trip accumulated together. If that is the case, + // we will blow them apart in to three separate measurements and each one will just + // be 1 / 3. + foreignRtts := series.NewWindowSeries[Data, int](series.Forever, 0) + foreignBuckets := 0 + for _, v := range aggregatedForeignRtts.GetValues() { + if utilities.IsSome(v) { + v := utilities.GetSome(v) + foreignRtts.Reserve(foreignBuckets) + foreignRtts.Reserve(foreignBuckets + 1) + foreignRtts.Reserve(foreignBuckets + 2) + foreignRtts.Fill(foreignBuckets, v/3) + foreignRtts.Fill(foreignBuckets+1, v/3) + foreignRtts.Fill(foreignBuckets+2, v/3) + foreignBuckets += 3 + } + } - selfRttsTrimmed := selfRtts.DoubleSidedTrim(trimming) - foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(trimming) + // First, let's do a double-sided trim of the top/bottom 10% of our measurements. + selfRttsTotalCount, _ := selfRtts.Count() + foreignRttsTotalCount, _ := foreignRtts.Count() - selfRttsTrimmedCount := selfRttsTrimmed.Len() - foreignRttsTrimmedCount := foreignRttsTrimmed.Len() + _, selfProbeRoundTripTimeMean, selfRttsTrimmed := + series.TrimmedMean(selfRtts, int(trimming)) + _, foreignProbeRoundTripTimeMean, foreignRttsTrimmed := + series.TrimmedMean(foreignRtts, int(trimming)) - // Then, let's take the mean of those ... - selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage() - foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage() + selfRttsTrimmedCount := len(selfRttsTrimmed) + foreignRttsTrimmedCount := len(foreignRttsTrimmed) // Second, let's do the P90 calculations. - selfProbeRoundTripTimePN := selfRtts.Percentile(percentile) - foreignProbeRoundTripTimePN := foreignRtts.Percentile(percentile) + _, selfProbeRoundTripTimePN := series.Percentile(selfRtts, percentile) + _, foreignProbeRoundTripTimePN := series.Percentile(foreignRtts, percentile) // Note: The specification indicates that we want to calculate the foreign probes as such: // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign @@ -62,7 +82,7 @@ func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.Mathem pnRpm := 60.0 / (float64(selfProbeRoundTripTimePN+foreignProbeRoundTripTimePN) / 2.0) meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0) - return Rpm{ + return Rpm[Data]{ SelfRttsTotal: selfRttsTotalCount, ForeignRttsTotal: foreignRttsTotalCount, SelfRttsTrimmed: selfRttsTrimmedCount, ForeignRttsTrimmed: foreignRttsTrimmedCount, SelfProbeRttPN: selfProbeRoundTripTimePN, ForeignProbeRttPN: foreignProbeRoundTripTimePN, @@ -71,14 +91,14 @@ func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.Mathem } } -func (rpm *Rpm) ToString() string { +func (rpm *Rpm[Data]) ToString() string { return fmt.Sprintf( `Total Self Probes: %d Total Foreign Probes: %d Trimmed Self Probes Count: %d Trimmed Foreign Probes Count: %d -P90 Self RTT: %f -P90 Foreign RTT: %f +P90 Self RTT: %v +P90 Foreign RTT: %v Trimmed Mean Self RTT: %f Trimmed Mean Foreign RTT: %f `, @@ -29,6 +29,7 @@ import ( "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" "github.com/network-quality/goresponsiveness/probe" + "github.com/network-quality/goresponsiveness/series" "github.com/network-quality/goresponsiveness/utilities" ) @@ -84,6 +85,11 @@ type SelfDataCollectionResult struct { LoggingContinuation func() } +type ResponsivenessProbeResult struct { + Foreign *probe.ProbeDataPoint + Self *probe.ProbeDataPoint +} + func ResponsivenessProber( proberCtx context.Context, networkActivityCtx context.Context, @@ -95,7 +101,7 @@ func ResponsivenessProber( keyLogger io.Writer, captureExtendedStats bool, debugging *debug.DebugWithPrefix, -) (dataPoints chan utilities.Pair[*probe.ProbeDataPoint]) { +) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, uint]) { if debug.IsDebug(debugging.Level) { fmt.Printf( "(%s) Starting to collect responsiveness information at an interval of %v!\n", @@ -106,7 +112,7 @@ func ResponsivenessProber( // Make a channel to send back all the generated data points // when we are probing. - dataPoints = make(chan utilities.Pair[*probe.ProbeDataPoint]) + dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, uint]) go func() { wg := sync.WaitGroup{} @@ -141,6 +147,11 @@ func ResponsivenessProber( ) } + dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{ + Type: series.SeriesMessageReserve, Bucket: probeCount, + Measure: utilities.None[ResponsivenessProbeResult](), + } + // The presence of a custom TLSClientConfig in a *generic* `transport` // means that go will default to HTTP/1.1 and cowardly avoid HTTP/2: // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278 @@ -262,8 +273,13 @@ func ResponsivenessProber( dataPointsLock.Lock() // Now we have our four data points (three in the foreign probe data point and one in the self probe data point) if dataPoints != nil { - dataPoints <- utilities.Pair[*probe.ProbeDataPoint]{ - First: foreignProbeDataPoint, Second: selfProbeDataPoint, + measurement := ResponsivenessProbeResult{ + Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint, + } + + dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{ + Type: series.SeriesMessageMeasure, Bucket: probeCount, + Measure: utilities.Some[ResponsivenessProbeResult](measurement), } } dataPointsLock.Unlock() @@ -300,8 +316,8 @@ func LoadGenerator( mnp int, captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections? debugging *debug.DebugWithPrefix, // How can we forget debugging? -) (stabilizerCommunicationChannel chan ThroughputDataPoint) { // Send back all the instantaneous throughputs that we generate. - stabilizerCommunicationChannel = make(chan ThroughputDataPoint) +) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, uint64]) { // Send back all the instantaneous throughputs that we generate. + seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, uint64]) go func() { flowsCreated := uint64(0) @@ -454,7 +470,14 @@ func LoadGenerator( len(*loadGeneratingConnectionsCollection.LGCs), granularThroughputDatapoints, } - stabilizerCommunicationChannel <- throughputDataPoint + + seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{ + Type: series.SeriesMessageReserve, Bucket: currentInterval, + } + seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{ + Type: series.SeriesMessageMeasure, Bucket: currentInterval, + Measure: utilities.Some[ThroughputDataPoint](throughputDataPoint), + } if generateLoadCtx.Err() != nil { // No need to add additional data points because the controller told us diff --git a/series/message.go b/series/message.go new file mode 100644 index 0000000..fa0b096 --- /dev/null +++ b/series/message.go @@ -0,0 +1,19 @@ +package series + +import ( + "github.com/network-quality/goresponsiveness/utilities" + "golang.org/x/exp/constraints" +) + +type SeriesMessageType int + +const ( + SeriesMessageReserve SeriesMessageType = iota + SeriesMessageMeasure SeriesMessageType = iota +) + +type SeriesMessage[Data any, BucketType constraints.Ordered] struct { + Type SeriesMessageType + Bucket BucketType + Measure utilities.Optional[Data] +} diff --git a/series/series.go b/series/series.go new file mode 100644 index 0000000..0084007 --- /dev/null +++ b/series/series.go @@ -0,0 +1,280 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ +package series + +import ( + "fmt" + + "github.com/network-quality/goresponsiveness/utilities" + "golang.org/x/exp/constraints" +) + +type WindowSeriesDuration int + +const ( + Forever WindowSeriesDuration = iota + WindowOnly WindowSeriesDuration = iota +) + +type WindowSeries[Data any, Bucket constraints.Ordered] interface { + fmt.Stringer + + Reserve(b Bucket) error + Fill(b Bucket, d Data) error + + Count() (some int, none int) + + ForEach(func(Bucket, *utilities.Optional[Data])) + + GetValues() []utilities.Optional[Data] + Complete() bool + GetType() WindowSeriesDuration +} + +type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct { + windowSize int + data []utilities.Pair[Bucket, utilities.Optional[Data]] + latestIndex int + empty bool +} + +/* + * Beginning of WindowSeries interface methods. + */ + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error { + if !wsi.empty && b <= wsi.data[wsi.latestIndex].First { + return fmt.Errorf("reserving must be monotonically increasing") + } + + if wsi.empty { + /* Special case if we are empty: The latestIndex is where we want this value to go! */ + wsi.data[wsi.latestIndex] = utilities.Pair[Bucket, utilities.Optional[Data]]{ + First: b, Second: utilities.None[Data](), + } + } else { + /* Otherwise, bump ourselves forward and place the new reservation there. */ + wsi.latestIndex = wsi.nextIndex(wsi.latestIndex) + wsi.data[wsi.latestIndex] = utilities.Pair[Bucket, utilities.Optional[Data]]{ + First: b, Second: utilities.None[Data](), + } + } + wsi.empty = false + return nil +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) error { + iterator := wsi.latestIndex + for { + if wsi.data[iterator].First == b { + wsi.data[iterator].Second = utilities.Some[Data](d) + return nil + } + iterator = wsi.nextIndex(iterator) + if iterator == wsi.latestIndex { + break + } + } + return fmt.Errorf("attempting to fill a bucket that does not exist") +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Count() (some int, none int) { + some = 0 + none = 0 + for _, v := range wsi.data { + if utilities.IsSome[Data](v.Second) { + some++ + } else { + none++ + } + } + return +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Complete() bool { + for _, v := range wsi.data { + if utilities.IsNone(v.Second) { + return false + } + } + return true +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) nextIndex(currentIndex int) int { + return (currentIndex + 1) % wsi.windowSize +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) previousIndex(currentIndex int) int { + nextIndex := currentIndex - 1 + if nextIndex < 0 { + nextIndex += wsi.windowSize + } + return nextIndex +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optional[Data] { + result := make([]utilities.Optional[Data], wsi.windowSize) + + iterator := wsi.latestIndex + parallelIterator := 0 + for { + result[parallelIterator] = wsi.data[iterator].Second + iterator = wsi.previousIndex(iterator) + parallelIterator++ + if iterator == wsi.latestIndex { + break + } + } + return result +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] { + return wsi.toArray() +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetType() WindowSeriesDuration { + return WindowOnly +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) { + for _, v := range wsi.data { + eacher(v.First, &v.Second) + } +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string { + result := fmt.Sprintf("Window series (window (%d) only, latest index: %v): ", wsi.windowSize, wsi.latestIndex) + for _, v := range wsi.data { + valueString := "None" + if utilities.IsSome[Data](v.Second) { + valueString = fmt.Sprintf("%v", utilities.GetSome[Data](v.Second)) + } + result += fmt.Sprintf("%v: %v; ", v.First, valueString) + } + return result +} + +func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered]( + windowSize int, +) *windowSeriesWindowOnlyImpl[Data, Bucket] { + result := windowSeriesWindowOnlyImpl[Data, Bucket]{windowSize: windowSize, latestIndex: 0, empty: true} + + result.data = make([]utilities.Pair[Bucket, utilities.Optional[Data]], windowSize) + + return &result +} + +/* + * End of WindowSeries interface methods. + */ + +type windowSeriesForeverImpl[Data any, Bucket constraints.Ordered] struct { + data []utilities.Pair[Bucket, utilities.Optional[Data]] + empty bool +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error { + if !wsi.empty && b <= wsi.data[len(wsi.data)-1].First { + return fmt.Errorf("reserving must be monotonically increasing") + } + + wsi.empty = false + wsi.data = append(wsi.data, utilities.Pair[Bucket, utilities.Optional[Data]]{First: b, Second: utilities.None[Data]()}) + return nil +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error { + for i := range wsi.data { + if wsi.data[i].First == b { + wsi.data[i].Second = utilities.Some[Data](d) + return nil + } + } + return fmt.Errorf("attempting to fill a bucket that does not exist") +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] { + result := make([]utilities.Optional[Data], len(wsi.data)) + + for i, v := range utilities.Reverse(wsi.data) { + result[i] = v.Second + } + + return result +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) Count() (some int, none int) { + some = 0 + none = 0 + for _, v := range wsi.data { + if utilities.IsSome[Data](v.Second) { + some++ + } else { + none++ + } + } + return +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) Complete() bool { + for _, v := range wsi.data { + if utilities.IsNone(v.Second) { + return false + } + } + return true +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetType() WindowSeriesDuration { + return Forever +} + +func newWindowSeriesForeverImpl[Data any, Bucket constraints.Ordered]() *windowSeriesForeverImpl[Data, Bucket] { + result := windowSeriesForeverImpl[Data, Bucket]{empty: true} + + result.data = nil + + return &result +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) { + for _, v := range wsi.data { + eacher(v.First, &v.Second) + } +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string { + result := "Window series (forever): " + for _, v := range wsi.data { + valueString := "None" + if utilities.IsSome[Data](v.Second) { + valueString = fmt.Sprintf("%v", utilities.GetSome[Data](v.Second)) + } + result += fmt.Sprintf("%v: %v; ", v.First, valueString) + } + return result +} + +/* + * End of WindowSeries interface methods. + */ + +func NewWindowSeries[Data any, Bucket constraints.Ordered](tipe WindowSeriesDuration, windowSize int) WindowSeries[Data, Bucket] { + if tipe == WindowOnly { + return newWindowSeriesWindowOnlyImpl[Data, Bucket](windowSize) + } else if tipe == Forever { + return newWindowSeriesForeverImpl[Data, Bucket]() + } + panic("") +} diff --git a/series/series_test.go b/series/series_test.go new file mode 100644 index 0000000..5d2ab57 --- /dev/null +++ b/series/series_test.go @@ -0,0 +1,846 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ +package series + +import ( + "reflect" + "testing" + + "github.com/network-quality/goresponsiveness/utilities" +) + +func TestNextIndex(t *testing.T) { + wsi := newWindowSeriesWindowOnlyImpl[int, int](4) + + idx := wsi.nextIndex(wsi.latestIndex) + if idx != 1 { + t.Fatalf("nextIndex is wrong (1)") + } + wsi.latestIndex = idx + + idx = wsi.nextIndex(wsi.latestIndex) + if idx != 2 { + t.Fatalf("nextIndex is wrong (2)") + } + wsi.latestIndex = idx + + idx = wsi.nextIndex(wsi.latestIndex) + if idx != 3 { + t.Fatalf("nextIndex is wrong (3)") + } + wsi.latestIndex = idx + + idx = wsi.nextIndex(wsi.latestIndex) + if idx != 0 { + t.Fatalf("nextIndex is wrong (0)") + } + wsi.latestIndex = idx + + idx = wsi.nextIndex(wsi.latestIndex) + if idx != 1 { + t.Fatalf("nextIndex is wrong (1)") + } + wsi.latestIndex = idx +} + +func TestSimpleWindowComplete(t *testing.T) { + wsi := newWindowSeriesWindowOnlyImpl[int, int](4) + if wsi.Complete() { + t.Fatalf("Window should not be complete.") + } + wsfImpl := newWindowSeriesForeverImpl[int, int]() + wsfImpl.Reserve(1) + if wsfImpl.Complete() { + t.Fatalf("Window should not be complete.") + } +} + +func TestSimpleReserve(t *testing.T) { + wswoImpl := newWindowSeriesWindowOnlyImpl[int, int](4) + result := wswoImpl.Reserve(0) + if result != nil { + t.Fatalf("Reserving 1 should be a-ok!") + } + wsfImpl := newWindowSeriesForeverImpl[int, int]() + result = wsfImpl.Reserve(0) + if result != nil { + t.Fatalf("Reserving 1 should be a-ok!") + } +} + +func Test_ForeverValues(test *testing.T) { + series := newWindowSeriesForeverImpl[float64, int]() + shouldMatch := make([]utilities.Optional[float64], 0) + previous := float64(1.0) + for i := range utilities.Iota(1, 81) { + previous *= 1.059 + series.Reserve(i) + series.Fill(i, float64(previous)) + shouldMatch = append(shouldMatch, utilities.Some[float64](previous)) + } + + if !reflect.DeepEqual(utilities.Reverse(shouldMatch), series.GetValues()) { + test.Fatalf("Values() on infinite mathematical series does not work.") + } +} + +func Test_WindowOnlySequentialIncreasesAlwaysLessThan(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[float64, int](10) + previous := float64(1.0) + for i := range utilities.Iota(1, 11) { + previous *= 1.5 + series.Reserve(i) + series.Fill(i, float64(previous)) + } + if complete, islt, maxSeqIncrease := AllSequentialIncreasesLessThan[float64, int](series, + 100); !complete || maxSeqIncrease != 50.0 || !islt { + test.Fatalf( + "(Window Only) Sequential increases are not always less than 100 (%v, %v, %f ).", + complete, islt, maxSeqIncrease, + ) + } +} + +func Test_WindowOnlyTooFewInstantsSequentialIncreasesLessThanAlwaysFalse(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[float64, int](500) + series.Reserve(1) + series.Fill(1, 0.0) + if complete, islt, _ := AllSequentialIncreasesLessThan[float64, int](series, 0.0); complete || islt { + test.Fatalf( + "(Window Only) 0 elements in a series should always yield false when asking if sequential increases are less than a value.", + ) + } +} + +func Test_Forever_Complete(test *testing.T) { + series := newWindowSeriesForeverImpl[int, int]() + series.Reserve(1) + series.Fill(1, 10) + if !series.Complete() { + test.Fatalf("(infinite) Series with one element and a window size of 1 is not complete.") + } +} + +func Test_Forever_CompleteN(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[float64, int](10) + previous := float64(1.0) + for i := range utilities.Iota(1, 11) { + previous *= 1.5 + series.Reserve(i) + series.Fill(i, float64(previous)) + } + if !series.Complete() { + test.Fatalf("(infinite) Series with one element and a window size of 2 is complete.") + } +} + +func Test_Forever_degenerate_percentile_too_high(test *testing.T) { + series := newWindowSeriesForeverImpl[int, int]() + if complete, result := Percentile[int, int](series, 101); !complete || result != 0.0 { + test.Fatalf("(infinite) Series percentile of 101 failed.") + } +} + +func Test_Forever_degenerate_percentile_too_low(test *testing.T) { + series := newWindowSeriesForeverImpl[int, int]() + if complete, result := Percentile[int, int](series, -1); !complete || result != 0.0 { + test.Fatalf("(infinite) Series percentile of -1 failed.") + } +} + +/////////// + +func Test_Forever90_percentile(test *testing.T) { + var expected int = 10 + series := newWindowSeriesForeverImpl[int, int]() + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 10) + series.Fill(9, 9) + series.Fill(8, 8) + series.Fill(7, 7) + series.Fill(6, 6) + series.Fill(5, 5) + series.Fill(4, 4) + series.Fill(3, 3) + series.Fill(2, 2) + series.Fill(1, 1) + + if complete, result := Percentile[int, int](series, 90); !complete || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_Forever90_WindowOnly_percentile(test *testing.T) { + var expected int = 10 + series := newWindowSeriesForeverImpl[int, int]() + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 10) + series.Fill(9, 9) + series.Fill(8, 8) + series.Fill(7, 7) + series.Fill(6, 6) + series.Fill(5, 5) + series.Fill(4, 4) + series.Fill(3, 3) + series.Fill(2, 2) + series.Fill(1, 1) + + if complete, result := Percentile[int, int](series, 90); !complete || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_Forever90_percentile_reversed(test *testing.T) { + var expected int = 10 + series := newWindowSeriesForeverImpl[int, int]() + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 1) + series.Fill(9, 2) + series.Fill(8, 3) + series.Fill(7, 4) + series.Fill(6, 5) + series.Fill(5, 6) + series.Fill(4, 7) + series.Fill(3, 8) + series.Fill(2, 9) + series.Fill(1, 10) + + if complete, result := Percentile[int, int](series, 90); !complete || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_Forever50_percentile_jumbled(test *testing.T) { + var expected int64 = 15 + series := newWindowSeriesForeverImpl[int64, int]() + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(1, 7) + series.Fill(2, 2) + series.Fill(3, 15) + series.Fill(4, 27) + series.Fill(5, 5) + series.Fill(6, 52) + series.Fill(7, 18) + series.Fill(8, 23) + series.Fill(9, 11) + series.Fill(10, 12) + + if complete, result := Percentile[int64, int](series, 50); !complete || result != expected { + test.Fatalf( + "Series 50 percentile of a jumble of numbers failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_Forever90_partial_percentile(test *testing.T) { + var expected int = 10 + series := newWindowSeriesForeverImpl[int, int]() + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 10) + series.Fill(9, 9) + series.Fill(8, 8) + series.Fill(7, 7) + series.Fill(6, 6) + series.Fill(5, 5) + series.Fill(4, 4) + series.Fill(3, 3) + series.Fill(2, 2) + series.Fill(1, 1) + + if complete, result := Percentile[int, int](series, 90); !complete || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_Forever90_partial_percentile_reversed(test *testing.T) { + var expected int = 10 + series := newWindowSeriesForeverImpl[int, int]() + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 1) + series.Fill(9, 2) + series.Fill(8, 3) + series.Fill(7, 4) + series.Fill(6, 5) + series.Fill(5, 6) + series.Fill(4, 7) + series.Fill(3, 8) + series.Fill(2, 9) + series.Fill(1, 10) + + if complete, result := Percentile[int, int](series, 90); !complete || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_Forever50_partial_percentile_jumbled(test *testing.T) { + var expected int64 = 15 + series := newWindowSeriesForeverImpl[int64, int]() + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(1, 7) + series.Fill(2, 2) + series.Fill(3, 15) + series.Fill(4, 27) + series.Fill(5, 5) + series.Fill(6, 52) + series.Fill(7, 18) + series.Fill(8, 23) + series.Fill(9, 11) + series.Fill(10, 12) + + if complete, result := Percentile[int64, int](series, 50); !complete || result != expected { + test.Fatalf( + "Series 50 percentile of a jumble of numbers failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +/////////////////////// + +func Test_WindowOnlySequentialIncreasesAlwaysLessThanWithWraparound(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[float64, int](20) + previous := float64(1.0) + for i := range utilities.Iota(1, 21) { + previous *= 1.15 + series.Reserve(i) + series.Fill(1, float64(previous)) + } + + // All those measurements should be ejected by the following + // loop! + for i := range utilities.Iota(1, 21) { + previous *= 1.10 + series.Reserve(i + 20) + series.Fill(i+20, float64(previous)) + } + + if complete, islt, maxSeqIncrease := AllSequentialIncreasesLessThan[float64, int](series, + 11.0); !complete || !islt || !utilities.ApproximatelyEqual(maxSeqIncrease, 10, 0.1) { + test.Fatalf( + "Sequential increases are not always less than 11.0 in wraparound situation (%f v 11.0).", + maxSeqIncrease, + ) + } +} + +func Test_WindowOnlySequentialIncreasesAlwaysLessThanWithWraparoundInverse(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[float64, int](20) + previous := float64(1.0) + i := 0 + for i = range utilities.Iota(1, 21) { + previous *= 1.15 + series.Reserve(i) + series.Fill(i, float64(previous)) + } + + // *Not* all those measurements should be ejected by the following + // loop! + for j := range utilities.Iota(1, 16) { + previous *= 1.10 + series.Reserve(i + j) + series.Fill(i+j, float64(previous)) + } + + if complete, islt, maxSeqIncrease := AllSequentialIncreasesLessThan[float64, int](series, 11.0); complete == false || islt { + test.Fatalf( + "Sequential increases are (unexpectedly) always less than 11.0 in wraparound situation: %f v 11.0.", + maxSeqIncrease, + ) + } +} + +func Test_WindowOnlyStandardDeviationIncompleteCalculation(test *testing.T) { + expected := 2.93 + series := newWindowSeriesWindowOnlyImpl[float64, int](6) + // 5.7, 1.0, 8.6, 7.4, 2.2 + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Fill(1, 5.7) + series.Fill(2, 1.0) + series.Fill(3, 8.6) + series.Fill(4, 7.4) + series.Fill(5, 2.2) + + if complete, sd := SeriesStandardDeviation[float64, int](series); complete != false || + !utilities.ApproximatelyEqual(sd, expected, 0.01) { + test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd) + } else { + test.Logf("Standard deviation calculation result: %v", sd) + } +} + +func Test_WindowOnlyStandardDeviationCalculation(test *testing.T) { + expected := 2.93 + series := newWindowSeriesWindowOnlyImpl[float64, int](5) + // 5.7, 1.0, 8.6, 7.4, 2.2 + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + series.Reserve(11) + series.Reserve(12) + series.Reserve(13) + + series.Fill(1, 5.7) + series.Fill(2, 5.7) + series.Fill(3, 5.7) + series.Fill(4, 5.7) + series.Fill(5, 5.7) + series.Fill(6, 5.7) + series.Fill(7, 5.7) + series.Fill(8, 5.7) + series.Fill(9, 5.7) + series.Fill(10, 1.0) + series.Fill(11, 8.6) + series.Fill(12, 7.4) + series.Fill(13, 2.2) + + if complete, sd := SeriesStandardDeviation[float64, int](series); complete != true || + !utilities.ApproximatelyEqual(sd, expected, 0.01) { + test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd) + } +} + +func Test_WindowOnlyStandardDeviationCalculation2(test *testing.T) { + expected := 1.41 + series := newWindowSeriesWindowOnlyImpl[float64, int](5) + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Fill(1, 8) + series.Fill(2, 9) + series.Fill(3, 10) + series.Fill(4, 11) + series.Fill(5, 12) + + if _, sd := SeriesStandardDeviation[float64, int](series); !utilities.ApproximatelyEqual(sd, expected, 0.01) { + test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd) + } else { + test.Logf("Standard deviation calculation result: %v", sd) + } +} + +func Test_WindowOnlyRotatingValues(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[int, int](5) + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Reserve(6) + series.Reserve(7) + + series.Fill(1, 1) + series.Fill(2, 2) + series.Fill(3, 3) + series.Fill(4, 4) + series.Fill(5, 5) + + series.Fill(6, 6) + series.Fill(7, 7) + + actual := utilities.Fmap(series.GetValues(), func(i utilities.Optional[int]) int { + return utilities.GetSome[int](i) + }) + if !reflect.DeepEqual([]int{7, 6, 5, 4, 3}, actual) { + test.Fatalf("Adding values does not properly erase earlier values.") + } +} + +func Test_WindowOnly_degenerate_percentile_too_high(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[int, int](21) + if complete, p := Percentile[int, int](series, 101); complete != false || p != 0 { + test.Fatalf("Series percentile of 101 failed.") + } +} + +func Test_WindowOnly_degenerate_percentile_too_low(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[int, int](21) + if complete, p := Percentile[int, int](series, -1); complete != false || p != 0 { + test.Fatalf("Series percentile of -1 failed.") + } +} + +func Test_WindowOnly90_percentile(test *testing.T) { + var expected int = 10 + series := newWindowSeriesWindowOnlyImpl[int, int](10) + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 10) + series.Fill(9, 9) + series.Fill(8, 8) + series.Fill(7, 7) + series.Fill(6, 6) + series.Fill(5, 5) + series.Fill(4, 4) + series.Fill(3, 3) + series.Fill(2, 2) + series.Fill(1, 1) + + if complete, result := Percentile[int, int](series, 90); !complete || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_WindowOnly90_percentile_reversed(test *testing.T) { + var expected int = 10 + series := newWindowSeriesWindowOnlyImpl[int, int](10) + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 1) + series.Fill(9, 2) + series.Fill(8, 3) + series.Fill(7, 4) + series.Fill(6, 5) + series.Fill(5, 6) + series.Fill(4, 7) + series.Fill(3, 8) + series.Fill(2, 9) + series.Fill(1, 10) + + if complete, result := Percentile[int, int](series, 90); !complete || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_WindowOnly50_percentile_jumbled(test *testing.T) { + var expected int64 = 15 + series := newWindowSeriesWindowOnlyImpl[int64, int](10) + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(1, 7) + series.Fill(2, 2) + series.Fill(3, 15) + series.Fill(4, 27) + series.Fill(5, 5) + series.Fill(6, 52) + series.Fill(7, 18) + series.Fill(8, 23) + series.Fill(9, 11) + series.Fill(10, 12) + + if complete, result := Percentile[int64, int](series, 50); !complete || result != expected { + test.Fatalf( + "Series 50 percentile of a jumble of numbers failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_WindowOnly90_partial_percentile(test *testing.T) { + var expected int = 10 + series := newWindowSeriesWindowOnlyImpl[int, int](20) + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 10) + series.Fill(9, 9) + series.Fill(8, 8) + series.Fill(7, 7) + series.Fill(6, 6) + series.Fill(5, 5) + series.Fill(4, 4) + series.Fill(3, 3) + series.Fill(2, 2) + series.Fill(1, 1) + + if complete, result := Percentile[int, int](series, 90); complete != false || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_WindowOnly90_partial_percentile_reversed(test *testing.T) { + var expected int = 10 + series := newWindowSeriesWindowOnlyImpl[int, int](20) + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 1) + series.Fill(9, 2) + series.Fill(8, 3) + series.Fill(7, 4) + series.Fill(6, 5) + series.Fill(5, 6) + series.Fill(4, 7) + series.Fill(3, 8) + series.Fill(2, 9) + series.Fill(1, 10) + + if complete, result := Percentile[int, int](series, 90); complete != false || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_WindowOnly50_partial_percentile_jumbled(test *testing.T) { + var expected int64 = 15 + series := newWindowSeriesWindowOnlyImpl[int64, int](20) + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(1, 7) + series.Fill(2, 2) + series.Fill(3, 15) + series.Fill(4, 27) + series.Fill(5, 5) + series.Fill(6, 52) + series.Fill(7, 18) + series.Fill(8, 23) + series.Fill(9, 11) + series.Fill(10, 12) + + if complete, result := Percentile[int64, int](series, 50); complete != false || result != expected { + test.Fatalf( + "Series 50 percentile of a jumble of numbers failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +/* + +func Test_WindowOnlyDoubleSidedTrimmedMean_jumbled(test *testing.T) { + expected := 8 + series := newWindowSeriesWindowOnlyImpl[int64, int](10) + series.AddElement(7) + series.AddElement(2) + series.AddElement(15) + series.AddElement(27) + series.AddElement(5) + series.AddElement(5) + series.AddElement(52) + series.AddElement(18) + series.AddElement(23) + series.AddElement(11) + series.AddElement(12) + + trimmed := series.DoubleSidedTrim(10) + + if trimmed.Len() != expected { + test.Fatalf( + "WindowOnly series is not of the proper size. Expected %v and got %v", + expected, + trimmed.Len(), + ) + } + + prev := int64(0) + for _, v := range trimmed.Values() { + if !(prev <= v) { + test.Fatalf("Not sorted: %v is not less than or equal to %v\n", prev, v) + } + prev = v + } +} + +func Test_WindowOnlyAverage(test *testing.T) { + expected := 1.0082230220488836e+08 + series := newWindowSeriesWindowOnlyImpl[float64, int](4) + series.AddElement(9.94747772516195e+07) + series.AddElement(9.991286984703423e+07) + series.AddElement(1.0285437111086299e+08) + series.AddElement(1.0104719061003672e+08) + if average := series.CalculateAverage(); !utilities.ApproximatelyEqual(average, 0.01, expected) { + test.Fatalf( + "Expected: %v; Actual: %v.", average, expected, + ) + } +} +*/ + +func Test_ForeverStandardDeviationIncompleteCalculation(test *testing.T) { + foreverExpected := 2.90 + series := newWindowSeriesForeverImpl[float64, int]() + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + + series.Fill(1, 5.7) + series.Fill(2, 1.0) + series.Fill(3, 8.6) + series.Fill(4, 7.4) + series.Fill(5, 2.2) + series.Fill(6, 8.0) + + if complete, sd := SeriesStandardDeviation[float64, int](series); !complete || + !utilities.ApproximatelyEqual(sd, foreverExpected, 0.01) { + test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", foreverExpected, sd) + } +} + +func Test_ForeverStandardDeviationCalculation2(test *testing.T) { + expected := 1.41 + series := newWindowSeriesForeverImpl[float64, int]() + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Fill(1, 8) + series.Fill(2, 9) + series.Fill(3, 10) + series.Fill(4, 11) + series.Fill(5, 12) + + if _, sd := SeriesStandardDeviation[float64, int](series); !utilities.ApproximatelyEqual(sd, expected, 0.01) { + test.Fatalf("Standard deviation(series) max calculation(series) failed: Expected: %v; Actual: %v.", expected, sd) + } +} diff --git a/series/statistics.go b/series/statistics.go new file mode 100644 index 0000000..2742aa7 --- /dev/null +++ b/series/statistics.go @@ -0,0 +1,74 @@ +package series + +import ( + "github.com/network-quality/goresponsiveness/utilities" + "golang.org/x/exp/constraints" +) + +func SeriesStandardDeviation[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket]) (bool, float64) { + complete := s.Complete() + + inputValues := s.GetValues() + + actualValues := utilities.Filter(inputValues, func(d utilities.Optional[Data]) bool { + return utilities.IsSome[Data](d) + }) + values := utilities.Fmap(actualValues, func(d utilities.Optional[Data]) Data { return utilities.GetSome[Data](d) }) + + return complete, utilities.CalculateStandardDeviation[Data](values) +} + +func Percentile[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], p int) (bool, Data) { + complete := s.Complete() + + inputValues := s.GetValues() + + actualValues := utilities.Filter(inputValues, func(d utilities.Optional[Data]) bool { + return utilities.IsSome[Data](d) + }) + values := utilities.Fmap(actualValues, func(d utilities.Optional[Data]) Data { return utilities.GetSome[Data](d) }) + + return complete, utilities.CalculatePercentile(values, p) +} + +func AllSequentialIncreasesLessThan[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], limit float64, +) (bool, bool, float64) { + complete := s.Complete() + + inputValues := s.GetValues() + + actualValues := utilities.Filter(utilities.Reverse(inputValues), func(d utilities.Optional[Data]) bool { + return utilities.IsSome[Data](d) + }) + values := utilities.Fmap(actualValues, func(d utilities.Optional[Data]) Data { return utilities.GetSome[Data](d) }) + + result, actualLimit := utilities.AllSequentialIncreasesLessThan(values, limit) + return complete, result, actualLimit +} + +func CalculateAverage[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket]) (bool, float64) { + complete := s.Complete() + + inputValues := s.GetValues() + + actualValues := utilities.Filter(inputValues, func(d utilities.Optional[Data]) bool { + return utilities.IsSome[Data](d) + }) + values := utilities.Fmap(actualValues, func(d utilities.Optional[Data]) Data { return utilities.GetSome[Data](d) }) + + return complete, utilities.CalculateAverage(values) +} + +func TrimmedMean[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], trim int) (bool, float64, []Data) { + complete := s.Complete() + + inputValues := s.GetValues() + + actualValues := utilities.Filter(inputValues, func(d utilities.Optional[Data]) bool { + return utilities.IsSome[Data](d) + }) + values := utilities.Fmap(actualValues, func(d utilities.Optional[Data]) Data { return utilities.GetSome[Data](d) }) + + trimmedMean, trimmedElements := utilities.TrimmedMean(values, trim) + return complete, trimmedMean, trimmedElements +} diff --git a/stabilizer/algorithm.go b/stabilizer/algorithm.go index 45c34d9..86b4bb6 100644 --- a/stabilizer/algorithm.go +++ b/stabilizer/algorithm.go @@ -19,19 +19,23 @@ import ( "sync" "github.com/network-quality/goresponsiveness/debug" - "github.com/network-quality/goresponsiveness/ms" + "github.com/network-quality/goresponsiveness/series" + "github.com/network-quality/goresponsiveness/utilities" "golang.org/x/exp/constraints" ) -type MeasurementStablizer[T constraints.Float | constraints.Integer] struct { - instantaneousses ms.MathematicalSeries[T] - aggregates ms.MathematicalSeries[float64] +type MeasurementStablizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered] struct { + // The number of instantaneous measurements in the current interval could be infinite (Forever). + instantaneousses series.WindowSeries[Data, Bucket] + // There are a fixed, finite number of aggregates (WindowOnly). + aggregates series.WindowSeries[series.WindowSeries[Data, Bucket], int] stabilityStandardDeviation float64 trimmingLevel uint m sync.Mutex dbgLevel debug.DebugLevel dbgConfig *debug.DebugWithPrefix units string + currentInterval int } // Stabilizer parameters: @@ -54,77 +58,159 @@ type MeasurementStablizer[T constraints.Float | constraints.Integer] struct { // If the calculated standard deviation of *those* values is less than SDT, we declare // stability. -func NewStabilizer[T constraints.Float | constraints.Integer]( - mad uint, +func NewStabilizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered]( + mad int, sdt float64, trimmingLevel uint, units string, debugLevel debug.DebugLevel, debug *debug.DebugWithPrefix, -) MeasurementStablizer[T] { - return MeasurementStablizer[T]{ - instantaneousses: ms.NewCappedMathematicalSeries[T](mad), - aggregates: ms.NewCappedMathematicalSeries[float64](mad), +) MeasurementStablizer[Data, Bucket] { + return MeasurementStablizer[Data, Bucket]{ + instantaneousses: series.NewWindowSeries[Data, Bucket](series.Forever, 0), + aggregates: series.NewWindowSeries[ + series.WindowSeries[Data, Bucket], int](series.WindowOnly, mad), stabilityStandardDeviation: sdt, trimmingLevel: trimmingLevel, units: units, + currentInterval: 0, dbgConfig: debug, dbgLevel: debugLevel, } } -func (r3 *MeasurementStablizer[T]) AddMeasurement(measurement T) { +func (r3 *MeasurementStablizer[Data, Bucket]) Reserve(bucket Bucket) { + r3.m.Lock() + defer r3.m.Unlock() + r3.instantaneousses.Reserve(bucket) +} + +func (r3 *MeasurementStablizer[Data, Bucket]) AddMeasurement(bucket Bucket, measurement Data) { + r3.m.Lock() + defer r3.m.Unlock() + + // Fill in the bucket in the current interval. + if err := r3.instantaneousses.Fill(bucket, measurement); err != nil { + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf("%s: A bucket (with id %v) does not exist in the isntantaneousses.\n", + r3.dbgConfig.String(), + bucket) + } + } + + // The result may have been retired from the current interval. Look in the older series + // to fill it in there if it is. + r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) { + if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) { + md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md) + if err := md.Fill(bucket, measurement); err != nil { + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf("%s: A bucket (with id %v) does not exist in a historical window.\n", + r3.dbgConfig.String(), + bucket) + } + } + } + }) + + /* + // Add this instantaneous measurement to the mix of the MAD previous instantaneous measurements. + r3.instantaneousses.Fill(bucket, measurement) + // Calculate the moving average of the MAD previous instantaneous measurements (what the + // algorithm calls moving average aggregate throughput at interval p) and add it to + // the mix of MAD previous moving averages. + + r3.aggregates.AutoFill(r3.instantaneousses.CalculateAverage()) + + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf( + "%s: MA: %f Mbps (previous %d intervals).\n", + r3.dbgConfig.String(), + r3.aggregates.CalculateAverage(), + r3.aggregates.Len(), + ) + } + */ +} + +func (r3 *MeasurementStablizer[Data, Bucket]) Interval() { r3.m.Lock() defer r3.m.Unlock() - // Add this instantaneous measurement to the mix of the MAD previous instantaneous measurements. - r3.instantaneousses.AddElement(measurement) - // Calculate the moving average of the MAD previous instantaneous measurements (what the - // algorithm calls moving average aggregate throughput at interval p) and add it to - // the mix of MAD previous moving averages. - r3.aggregates.AddElement(r3.instantaneousses.CalculateAverage()) if debug.IsDebug(r3.dbgLevel) { fmt.Printf( - "%s: MA: %f Mbps (previous %d intervals).\n", + "%s: stability interval marked (transitioning from %d to %d).\n", r3.dbgConfig.String(), - r3.aggregates.CalculateAverage(), - r3.aggregates.Len(), + r3.currentInterval, + r3.currentInterval+1, ) } -} -func (r3 *MeasurementStablizer[T]) IsStable() bool { - // There are MAD number of measurements of the _moving average aggregate throughput - // at interval p_ in movingAverages. - isvalid, stddev := r3.aggregates.StandardDeviation() + // At the interval boundary, move the instantaneous series to + // the aggregates and start a new instantaneous series. + r3.aggregates.Reserve(r3.currentInterval) + r3.aggregates.Fill(r3.currentInterval, r3.instantaneousses) - if !isvalid { - // If there are not enough values in the series to be able to calculate a - // standard deviation, then we know that we are not yet stable. Vamoose. - return false - } + r3.instantaneousses = series.NewWindowSeries[Data, Bucket](series.Forever, 0) + r3.currentInterval++ +} - // Stability is determined by whether or not the standard deviation of the values - // is within some percentage of the average. - stabilityCutoff := r3.aggregates.CalculateAverage() * (r3.stabilityStandardDeviation / 100.0) - isStable := stddev <= stabilityCutoff +func (r3 *MeasurementStablizer[Data, Bucket]) IsStable() bool { + r3.m.Lock() + defer r3.m.Unlock() if debug.IsDebug(r3.dbgLevel) { fmt.Printf( - "%s: Is Stable? %v; Standard Deviation: %f %s; Is Normally Distributed? %v; Standard Deviation Cutoff: %v %s).\n", + "%s: Determining stability in the %d th interval.\n", r3.dbgConfig.String(), - isStable, - stddev, - r3.units, - r3.aggregates.IsNormallyDistributed(), - stabilityCutoff, - r3.units, + r3.currentInterval, ) - fmt.Printf("%s: Values: ", r3.dbgConfig.String()) - for _, v := range r3.aggregates.Values() { - fmt.Printf("%v, ", v) + } + // Determine if + // a) All the aggregates have values, + // b) All the aggregates are complete. + allComplete := true + r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) { + if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) { + md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md) + allComplete = md.Complete() + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf("%s\n", md.String()) + } + } else { + allComplete = false } - fmt.Printf("\n") + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf( + "%s: The aggregate for the %d th interval was %s.\n", + r3.dbgConfig.String(), + b, + utilities.Conditional(allComplete, "complete", "incomplete"), + ) + } + }) + + if !allComplete { + return false } + + // Calculate the averages of each of the aggregates. + averages := make([]float64, 0) + r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) { + if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) { + md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md) + _, average := series.CalculateAverage(md) + averages = append(averages, average) + } + }) + + // Calculate the standard deviation of the averages of the aggregates. + sd := utilities.CalculateStandardDeviation(averages) + + // Take a percentage of the average of the averages of the aggregates ... + stabilityCutoff := utilities.CalculateAverage(averages) * (r3.stabilityStandardDeviation / 100.0) + // and compare that to the standard deviation to determine stability. + isStable := sd <= stabilityCutoff + return isStable } diff --git a/stabilizer/stabilizer.go b/stabilizer/stabilizer.go index b582751..b157ce1 100644 --- a/stabilizer/stabilizer.go +++ b/stabilizer/stabilizer.go @@ -14,7 +14,12 @@ package stabilizer -type Stabilizer[T any] interface { - AddMeasurement(T) +import ( + "golang.org/x/exp/constraints" +) + +type Stabilizer[Data any, Bucket constraints.Ordered] interface { + Interval() + AddMeasurement(Data, Bucket) IsStable() bool } diff --git a/utilities/math.go b/utilities/math.go new file mode 100644 index 0000000..1ecca61 --- /dev/null +++ b/utilities/math.go @@ -0,0 +1,144 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package utilities + +import ( + "math" + "sort" + + "golang.org/x/exp/constraints" + "golang.org/x/exp/slices" +) + +type Number interface { + constraints.Float | constraints.Integer +} + +func CalculateAverage[T Number](elements []T) float64 { + total := T(0) + for i := 0; i < len(elements); i++ { + total += elements[i] + } + return float64(total) / float64(len(elements)) +} + +func CalculatePercentile[T Number]( + elements []T, + p int, +) (result T) { + result = T(0) + if p < 0 || p > 100 { + return + } + + sort.Slice(elements, func(l int, r int) bool { return elements[l] < elements[r] }) + pindex := int64((float64(p) / float64(100)) * float64(len(elements))) + result = elements[pindex] + return +} + +func Max(x, y uint64) uint64 { + if x > y { + return x + } + return y +} + +func SignedPercentDifference[T constraints.Float | constraints.Integer]( + current T, + previous T, +) (difference float64) { + fCurrent := float64(current) + fPrevious := float64(previous) + return ((fCurrent - fPrevious) / fPrevious) * 100.0 +} + +func AbsPercentDifference( + current float64, + previous float64, +) (difference float64) { + return (math.Abs(current-previous) / (float64(current+previous) / 2.0)) * float64( + 100, + ) +} + +func CalculateStandardDeviation[T constraints.Float | constraints.Integer](elements []T) float64 { + // From https://www.mathsisfun.com/data/standard-deviation-calculator.html + // Yes, for real! + + // Calculate the average of the numbers ... + average := CalculateAverage(elements) + + // Calculate the square of each of the elements' differences from the mean. + differences_squared := make([]float64, len(elements)) + for index, value := range elements { + differences_squared[index] = math.Pow(float64(value-T(average)), 2) + } + + // The variance is the average of the squared differences. + // So, we need to ... + + // Accumulate all those squared differences. + sds := float64(0) + for _, dss := range differences_squared { + sds += dss + } + + // And then divide that total by the number of elements + variance := sds / float64(len(elements)) + + // Finally, the standard deviation is the square root + // of the variance. + sd := float64(math.Sqrt(variance)) + // sd := T(variance) + + return sd +} + +func AllSequentialIncreasesLessThan[T Number](elements []T, limit float64) (bool, float64) { + if len(elements) < 2 { + return false, 0.0 + } + + maximumSequentialIncrease := float64(0) + for i := 1; i < len(elements); i++ { + current := elements[i] + previous := elements[i-1] + percentChange := SignedPercentDifference(current, previous) + if percentChange > limit { + return false, percentChange + } + if percentChange > float64(maximumSequentialIncrease) { + maximumSequentialIncrease = percentChange + } + } + return true, maximumSequentialIncrease +} + +// elements must already be sorted! +func TrimBy[T Number](elements []T, trim int) []T { + numberToKeep := int(float32(len(elements)) * (float32(trim) / 100.0)) + + return elements[:numberToKeep] +} + +func TrimmedMean[T Number](elements []T, trim int) (float64, []T) { + sortedElements := make([]T, len(elements)) + copy(sortedElements, elements) + slices.Sort(sortedElements) + + trimmedElements := TrimBy(sortedElements, trim) + return CalculateAverage(trimmedElements), trimmedElements +} diff --git a/utilities/utilities.go b/utilities/utilities.go index ff04023..b976f77 100644 --- a/utilities/utilities.go +++ b/utilities/utilities.go @@ -21,13 +21,10 @@ import ( "math/rand" "os" "reflect" - "sort" "strings" "sync" "sync/atomic" "time" - - "golang.org/x/exp/constraints" ) // GitVersion is the Git revision hash @@ -46,24 +43,6 @@ func IsInterfaceNil(ifc interface{}) bool { (reflect.ValueOf(ifc).Kind() == reflect.Ptr && reflect.ValueOf(ifc).IsNil()) } -func SignedPercentDifference[T constraints.Float | constraints.Integer]( - current T, - previous T, -) (difference float64) { - fCurrent := float64(current) - fPrevious := float64(previous) - return ((fCurrent - fPrevious) / fPrevious) * 100.0 -} - -func AbsPercentDifference( - current float64, - previous float64, -) (difference float64) { - return (math.Abs(current-previous) / (float64(current+previous) / 2.0)) * float64( - 100, - ) -} - func Conditional[T any](condition bool, t T, f T) T { if condition { return t @@ -137,13 +116,6 @@ func RandBetween(max int) int { return rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % max } -func Max(x, y uint64) uint64 { - if x > y { - return x - } - return y -} - func ChannelToSlice[S any](channel <-chan S) (slice []S) { slice = make([]S, 0) for element := range channel { @@ -152,6 +124,26 @@ func ChannelToSlice[S any](channel <-chan S) (slice []S) { return } +func Reverse[T any](elements []T) []T { + result := make([]T, len(elements)) + iterator := len(elements) - 1 + for _, v := range elements { + result[iterator] = v + iterator-- + } + return result +} + +func Filter[S any](elements []S, filterer func(S) bool) []S { + result := make([]S, 0) + for _, s := range elements { + if filterer(s) { + result = append(result, s) + } + } + return result +} + func Fmap[S any, F any](elements []S, mapper func(S) F) []F { result := make([]F, 0) for _, s := range elements { @@ -160,13 +152,6 @@ func Fmap[S any, F any](elements []S, mapper func(S) F) []F { return result } -func CalculatePercentile[S float32 | int32 | float64 | int64](elements []S, percentile int) S { - sort.Slice(elements, func(a, b int) bool { return elements[a] < elements[b] }) - elementsCount := len(elements) - percentileIdx := elementsCount * (percentile / 100) - return elements[percentileIdx] -} - func OrTimeout(f func(), timeout time.Duration) { completeChannel := func() chan interface{} { completed := make(chan interface{}) @@ -227,9 +212,9 @@ func ContextSignaler(ctxt context.Context, st time.Duration, condition *func() b } } -type Pair[T any] struct { - First T - Second T +type Pair[T1, T2 any] struct { + First T1 + Second T2 } func PerSecondToInterval(rate int64) time.Duration { diff --git a/utilities/utilities_test.go b/utilities/utilities_test.go index 9cd4ef0..7f3d83a 100644 --- a/utilities/utilities_test.go +++ b/utilities/utilities_test.go @@ -126,3 +126,65 @@ func TestPerSecondToInterval(t *testing.T) { t.Fatalf("Something that happens twice per second should happen every 5000ns.") } } + +func TestTrim(t *testing.T) { + elements := Iota(1, 101) + + trimmedElements := TrimBy(elements, 75) + + trimmedLength := len(trimmedElements) + trimmedLast := trimmedElements[trimmedLength-1] + + if trimmedLength != 75 || trimmedLast != 75 { + t.Fatalf("When trimming, the length should be 75 but it is %d and/or the last element should be 75 but it is %d", trimmedLength, trimmedLast) + } +} + +func TestTrim2(t *testing.T) { + elements := Iota(1, 11) + + trimmedElements := TrimBy(elements, 75) + + trimmedLength := len(trimmedElements) + trimmedLast := trimmedElements[trimmedLength-1] + + if trimmedLength != 7 || trimmedLast != 7 { + t.Fatalf("When trimming, the length should be 7 but it is %d and/or the last element should be 7 but it is %d", trimmedLength, trimmedLast) + } +} + +func TestTrim3(t *testing.T) { + elements := Iota(1, 6) + + trimmedElements := TrimBy(elements, 101) + + trimmedLength := len(trimmedElements) + trimmedLast := trimmedElements[trimmedLength-1] + + if trimmedLength != 5 || trimmedLast != 5 { + t.Fatalf("When trimming, the length should be 5 but it is %d and/or the last element should be 5 but it is %d", trimmedLength, trimmedLast) + } +} + +func TestTrim4(t *testing.T) { + elements := Iota(1, 11) + + trimmedElements := TrimBy(elements, 81) + + trimmedLength := len(trimmedElements) + trimmedLast := trimmedElements[trimmedLength-1] + + if trimmedLength != 8 || trimmedLast != 8 { + t.Fatalf("When trimming, the length should be 8 but it is %d and/or the last element should be 8 but it is %d", trimmedLength, trimmedLast) + } +} + +func TestTrimmedMean(t *testing.T) { + expected := 2.5 + elements := []int{5, 4, 3, 2, 1} + + result, elements := TrimmedMean(elements, 80) + if result != expected || len(elements) != 4 || elements[len(elements)-1] != 4 { + t.Fatalf("The trimmed mean result %v does not match the expected value %v", result, expected) + } +} |
