diff options
| -rw-r--r-- | Makefile | 2 | ||||
| -rw-r--r-- | direction/direction.go | 29 | ||||
| -rw-r--r-- | ms/ms.go | 371 | ||||
| -rw-r--r-- | ms/ms_test.go | 431 | ||||
| -rw-r--r-- | networkQuality.go | 227 | ||||
| -rw-r--r-- | rpm/calculations.go | 62 | ||||
| -rw-r--r-- | rpm/rpm.go | 37 | ||||
| -rw-r--r-- | series/message.go | 19 | ||||
| -rw-r--r-- | series/series.go | 280 | ||||
| -rw-r--r-- | series/series_test.go | 846 | ||||
| -rw-r--r-- | series/statistics.go | 74 | ||||
| -rw-r--r-- | stabilizer/algorithm.go | 176 | ||||
| -rw-r--r-- | stabilizer/stabilizer.go | 9 | ||||
| -rw-r--r-- | utilities/math.go | 144 | ||||
| -rw-r--r-- | utilities/utilities.go | 61 | ||||
| -rw-r--r-- | utilities/utilities_test.go | 62 |
16 files changed, 1835 insertions, 995 deletions
@@ -6,7 +6,7 @@ all: build test build: go build $(LDFLAGS) networkQuality.go test: - go test ./timeoutat/ ./traceable/ ./ms/ ./utilities/ ./lgc ./qualityattenuation ./rpm + go test ./timeoutat/ ./traceable/ ./utilities/ ./lgc ./qualityattenuation ./rpm ./series golines: find . -name '*.go' -exec ~/go/bin/golines -w {} \; clean: diff --git a/direction/direction.go b/direction/direction.go index 3ef6585..865a273 100644 --- a/direction/direction.go +++ b/direction/direction.go @@ -23,19 +23,18 @@ import ( ) type Direction struct { - DirectionLabel string - SelfProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] - ForeignProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] - ThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] - GranularThroughputDataLogger datalogger.DataLogger[rpm.GranularThroughputDataPoint] - CreateLgdc func() lgc.LoadGeneratingConnection - Lgcc lgc.LoadGeneratingConnectionCollection - DirectionDebugging *debug.DebugWithPrefix - ProbeDebugging *debug.DebugWithPrefix - ThroughputStabilizerDebugging *debug.DebugWithPrefix - ResponsivenessStabilizerDebugging *debug.DebugWithPrefix - LgStabilizationCommunicationChannel chan rpm.ThroughputDataPoint - ExtendedStatsEligible bool - StableThroughput bool - StableResponsiveness bool + DirectionLabel string + SelfProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] + ForeignProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] + ThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] + GranularThroughputDataLogger datalogger.DataLogger[rpm.GranularThroughputDataPoint] + CreateLgdc func() lgc.LoadGeneratingConnection + Lgcc lgc.LoadGeneratingConnectionCollection + DirectionDebugging *debug.DebugWithPrefix + ProbeDebugging *debug.DebugWithPrefix + ThroughputStabilizerDebugging *debug.DebugWithPrefix + ResponsivenessStabilizerDebugging *debug.DebugWithPrefix + ExtendedStatsEligible bool + StableThroughput bool + StableResponsiveness bool } diff --git a/ms/ms.go b/ms/ms.go deleted file mode 100644 index 06c7340..0000000 --- a/ms/ms.go +++ /dev/null @@ -1,371 +0,0 @@ -/* - * This file is part of Go Responsiveness. - * - * Go Responsiveness is free software: you can redistribute it and/or modify it under - * the terms of the GNU General Public License as published by the Free Software Foundation, - * either version 2 of the License, or (at your option) any later version. - * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY - * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A - * PARTICULAR PURPOSE. See the GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. - */ - -package ms - -import ( - "fmt" - "math" - "sort" - - "github.com/network-quality/goresponsiveness/saturating" - "github.com/network-quality/goresponsiveness/utilities" - "golang.org/x/exp/constraints" -) - -type MathematicalSeries[T constraints.Float | constraints.Integer] interface { - AddElement(T) - CalculateAverage() float64 - AllSequentialIncreasesLessThan(float64) (bool, float64) - StandardDeviation() (bool, T) - IsNormallyDistributed() bool - Len() int - Values() []T - Percentile(int) T - DoubleSidedTrim(uint) MathematicalSeries[T] - Less(int, int) bool - Swap(int, int) -} - -func calculateAverage[T constraints.Integer | constraints.Float](elements []T) float64 { - total := T(0) - for i := 0; i < len(elements); i++ { - total += elements[i] - } - return float64(total) / float64(len(elements)) -} - -func calculatePercentile[T constraints.Integer | constraints.Float]( - elements []T, - p int, -) (result T) { - result = T(0) - if p < 0 || p > 100 { - return - } - - sort.Slice(elements, func(l int, r int) bool { return elements[l] < elements[r] }) - pindex := int64((float64(p) / float64(100)) * float64(len(elements))) - result = elements[pindex] - return -} - -type InfiniteMathematicalSeries[T constraints.Float | constraints.Integer] struct { - elements []T -} - -func NewInfiniteMathematicalSeries[T constraints.Float | constraints.Integer]() MathematicalSeries[T] { - return &InfiniteMathematicalSeries[T]{} -} - -func (ims *InfiniteMathematicalSeries[T]) Swap(i, j int) { - ims.elements[i], ims.elements[j] = ims.elements[j], ims.elements[i] -} - -func (ims *InfiniteMathematicalSeries[T]) Less(i, j int) bool { - return ims.elements[i] < ims.elements[j] -} - -func (ims *InfiniteMathematicalSeries[T]) DoubleSidedTrim(percent uint) MathematicalSeries[T] { - if percent >= 100 { - panic( - fmt.Sprintf("Cannot perform double-sided trim for an invalid percentage: %d", percent), - ) - } - - trimmed := &InfiniteMathematicalSeries[T]{} - trimmed.elements = make([]T, ims.Len()) - copy(trimmed.elements, ims.elements) - - sort.Sort(trimmed) - - elementsToTrim := uint64(float32(ims.Len()) * ((float32(percent)) / float32(100.0))) - trimmed.elements = trimmed.elements[elementsToTrim : len(trimmed.elements)-int(elementsToTrim)] - - return trimmed -} - -func (ims *InfiniteMathematicalSeries[T]) Copy() MathematicalSeries[T] { - newIms := InfiniteMathematicalSeries[T]{} - newIms.elements = make([]T, ims.Len()) - copy(newIms.elements, ims.elements) - return &newIms -} - -func (ims *InfiniteMathematicalSeries[T]) AddElement(element T) { - ims.elements = append(ims.elements, element) -} - -func (ims *InfiniteMathematicalSeries[T]) CalculateAverage() float64 { - return calculateAverage(ims.elements) -} - -func (ims *InfiniteMathematicalSeries[T]) AllSequentialIncreasesLessThan( - limit float64, -) (bool, float64) { - if len(ims.elements) < 2 { - return false, 0.0 - } - - maximumSequentialIncrease := float64(0) - for i := 1; i < len(ims.elements); i++ { - current := ims.elements[i] - previous := ims.elements[i-1] - percentChange := utilities.SignedPercentDifference(current, previous) - if percentChange > limit { - return false, percentChange - } - if percentChange > float64(maximumSequentialIncrease) { - maximumSequentialIncrease = percentChange - } - } - return true, maximumSequentialIncrease -} - -/* - * N.B.: Overflow is possible -- use at your discretion! - */ -func (ims *InfiniteMathematicalSeries[T]) StandardDeviation() (bool, T) { - // From https://www.mathsisfun.com/data/standard-deviation-calculator.html - // Yes, for real! - - // Calculate the average of the numbers ... - average := ims.CalculateAverage() - - // Calculate the square of each of the elements' differences from the mean. - differences_squared := make([]float64, len(ims.elements)) - for index, value := range ims.elements { - differences_squared[index] = math.Pow(float64(value-T(average)), 2) - } - - // The variance is the average of the squared differences. - // So, we need to ... - - // Accumulate all those squared differences. - sds := float64(0) - for _, dss := range differences_squared { - sds += dss - } - - // And then divide that total by the number of elements - variance := sds / float64(len(ims.elements)) - - // Finally, the standard deviation is the square root - // of the variance. - sd := T(math.Sqrt(variance)) - // sd := T(variance) - - return true, sd -} - -func (ims *InfiniteMathematicalSeries[T]) IsNormallyDistributed() bool { - return false -} - -func (ims *InfiniteMathematicalSeries[T]) Len() int { - return len(ims.elements) -} - -func (ims *InfiniteMathematicalSeries[T]) Values() []T { - return ims.elements -} - -func (ims *InfiniteMathematicalSeries[T]) Percentile(p int) T { - return calculatePercentile(ims.elements, p) -} - -type CappedMathematicalSeries[T constraints.Float | constraints.Integer] struct { - elements_count uint - elements []T - index uint - divisor *saturating.Saturating[uint] -} - -func NewCappedMathematicalSeries[T constraints.Float | constraints.Integer]( - instants_count uint, -) MathematicalSeries[T] { - return &CappedMathematicalSeries[T]{ - elements: make([]T, instants_count), - elements_count: instants_count, - divisor: saturating.NewSaturating(instants_count), - index: 0, - } -} - -func (ma *CappedMathematicalSeries[T]) AddElement(measurement T) { - ma.elements[ma.index] = measurement - ma.divisor.Add(1) - // Invariant: ma.index always points to the oldest measurement - ma.index = (ma.index + 1) % ma.elements_count -} - -func (ma *CappedMathematicalSeries[T]) CalculateAverage() float64 { - // If we do not yet have all the values, then we know that the values - // exist between 0 and ma.divisor.Value(). If we do have all the values, - // we know that they, too, exist between 0 and ma.divisor.Value(). - return calculateAverage(ma.elements[0:ma.divisor.Value()]) -} - -func (ma *CappedMathematicalSeries[T]) AllSequentialIncreasesLessThan( - limit float64, -) (_ bool, maximumSequentialIncrease float64) { - // If we have not yet accumulated a complete set of intervals, - // this is false. - if ma.divisor.Value() != ma.elements_count { - return false, 0 - } - - // Invariant: ma.index always points to the oldest (see AddMeasurement - // above) - oldestIndex := ma.index - previous := ma.elements[oldestIndex] - maximumSequentialIncrease = 0 - for i := uint(1); i < ma.elements_count; i++ { - currentIndex := (oldestIndex + i) % ma.elements_count - current := ma.elements[currentIndex] - percentChange := utilities.SignedPercentDifference(current, previous) - previous = current - if percentChange > limit { - return false, percentChange - } - } - return true, maximumSequentialIncrease -} - -/* - * N.B.: Overflow is possible -- use at your discretion! - */ -func (ma *CappedMathematicalSeries[T]) StandardDeviation() (bool, T) { - // If we have not yet accumulated a complete set of intervals, - // we are always false. - if ma.divisor.Value() != ma.elements_count { - return false, T(0) - } - - // From https://www.mathsisfun.com/data/standard-deviation-calculator.html - // Yes, for real! - - // Calculate the average of the numbers ... - average := ma.CalculateAverage() - - // Calculate the square of each of the elements' differences from the mean. - differences_squared := make([]float64, ma.elements_count) - for index, value := range ma.elements { - differences_squared[index] = math.Pow(float64(value-T(average)), 2) - } - - // The variance is the average of the squared differences. - // So, we need to ... - - // Accumulate all those squared differences. - sds := float64(0) - for _, dss := range differences_squared { - sds += dss - } - - // And then divide that total by the number of elements - variance := sds / float64(ma.divisor.Value()) - - // Finally, the standard deviation is the square root - // of the variance. - sd := T(math.Sqrt(variance)) - // sd := T(variance) - - return true, sd -} - -func (ma *CappedMathematicalSeries[T]) IsNormallyDistributed() bool { - valid, stddev := ma.StandardDeviation() - // If there are not enough values in our series to generate a standard - // deviation, then we cannot do this calculation either. - if !valid { - return false - } - avg := float64(ma.CalculateAverage()) - - fstddev := float64(stddev) - within := float64(0) - for _, v := range ma.Values() { - if (avg-fstddev) <= float64(v) && float64(v) <= (avg+fstddev) { - within++ - } - } - return within/float64(ma.divisor.Value()) >= 0.68 -} - -func (ma *CappedMathematicalSeries[T]) Values() []T { - return ma.elements -} - -func (ma *CappedMathematicalSeries[T]) Len() int { - if uint(len(ma.elements)) != ma.elements_count { - panic( - fmt.Sprintf( - "Error: A capped mathematical series' metadata is invalid: the length of its element array/slice does not match element_count! (%v vs %v)", - ma.elements_count, - len(ma.elements), - ), - ) - } - return len(ma.elements) -} - -func (ma *CappedMathematicalSeries[T]) Percentile(p int) T { - if p < 0 || p > 100 { - return 0 - } - - // Because we need to sort the list to perform the percentile calculation, - // we have to make a copy of the list so that we don't disturb - // the time-relative ordering of the elements. - - kopy := make([]T, len(ma.elements)) - copy(kopy, ma.elements) - return calculatePercentile(kopy, p) -} - -func (ims *CappedMathematicalSeries[T]) Swap(i, j int) { - ims.elements[i], ims.elements[j] = ims.elements[j], ims.elements[i] -} - -func (ims *CappedMathematicalSeries[T]) Less(i, j int) bool { - return ims.elements[i] < ims.elements[j] -} - -func (ims *CappedMathematicalSeries[T]) DoubleSidedTrim(percent uint) MathematicalSeries[T] { - if percent >= 100 { - panic( - fmt.Sprintf("Cannot perform double-sided trim for an invalid percentage: %d", percent), - ) - } - - trimmed := &CappedMathematicalSeries[T]{elements_count: uint(ims.Len())} - trimmed.elements = make([]T, ims.Len()) - copy(trimmed.elements, ims.elements) - sort.Sort(trimmed) - - elementsToTrim := uint(float32(ims.Len()) * ((float32(percent)) / float32(100.0))) - trimmed.elements = trimmed.elements[elementsToTrim : len(trimmed.elements)-int(elementsToTrim)] - - trimmed.elements_count -= (elementsToTrim * 2) - - return trimmed -} - -func (ims *CappedMathematicalSeries[T]) Copy() MathematicalSeries[T] { - newCms := CappedMathematicalSeries[T]{} - newCms.elements = make([]T, ims.Len()) - copy(newCms.elements, ims.elements) - return &newCms -} diff --git a/ms/ms_test.go b/ms/ms_test.go deleted file mode 100644 index 34817d0..0000000 --- a/ms/ms_test.go +++ /dev/null @@ -1,431 +0,0 @@ -package ms - -import ( - "reflect" - "testing" - - "github.com/network-quality/goresponsiveness/utilities" -) - -func Test_InfiniteValues(test *testing.T) { - series := NewInfiniteMathematicalSeries[float64]() - shouldMatch := make([]float64, 0) - previous := float64(1.0) - for range utilities.Iota(1, 80) { - previous *= 1.059 - series.AddElement(float64(previous)) - shouldMatch = append(shouldMatch, previous) - } - - if !reflect.DeepEqual(shouldMatch, series.Values()) { - test.Fatalf("Values() on infinite mathematical series does not work.") - } -} - -func Test_InfiniteSequentialIncreasesAlwaysLessThan(test *testing.T) { - series := NewInfiniteMathematicalSeries[float64]() - previous := float64(1.0) - for range utilities.Iota(1, 80) { - previous *= 1.059 - series.AddElement(float64(previous)) - } - if islt, maxSeqIncrease := series.AllSequentialIncreasesLessThan(6.0); !islt { - test.Fatalf( - "(infinite) Sequential increases are not always less than 6.0 (%f).", - maxSeqIncrease, - ) - } -} - -func Test_CappedTooFewInstantsSequentialIncreasesLessThanAlwaysFalse(test *testing.T) { - series := NewCappedMathematicalSeries[float64](500) - series.AddElement(0.0) - if islt, _ := series.AllSequentialIncreasesLessThan(6.0); islt { - test.Fatalf( - "(infinite) 0 elements in a series should always yield false when asking if sequential increases are less than a value.", - ) - } -} - -func Test_Infinite_degenerate_percentile_too_high(test *testing.T) { - series := NewInfiniteMathematicalSeries[int]() - if series.Percentile(101) != 0 { - test.Fatalf("(infinite) Series percentile of 101 failed.") - } -} - -func Test_Infinite_degenerate_percentile_too_low(test *testing.T) { - series := NewInfiniteMathematicalSeries[int]() - if series.Percentile(-1) != 0 { - test.Fatalf("(infinite) Series percentile of -1 failed.") - } -} - -func Test_Infinite90_percentile(test *testing.T) { - var expected int64 = 10 - series := NewInfiniteMathematicalSeries[int64]() - series.AddElement(10) - series.AddElement(9) - series.AddElement(8) - series.AddElement(7) - series.AddElement(6) - series.AddElement(5) - series.AddElement(4) - series.AddElement(3) - series.AddElement(2) - series.AddElement(1) - - if series.Percentile(90) != expected { - test.Fatalf( - "(infinite) Series 90th percentile of 0 ... 10 failed: Expected: %v; Actual: %v.", expected, - series.Percentile(90), - ) - } -} - -func Test_Infinite90_percentile_reversed(test *testing.T) { - var expected int64 = 10 - series := NewInfiniteMathematicalSeries[int64]() - series.AddElement(1) - series.AddElement(2) - series.AddElement(3) - series.AddElement(4) - series.AddElement(5) - series.AddElement(6) - series.AddElement(7) - series.AddElement(8) - series.AddElement(9) - series.AddElement(10) - - if series.Percentile(90) != expected { - test.Fatalf( - "(infinite) Series 90th percentile of 0 ... 10 failed: Expected %v; Actual: %v.", expected, - series.Percentile(90), - ) - } -} - -func Test_Infinite50_percentile_jumbled(test *testing.T) { - var expected int64 = 15 - series := NewInfiniteMathematicalSeries[int64]() - series.AddElement(7) - series.AddElement(2) - series.AddElement(15) - series.AddElement(27) - series.AddElement(5) - series.AddElement(52) - series.AddElement(18) - series.AddElement(23) - series.AddElement(11) - series.AddElement(12) - - if series.Percentile(50) != expected { - test.Fatalf( - "(infinite) Series 50 percentile of a jumble of numbers failed: Expected %v; Actual: %v.", expected, - series.Percentile(50), - ) - } -} - -func Test_InfiniteDoubleSidedTrimmedMean_jumbled(test *testing.T) { - expected := 16 - series := NewInfiniteMathematicalSeries[int64]() - series.AddElement(7) - series.AddElement(2) - series.AddElement(15) - series.AddElement(27) - series.AddElement(5) - series.AddElement(5) - series.AddElement(52) - series.AddElement(18) - series.AddElement(23) - series.AddElement(11) - series.AddElement(22) - series.AddElement(17) - series.AddElement(14) - series.AddElement(9) - series.AddElement(100) - series.AddElement(72) - series.AddElement(91) - series.AddElement(43) - series.AddElement(37) - series.AddElement(62) - - trimmed := series.DoubleSidedTrim(10) - - if trimmed.Len() != expected { - test.Fatalf( - "Capped series is not of the proper size. Expected %v and got %v", - expected, - trimmed.Len(), - ) - } - - prev := int64(0) - for _, v := range trimmed.Values() { - if !(prev <= v) { - test.Fatalf("Not sorted: %v is not less than or equal to %v\n", prev, v) - } - prev = v - } -} - -func Test_CappedSequentialIncreasesAlwaysLessThan(test *testing.T) { - series := NewCappedMathematicalSeries[float64](40) - previous := float64(1.0) - for range utilities.Iota(1, 80) { - previous *= 1.059 - series.AddElement(float64(previous)) - } - if islt, maxSeqIncrease := series.AllSequentialIncreasesLessThan(6.0); !islt { - test.Fatalf("Sequential increases are not always less than 6.0 (%f).", maxSeqIncrease) - } -} - -func Test_CappedSequentialIncreasesAlwaysLessThanWithWraparound(test *testing.T) { - series := NewCappedMathematicalSeries[float64](20) - previous := float64(1.0) - for range utilities.Iota(1, 20) { - previous *= 1.15 - series.AddElement(float64(previous)) - } - - // All those measurements should be ejected by the following - // loop! - for range utilities.Iota(1, 20) { - previous *= 1.10 - series.AddElement(float64(previous)) - } - - if islt, maxSeqIncrease := series.AllSequentialIncreasesLessThan(11.0); !islt { - test.Fatalf( - "Sequential increases are not always less than 11.0 in wraparound situation (%f v 11.0).", - maxSeqIncrease, - ) - } -} - -func Test_CappedSequentialIncreasesAlwaysLessThanWithWraparoundInverse(test *testing.T) { - series := NewCappedMathematicalSeries[float64](20) - previous := float64(1.0) - for range utilities.Iota(1, 20) { - previous *= 1.15 - series.AddElement(float64(previous)) - } - - // *Not* all those measurements should be ejected by the following - // loop! - for range utilities.Iota(1, 15) { - previous *= 1.10 - series.AddElement(float64(previous)) - } - - if islt, maxSeqIncrease := series.AllSequentialIncreasesLessThan(11.0); islt { - test.Fatalf( - "Sequential increases are (unexpectedly) always less than 11.0 in wraparound situation: %f v 11.0.", - maxSeqIncrease, - ) - } -} - -func Test_CappedStandardDeviationCalculation(test *testing.T) { - expected := 2.93 - series := NewCappedMathematicalSeries[float64](5) - // 5.7, 1.0, 8.6, 7.4, 2.2 - series.AddElement(5.7) - series.AddElement(5.7) - series.AddElement(5.7) - series.AddElement(5.7) - series.AddElement(5.7) - series.AddElement(5.7) - series.AddElement(5.7) - series.AddElement(5.7) - series.AddElement(5.7) - series.AddElement(1.0) - series.AddElement(8.6) - series.AddElement(7.4) - series.AddElement(2.2) - - if _, sd := series.StandardDeviation(); !utilities.ApproximatelyEqual(sd, expected, 0.01) { - test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd) - } else { - test.Logf("Standard deviation calculation result: %v", sd) - } -} - -func Test_CappedStandardDeviationCalculation2(test *testing.T) { - expected := 1.41 - series := NewCappedMathematicalSeries[float64](5) - series.AddElement(8) - series.AddElement(9) - series.AddElement(10) - series.AddElement(11) - series.AddElement(12) - - if _, sd := series.StandardDeviation(); !utilities.ApproximatelyEqual(sd, expected, 0.01) { - test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd) - } else { - test.Logf("Standard deviation calculation result: %v", sd) - } -} - -func Test_CappedRotatingValues(test *testing.T) { - series := NewCappedMathematicalSeries[int](5) - - series.AddElement(1) - series.AddElement(2) - series.AddElement(3) - series.AddElement(4) - series.AddElement(5) - - series.AddElement(6) - series.AddElement(7) - - if !reflect.DeepEqual([]int{6, 7, 3, 4, 5}, series.Values()) { - test.Fatalf("Adding values does not properly erase earlier values.") - } -} - -func Test_CappedLen(test *testing.T) { - series := NewCappedMathematicalSeries[int](5) - - series.AddElement(1) - series.AddElement(2) - series.AddElement(3) - series.AddElement(4) - series.AddElement(5) - - series.AddElement(6) - series.AddElement(7) - - if series.Len() != 5 { - test.Fatalf("Series size calculations failed.") - } -} - -func Test_Capped_degenerate_percentile_too_high(test *testing.T) { - series := NewCappedMathematicalSeries[int](21) - if series.Percentile(101) != 0 { - test.Fatalf("Series percentile of 101 failed.") - } -} - -func Test_Capped_degenerate_percentile_too_low(test *testing.T) { - series := NewCappedMathematicalSeries[int](21) - if series.Percentile(-1) != 0 { - test.Fatalf("Series percentile of -1 failed.") - } -} - -func Test_Capped90_percentile(test *testing.T) { - var expected int = 10 - series := NewCappedMathematicalSeries[int](10) - series.AddElement(10) - series.AddElement(9) - series.AddElement(8) - series.AddElement(7) - series.AddElement(6) - series.AddElement(5) - series.AddElement(4) - series.AddElement(3) - series.AddElement(2) - series.AddElement(1) - - if series.Percentile(90) != expected { - test.Fatalf( - "Series 90th percentile of 0 ... 10 failed: Expected %v got %v.", expected, - series.Percentile(90), - ) - } -} - -func Test_Capped90_percentile_reversed(test *testing.T) { - series := NewCappedMathematicalSeries[int64](10) - series.AddElement(1) - series.AddElement(2) - series.AddElement(3) - series.AddElement(4) - series.AddElement(5) - series.AddElement(6) - series.AddElement(7) - series.AddElement(8) - series.AddElement(9) - series.AddElement(10) - - if series.Percentile(90) != 10 { - test.Fatalf( - "Series 90th percentile of 0 ... 10 failed: Expected 10 got %v.", - series.Percentile(90), - ) - } -} - -func Test_Capped50_percentile_jumbled(test *testing.T) { - var expected int64 = 15 - series := NewCappedMathematicalSeries[int64](10) - series.AddElement(7) - series.AddElement(2) - series.AddElement(15) - series.AddElement(27) - series.AddElement(5) - series.AddElement(52) - series.AddElement(18) - series.AddElement(23) - series.AddElement(11) - series.AddElement(12) - - if series.Percentile(50) != expected { - test.Fatalf( - "Series 50 percentile of a jumble of numbers failed: Expected %v got %v.", expected, - series.Percentile(50), - ) - } -} - -func Test_CappedDoubleSidedTrimmedMean_jumbled(test *testing.T) { - expected := 8 - series := NewCappedMathematicalSeries[int64](10) - series.AddElement(7) - series.AddElement(2) - series.AddElement(15) - series.AddElement(27) - series.AddElement(5) - series.AddElement(5) - series.AddElement(52) - series.AddElement(18) - series.AddElement(23) - series.AddElement(11) - series.AddElement(12) - - trimmed := series.DoubleSidedTrim(10) - - if trimmed.Len() != expected { - test.Fatalf( - "Capped series is not of the proper size. Expected %v and got %v", - expected, - trimmed.Len(), - ) - } - - prev := int64(0) - for _, v := range trimmed.Values() { - if !(prev <= v) { - test.Fatalf("Not sorted: %v is not less than or equal to %v\n", prev, v) - } - prev = v - } -} - -func Test_CappedAverage(test *testing.T) { - expected := 1.0082230220488836e+08 - series := NewCappedMathematicalSeries[float64](4) - series.AddElement(9.94747772516195e+07) - series.AddElement(9.991286984703423e+07) - series.AddElement(1.0285437111086299e+08) - series.AddElement(1.0104719061003672e+08) - if average := series.CalculateAverage(); !utilities.ApproximatelyEqual(average, 0.01, expected) { - test.Fatalf( - "Expected: %v; Actual: %v.", average, expected, - ) - } -} diff --git a/networkQuality.go b/networkQuality.go index 776c0e7..aa8d854 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -33,10 +33,10 @@ import ( "github.com/network-quality/goresponsiveness/direction" "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" - "github.com/network-quality/goresponsiveness/ms" "github.com/network-quality/goresponsiveness/probe" "github.com/network-quality/goresponsiveness/qualityattenuation" "github.com/network-quality/goresponsiveness/rpm" + "github.com/network-quality/goresponsiveness/series" "github.com/network-quality/goresponsiveness/stabilizer" "github.com/network-quality/goresponsiveness/timeoutat" "github.com/network-quality/goresponsiveness/utilities" @@ -471,8 +471,8 @@ func main() { } // All tests will accumulate data to these series because it will all matter for RPM calculation! - selfRtts := ms.NewInfiniteMathematicalSeries[float64]() - foreignRtts := ms.NewInfiniteMathematicalSeries[float64]() + selfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) + foreignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil if *printQualityAttenuation { @@ -521,8 +521,8 @@ func main() { if *debugCliFlag { downloadThroughputStabilizerDebugLevel = debug.Debug } - throughputStabilizer := stabilizer.NewStabilizer[float64]( - uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, 0, "bytes", + throughputStabilizer := stabilizer.NewStabilizer[float64, uint64]( + specParameters.MovingAvgDist, specParameters.StdDevTolerance, 0, "bytes", downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig) responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, @@ -531,8 +531,8 @@ func main() { if *debugCliFlag { responsivenessStabilizerDebugLevel = debug.Debug } - responsivenessStabilizer := stabilizer.NewStabilizer[int64]( - uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, + responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint]( + specParameters.MovingAvgDist, specParameters.StdDevTolerance, specParameters.TrimmedMeanPct, "milliseconds", responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig) @@ -541,32 +541,70 @@ func main() { lastThroughputRate := float64(0) lastThroughputOpenConnectionCount := int(0) + stabilityCheckTime := time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel := timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) + lg_timeout: for !direction.StableThroughput { select { case throughputMeasurement := <-lgStabilizationCommunicationChannel: { - throughputStabilizer.AddMeasurement( - throughputMeasurement.Throughput) + switch throughputMeasurement.Type { + case series.SeriesMessageReserve: + { + throughputStabilizer.Reserve(throughputMeasurement.Bucket) + if *debugCliFlag { + fmt.Printf( + "%s: Reserving a throughput bucket with id %v.\n", + direction.DirectionLabel, throughputMeasurement.Bucket) + } + } + case series.SeriesMessageMeasure: + { + bucket := throughputMeasurement.Bucket + measurement := utilities.GetSome(throughputMeasurement.Measure) + + throughputStabilizer.AddMeasurement(bucket, measurement.Throughput) + + direction.ThroughputDataLogger.LogRecord(measurement) + for _, v := range measurement.GranularThroughputDataPoints { + v.Direction = "Download" + direction.GranularThroughputDataLogger.LogRecord(v) + } + + lastThroughputRate = measurement.Throughput + lastThroughputOpenConnectionCount = measurement.Connections + } + } + } + case <-stabilityCheckTimeChannel: + { + if *debugCliFlag { + fmt.Printf( + "%v throughput stability interval is complete.\n", direction.DirectionLabel) + } + stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel = timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) + direction.StableThroughput = throughputStabilizer.IsStable() if *debugCliFlag { fmt.Printf( - "################# %v is instantaneously %s.\n", direction.DirectionLabel, + "%v is instantaneously %s.\n", direction.DirectionLabel, utilities.Conditional(direction.StableThroughput, "stable", "unstable")) } - direction.ThroughputDataLogger.LogRecord(throughputMeasurement) - for i := range throughputMeasurement.GranularThroughputDataPoints { - datapoint := throughputMeasurement.GranularThroughputDataPoints[i] - datapoint.Direction = "Download" - direction.GranularThroughputDataLogger.LogRecord(datapoint) - } - - lastThroughputRate = throughputMeasurement.Throughput - lastThroughputOpenConnectionCount = throughputMeasurement.Connections if direction.StableThroughput { throughputGeneratorCtxCancel() } + throughputStabilizer.Interval() } case <-timeoutChannel: { @@ -577,7 +615,7 @@ func main() { if direction.StableThroughput { if *debugCliFlag { - fmt.Printf("################# Throughput is stable; beginning responsiveness testing.\n") + fmt.Printf("Throughput is stable; beginning responsiveness testing.\n") } } else { fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Adding 15 seconds to calculate speculative RPM results.\n") @@ -590,8 +628,8 @@ func main() { ) } - perDirectionSelfRtts := ms.NewInfiniteMathematicalSeries[float64]() - perDirectionForeignRtts := ms.NewInfiniteMathematicalSeries[float64]() + perDirectionSelfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) + perDirectionForeignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0) responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber( proberOperatorCtx, @@ -611,61 +649,122 @@ func main() { select { case probeMeasurement := <-responsivenessStabilizationCommunicationChannel: { - foreignDataPoint := probeMeasurement.First - selfDataPoint := probeMeasurement.Second + switch probeMeasurement.Type { + case series.SeriesMessageReserve: + { + bucket := probeMeasurement.Bucket + if *debugCliFlag { + fmt.Printf( + "%s: Reserving a responsiveness bucket with id %v.\n", direction.DirectionLabel, bucket) + } + responsivenessStabilizer.Reserve(bucket) + selfRtts.Reserve(bucket) + foreignRtts.Reserve(bucket) + perDirectionForeignRtts.Reserve(bucket) + perDirectionSelfRtts.Reserve(bucket) + } + case series.SeriesMessageMeasure: + { + bucket := probeMeasurement.Bucket + measurement := utilities.GetSome(probeMeasurement.Measure) + foreignDataPoint := measurement.Foreign + selfDataPoint := measurement.Self - responsivenessStabilizer.AddMeasurement( - (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) + if *debugCliFlag { + fmt.Printf( + "%s: Filling a responsiveness bucket with id %v with value %v.\n", + direction.DirectionLabel, bucket, measurement) + } + responsivenessStabilizer.AddMeasurement(bucket, + (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) - // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy - // is *actually* important, but it can't hurt? - direction.StableResponsiveness = responsivenessStabilizer.IsStable() + if err := selfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (selfRtts)\n", bucket) + } + if perDirectionSelfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionSelfRtts)\n", bucket) + } - if *debugCliFlag { - fmt.Printf( - "################# Responsiveness is instantaneously %s.\n", - utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) - } - // There may be more than one round trip accumulated together. If that is the case, - // we will blow them apart in to three separate measurements and each one will just - // be 1 / measurement.RoundTripCount of the total length. - for range utilities.Iota(0, int(foreignDataPoint.RoundTripCount)) { - foreignRtts.AddElement(foreignDataPoint.Duration.Seconds() / - float64(foreignDataPoint.RoundTripCount)) - perDirectionForeignRtts.AddElement(foreignDataPoint.Duration.Seconds() / - float64(foreignDataPoint.RoundTripCount)) - } - selfRtts.AddElement(selfDataPoint.Duration.Seconds()) - perDirectionSelfRtts.AddElement(selfDataPoint.Duration.Seconds()) + if foreignRtts.Fill(bucket, foreignDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (foreignRtts)\n", bucket) + } - if selfRttsQualityAttenuation != nil { - selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) - } + if perDirectionForeignRtts.Fill(bucket, + foreignDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionForeignRtts)\n", bucket) + } + + if selfRttsQualityAttenuation != nil { + selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) + } - direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) - direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) + direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) + direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) + + } + } } case throughputMeasurement := <-lgStabilizationCommunicationChannel: { - if *debugCliFlag { - fmt.Printf("Adding a throughput measurement.\n") - } - // There may be more than one round trip accumulated together. If that is the case, - direction.ThroughputDataLogger.LogRecord(throughputMeasurement) - for i := range throughputMeasurement.GranularThroughputDataPoints { - datapoint := throughputMeasurement.GranularThroughputDataPoints[i] - datapoint.Direction = direction.DirectionLabel - direction.GranularThroughputDataLogger.LogRecord(datapoint) - } + switch throughputMeasurement.Type { + case series.SeriesMessageReserve: + { + // We are no longer tracking stability, so reservation messages are useless! + if *debugCliFlag { + fmt.Printf( + "%s: Discarding a throughput bucket with id %v when ascertaining responsiveness.\n", + direction.DirectionLabel, throughputMeasurement.Bucket) + } + } + case series.SeriesMessageMeasure: + { + measurement := utilities.GetSome(throughputMeasurement.Measure) - lastThroughputRate = throughputMeasurement.Throughput - lastThroughputOpenConnectionCount = throughputMeasurement.Connections + if *debugCliFlag { + fmt.Printf("Adding a throughput measurement (while ascertaining responsiveness).\n") + } + // There may be more than one round trip accumulated together. If that is the case, + direction.ThroughputDataLogger.LogRecord(measurement) + for _, v := range measurement.GranularThroughputDataPoints { + v.Direction = direction.DirectionLabel + direction.GranularThroughputDataLogger.LogRecord(v) + } + lastThroughputRate = measurement.Throughput + lastThroughputOpenConnectionCount = measurement.Connections + } + } } case <-timeoutChannel: { break responsiveness_timeout } + case <-stabilityCheckTimeChannel: + { + if *debugCliFlag { + fmt.Printf( + "%v responsiveness stability interval is complete.\n", direction.DirectionLabel) + } + + stabilityCheckTime = time.Now().Add(specParameters.EvalInterval) + stabilityCheckTimeChannel = timeoutat.TimeoutAt( + operatingCtx, + stabilityCheckTime, + debugLevel, + ) + + // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy + // is *actually* important, but it can't hurt? + direction.StableResponsiveness = responsivenessStabilizer.IsStable() + + if *debugCliFlag { + fmt.Printf( + "Responsiveness is instantaneously %s.\n", + utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) + } + + responsivenessStabilizer.Interval() + } } } @@ -753,7 +852,7 @@ func main() { } fmt.Printf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, directionResult.PNRpm, 90) - fmt.Printf("%s RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, + fmt.Printf("%s RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, directionResult.MeanRpm, specParameters.TrimmedMeanPct) if len(*prometheusStatsFilename) > 0 { @@ -808,7 +907,7 @@ func main() { } fmt.Printf("Final RPM: %5.0f (P%d)\n", result.PNRpm, 90) - fmt.Printf("Final RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", + fmt.Printf("Final RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", result.MeanRpm, specParameters.TrimmedMeanPct) // Stop the world. diff --git a/rpm/calculations.go b/rpm/calculations.go index 5387aa7..4b61413 100644 --- a/rpm/calculations.go +++ b/rpm/calculations.go @@ -17,40 +17,60 @@ package rpm import ( "fmt" - "github.com/network-quality/goresponsiveness/ms" + "github.com/network-quality/goresponsiveness/series" + "github.com/network-quality/goresponsiveness/utilities" + "golang.org/x/exp/constraints" ) -type Rpm struct { +type Rpm[Data utilities.Number] struct { SelfRttsTotal int ForeignRttsTotal int SelfRttsTrimmed int ForeignRttsTrimmed int - SelfProbeRttPN float64 - ForeignProbeRttPN float64 + SelfProbeRttPN Data + ForeignProbeRttPN Data SelfProbeRttMean float64 ForeignProbeRttMean float64 PNRpm float64 MeanRpm float64 } -func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.MathematicalSeries[float64], trimming uint, percentile int) Rpm { - // First, let's do a double-sided trim of the top/bottom 10% of our measurements. - selfRttsTotalCount := selfRtts.Len() - foreignRttsTotalCount := foreignRtts.Len() +func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered]( + selfRtts series.WindowSeries[Data, Bucket], aggregatedForeignRtts series.WindowSeries[Data, Bucket], trimming uint, percentile int, +) Rpm[Data] { + // There may be more than one round trip accumulated together. If that is the case, + // we will blow them apart in to three separate measurements and each one will just + // be 1 / 3. + foreignRtts := series.NewWindowSeries[Data, int](series.Forever, 0) + foreignBuckets := 0 + for _, v := range aggregatedForeignRtts.GetValues() { + if utilities.IsSome(v) { + v := utilities.GetSome(v) + foreignRtts.Reserve(foreignBuckets) + foreignRtts.Reserve(foreignBuckets + 1) + foreignRtts.Reserve(foreignBuckets + 2) + foreignRtts.Fill(foreignBuckets, v/3) + foreignRtts.Fill(foreignBuckets+1, v/3) + foreignRtts.Fill(foreignBuckets+2, v/3) + foreignBuckets += 3 + } + } - selfRttsTrimmed := selfRtts.DoubleSidedTrim(trimming) - foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(trimming) + // First, let's do a double-sided trim of the top/bottom 10% of our measurements. + selfRttsTotalCount, _ := selfRtts.Count() + foreignRttsTotalCount, _ := foreignRtts.Count() - selfRttsTrimmedCount := selfRttsTrimmed.Len() - foreignRttsTrimmedCount := foreignRttsTrimmed.Len() + _, selfProbeRoundTripTimeMean, selfRttsTrimmed := + series.TrimmedMean(selfRtts, int(trimming)) + _, foreignProbeRoundTripTimeMean, foreignRttsTrimmed := + series.TrimmedMean(foreignRtts, int(trimming)) - // Then, let's take the mean of those ... - selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage() - foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage() + selfRttsTrimmedCount := len(selfRttsTrimmed) + foreignRttsTrimmedCount := len(foreignRttsTrimmed) // Second, let's do the P90 calculations. - selfProbeRoundTripTimePN := selfRtts.Percentile(percentile) - foreignProbeRoundTripTimePN := foreignRtts.Percentile(percentile) + _, selfProbeRoundTripTimePN := series.Percentile(selfRtts, percentile) + _, foreignProbeRoundTripTimePN := series.Percentile(foreignRtts, percentile) // Note: The specification indicates that we want to calculate the foreign probes as such: // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign @@ -62,7 +82,7 @@ func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.Mathem pnRpm := 60.0 / (float64(selfProbeRoundTripTimePN+foreignProbeRoundTripTimePN) / 2.0) meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0) - return Rpm{ + return Rpm[Data]{ SelfRttsTotal: selfRttsTotalCount, ForeignRttsTotal: foreignRttsTotalCount, SelfRttsTrimmed: selfRttsTrimmedCount, ForeignRttsTrimmed: foreignRttsTrimmedCount, SelfProbeRttPN: selfProbeRoundTripTimePN, ForeignProbeRttPN: foreignProbeRoundTripTimePN, @@ -71,14 +91,14 @@ func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.Mathem } } -func (rpm *Rpm) ToString() string { +func (rpm *Rpm[Data]) ToString() string { return fmt.Sprintf( `Total Self Probes: %d Total Foreign Probes: %d Trimmed Self Probes Count: %d Trimmed Foreign Probes Count: %d -P90 Self RTT: %f -P90 Foreign RTT: %f +P90 Self RTT: %v +P90 Foreign RTT: %v Trimmed Mean Self RTT: %f Trimmed Mean Foreign RTT: %f `, @@ -29,6 +29,7 @@ import ( "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" "github.com/network-quality/goresponsiveness/probe" + "github.com/network-quality/goresponsiveness/series" "github.com/network-quality/goresponsiveness/utilities" ) @@ -84,6 +85,11 @@ type SelfDataCollectionResult struct { LoggingContinuation func() } +type ResponsivenessProbeResult struct { + Foreign *probe.ProbeDataPoint + Self *probe.ProbeDataPoint +} + func ResponsivenessProber( proberCtx context.Context, networkActivityCtx context.Context, @@ -95,7 +101,7 @@ func ResponsivenessProber( keyLogger io.Writer, captureExtendedStats bool, debugging *debug.DebugWithPrefix, -) (dataPoints chan utilities.Pair[*probe.ProbeDataPoint]) { +) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, uint]) { if debug.IsDebug(debugging.Level) { fmt.Printf( "(%s) Starting to collect responsiveness information at an interval of %v!\n", @@ -106,7 +112,7 @@ func ResponsivenessProber( // Make a channel to send back all the generated data points // when we are probing. - dataPoints = make(chan utilities.Pair[*probe.ProbeDataPoint]) + dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, uint]) go func() { wg := sync.WaitGroup{} @@ -141,6 +147,11 @@ func ResponsivenessProber( ) } + dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{ + Type: series.SeriesMessageReserve, Bucket: probeCount, + Measure: utilities.None[ResponsivenessProbeResult](), + } + // The presence of a custom TLSClientConfig in a *generic* `transport` // means that go will default to HTTP/1.1 and cowardly avoid HTTP/2: // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278 @@ -262,8 +273,13 @@ func ResponsivenessProber( dataPointsLock.Lock() // Now we have our four data points (three in the foreign probe data point and one in the self probe data point) if dataPoints != nil { - dataPoints <- utilities.Pair[*probe.ProbeDataPoint]{ - First: foreignProbeDataPoint, Second: selfProbeDataPoint, + measurement := ResponsivenessProbeResult{ + Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint, + } + + dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{ + Type: series.SeriesMessageMeasure, Bucket: probeCount, + Measure: utilities.Some[ResponsivenessProbeResult](measurement), } } dataPointsLock.Unlock() @@ -300,8 +316,8 @@ func LoadGenerator( mnp int, captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections? debugging *debug.DebugWithPrefix, // How can we forget debugging? -) (stabilizerCommunicationChannel chan ThroughputDataPoint) { // Send back all the instantaneous throughputs that we generate. - stabilizerCommunicationChannel = make(chan ThroughputDataPoint) +) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, uint64]) { // Send back all the instantaneous throughputs that we generate. + seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, uint64]) go func() { flowsCreated := uint64(0) @@ -454,7 +470,14 @@ func LoadGenerator( len(*loadGeneratingConnectionsCollection.LGCs), granularThroughputDatapoints, } - stabilizerCommunicationChannel <- throughputDataPoint + + seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{ + Type: series.SeriesMessageReserve, Bucket: currentInterval, + } + seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{ + Type: series.SeriesMessageMeasure, Bucket: currentInterval, + Measure: utilities.Some[ThroughputDataPoint](throughputDataPoint), + } if generateLoadCtx.Err() != nil { // No need to add additional data points because the controller told us diff --git a/series/message.go b/series/message.go new file mode 100644 index 0000000..fa0b096 --- /dev/null +++ b/series/message.go @@ -0,0 +1,19 @@ +package series + +import ( + "github.com/network-quality/goresponsiveness/utilities" + "golang.org/x/exp/constraints" +) + +type SeriesMessageType int + +const ( + SeriesMessageReserve SeriesMessageType = iota + SeriesMessageMeasure SeriesMessageType = iota +) + +type SeriesMessage[Data any, BucketType constraints.Ordered] struct { + Type SeriesMessageType + Bucket BucketType + Measure utilities.Optional[Data] +} diff --git a/series/series.go b/series/series.go new file mode 100644 index 0000000..0084007 --- /dev/null +++ b/series/series.go @@ -0,0 +1,280 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ +package series + +import ( + "fmt" + + "github.com/network-quality/goresponsiveness/utilities" + "golang.org/x/exp/constraints" +) + +type WindowSeriesDuration int + +const ( + Forever WindowSeriesDuration = iota + WindowOnly WindowSeriesDuration = iota +) + +type WindowSeries[Data any, Bucket constraints.Ordered] interface { + fmt.Stringer + + Reserve(b Bucket) error + Fill(b Bucket, d Data) error + + Count() (some int, none int) + + ForEach(func(Bucket, *utilities.Optional[Data])) + + GetValues() []utilities.Optional[Data] + Complete() bool + GetType() WindowSeriesDuration +} + +type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct { + windowSize int + data []utilities.Pair[Bucket, utilities.Optional[Data]] + latestIndex int + empty bool +} + +/* + * Beginning of WindowSeries interface methods. + */ + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error { + if !wsi.empty && b <= wsi.data[wsi.latestIndex].First { + return fmt.Errorf("reserving must be monotonically increasing") + } + + if wsi.empty { + /* Special case if we are empty: The latestIndex is where we want this value to go! */ + wsi.data[wsi.latestIndex] = utilities.Pair[Bucket, utilities.Optional[Data]]{ + First: b, Second: utilities.None[Data](), + } + } else { + /* Otherwise, bump ourselves forward and place the new reservation there. */ + wsi.latestIndex = wsi.nextIndex(wsi.latestIndex) + wsi.data[wsi.latestIndex] = utilities.Pair[Bucket, utilities.Optional[Data]]{ + First: b, Second: utilities.None[Data](), + } + } + wsi.empty = false + return nil +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) error { + iterator := wsi.latestIndex + for { + if wsi.data[iterator].First == b { + wsi.data[iterator].Second = utilities.Some[Data](d) + return nil + } + iterator = wsi.nextIndex(iterator) + if iterator == wsi.latestIndex { + break + } + } + return fmt.Errorf("attempting to fill a bucket that does not exist") +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Count() (some int, none int) { + some = 0 + none = 0 + for _, v := range wsi.data { + if utilities.IsSome[Data](v.Second) { + some++ + } else { + none++ + } + } + return +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Complete() bool { + for _, v := range wsi.data { + if utilities.IsNone(v.Second) { + return false + } + } + return true +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) nextIndex(currentIndex int) int { + return (currentIndex + 1) % wsi.windowSize +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) previousIndex(currentIndex int) int { + nextIndex := currentIndex - 1 + if nextIndex < 0 { + nextIndex += wsi.windowSize + } + return nextIndex +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optional[Data] { + result := make([]utilities.Optional[Data], wsi.windowSize) + + iterator := wsi.latestIndex + parallelIterator := 0 + for { + result[parallelIterator] = wsi.data[iterator].Second + iterator = wsi.previousIndex(iterator) + parallelIterator++ + if iterator == wsi.latestIndex { + break + } + } + return result +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] { + return wsi.toArray() +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetType() WindowSeriesDuration { + return WindowOnly +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) { + for _, v := range wsi.data { + eacher(v.First, &v.Second) + } +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string { + result := fmt.Sprintf("Window series (window (%d) only, latest index: %v): ", wsi.windowSize, wsi.latestIndex) + for _, v := range wsi.data { + valueString := "None" + if utilities.IsSome[Data](v.Second) { + valueString = fmt.Sprintf("%v", utilities.GetSome[Data](v.Second)) + } + result += fmt.Sprintf("%v: %v; ", v.First, valueString) + } + return result +} + +func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered]( + windowSize int, +) *windowSeriesWindowOnlyImpl[Data, Bucket] { + result := windowSeriesWindowOnlyImpl[Data, Bucket]{windowSize: windowSize, latestIndex: 0, empty: true} + + result.data = make([]utilities.Pair[Bucket, utilities.Optional[Data]], windowSize) + + return &result +} + +/* + * End of WindowSeries interface methods. + */ + +type windowSeriesForeverImpl[Data any, Bucket constraints.Ordered] struct { + data []utilities.Pair[Bucket, utilities.Optional[Data]] + empty bool +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error { + if !wsi.empty && b <= wsi.data[len(wsi.data)-1].First { + return fmt.Errorf("reserving must be monotonically increasing") + } + + wsi.empty = false + wsi.data = append(wsi.data, utilities.Pair[Bucket, utilities.Optional[Data]]{First: b, Second: utilities.None[Data]()}) + return nil +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error { + for i := range wsi.data { + if wsi.data[i].First == b { + wsi.data[i].Second = utilities.Some[Data](d) + return nil + } + } + return fmt.Errorf("attempting to fill a bucket that does not exist") +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] { + result := make([]utilities.Optional[Data], len(wsi.data)) + + for i, v := range utilities.Reverse(wsi.data) { + result[i] = v.Second + } + + return result +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) Count() (some int, none int) { + some = 0 + none = 0 + for _, v := range wsi.data { + if utilities.IsSome[Data](v.Second) { + some++ + } else { + none++ + } + } + return +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) Complete() bool { + for _, v := range wsi.data { + if utilities.IsNone(v.Second) { + return false + } + } + return true +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetType() WindowSeriesDuration { + return Forever +} + +func newWindowSeriesForeverImpl[Data any, Bucket constraints.Ordered]() *windowSeriesForeverImpl[Data, Bucket] { + result := windowSeriesForeverImpl[Data, Bucket]{empty: true} + + result.data = nil + + return &result +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) { + for _, v := range wsi.data { + eacher(v.First, &v.Second) + } +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string { + result := "Window series (forever): " + for _, v := range wsi.data { + valueString := "None" + if utilities.IsSome[Data](v.Second) { + valueString = fmt.Sprintf("%v", utilities.GetSome[Data](v.Second)) + } + result += fmt.Sprintf("%v: %v; ", v.First, valueString) + } + return result +} + +/* + * End of WindowSeries interface methods. + */ + +func NewWindowSeries[Data any, Bucket constraints.Ordered](tipe WindowSeriesDuration, windowSize int) WindowSeries[Data, Bucket] { + if tipe == WindowOnly { + return newWindowSeriesWindowOnlyImpl[Data, Bucket](windowSize) + } else if tipe == Forever { + return newWindowSeriesForeverImpl[Data, Bucket]() + } + panic("") +} diff --git a/series/series_test.go b/series/series_test.go new file mode 100644 index 0000000..5d2ab57 --- /dev/null +++ b/series/series_test.go @@ -0,0 +1,846 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ +package series + +import ( + "reflect" + "testing" + + "github.com/network-quality/goresponsiveness/utilities" +) + +func TestNextIndex(t *testing.T) { + wsi := newWindowSeriesWindowOnlyImpl[int, int](4) + + idx := wsi.nextIndex(wsi.latestIndex) + if idx != 1 { + t.Fatalf("nextIndex is wrong (1)") + } + wsi.latestIndex = idx + + idx = wsi.nextIndex(wsi.latestIndex) + if idx != 2 { + t.Fatalf("nextIndex is wrong (2)") + } + wsi.latestIndex = idx + + idx = wsi.nextIndex(wsi.latestIndex) + if idx != 3 { + t.Fatalf("nextIndex is wrong (3)") + } + wsi.latestIndex = idx + + idx = wsi.nextIndex(wsi.latestIndex) + if idx != 0 { + t.Fatalf("nextIndex is wrong (0)") + } + wsi.latestIndex = idx + + idx = wsi.nextIndex(wsi.latestIndex) + if idx != 1 { + t.Fatalf("nextIndex is wrong (1)") + } + wsi.latestIndex = idx +} + +func TestSimpleWindowComplete(t *testing.T) { + wsi := newWindowSeriesWindowOnlyImpl[int, int](4) + if wsi.Complete() { + t.Fatalf("Window should not be complete.") + } + wsfImpl := newWindowSeriesForeverImpl[int, int]() + wsfImpl.Reserve(1) + if wsfImpl.Complete() { + t.Fatalf("Window should not be complete.") + } +} + +func TestSimpleReserve(t *testing.T) { + wswoImpl := newWindowSeriesWindowOnlyImpl[int, int](4) + result := wswoImpl.Reserve(0) + if result != nil { + t.Fatalf("Reserving 1 should be a-ok!") + } + wsfImpl := newWindowSeriesForeverImpl[int, int]() + result = wsfImpl.Reserve(0) + if result != nil { + t.Fatalf("Reserving 1 should be a-ok!") + } +} + +func Test_ForeverValues(test *testing.T) { + series := newWindowSeriesForeverImpl[float64, int]() + shouldMatch := make([]utilities.Optional[float64], 0) + previous := float64(1.0) + for i := range utilities.Iota(1, 81) { + previous *= 1.059 + series.Reserve(i) + series.Fill(i, float64(previous)) + shouldMatch = append(shouldMatch, utilities.Some[float64](previous)) + } + + if !reflect.DeepEqual(utilities.Reverse(shouldMatch), series.GetValues()) { + test.Fatalf("Values() on infinite mathematical series does not work.") + } +} + +func Test_WindowOnlySequentialIncreasesAlwaysLessThan(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[float64, int](10) + previous := float64(1.0) + for i := range utilities.Iota(1, 11) { + previous *= 1.5 + series.Reserve(i) + series.Fill(i, float64(previous)) + } + if complete, islt, maxSeqIncrease := AllSequentialIncreasesLessThan[float64, int](series, + 100); !complete || maxSeqIncrease != 50.0 || !islt { + test.Fatalf( + "(Window Only) Sequential increases are not always less than 100 (%v, %v, %f ).", + complete, islt, maxSeqIncrease, + ) + } +} + +func Test_WindowOnlyTooFewInstantsSequentialIncreasesLessThanAlwaysFalse(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[float64, int](500) + series.Reserve(1) + series.Fill(1, 0.0) + if complete, islt, _ := AllSequentialIncreasesLessThan[float64, int](series, 0.0); complete || islt { + test.Fatalf( + "(Window Only) 0 elements in a series should always yield false when asking if sequential increases are less than a value.", + ) + } +} + +func Test_Forever_Complete(test *testing.T) { + series := newWindowSeriesForeverImpl[int, int]() + series.Reserve(1) + series.Fill(1, 10) + if !series.Complete() { + test.Fatalf("(infinite) Series with one element and a window size of 1 is not complete.") + } +} + +func Test_Forever_CompleteN(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[float64, int](10) + previous := float64(1.0) + for i := range utilities.Iota(1, 11) { + previous *= 1.5 + series.Reserve(i) + series.Fill(i, float64(previous)) + } + if !series.Complete() { + test.Fatalf("(infinite) Series with one element and a window size of 2 is complete.") + } +} + +func Test_Forever_degenerate_percentile_too_high(test *testing.T) { + series := newWindowSeriesForeverImpl[int, int]() + if complete, result := Percentile[int, int](series, 101); !complete || result != 0.0 { + test.Fatalf("(infinite) Series percentile of 101 failed.") + } +} + +func Test_Forever_degenerate_percentile_too_low(test *testing.T) { + series := newWindowSeriesForeverImpl[int, int]() + if complete, result := Percentile[int, int](series, -1); !complete || result != 0.0 { + test.Fatalf("(infinite) Series percentile of -1 failed.") + } +} + +/////////// + +func Test_Forever90_percentile(test *testing.T) { + var expected int = 10 + series := newWindowSeriesForeverImpl[int, int]() + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 10) + series.Fill(9, 9) + series.Fill(8, 8) + series.Fill(7, 7) + series.Fill(6, 6) + series.Fill(5, 5) + series.Fill(4, 4) + series.Fill(3, 3) + series.Fill(2, 2) + series.Fill(1, 1) + + if complete, result := Percentile[int, int](series, 90); !complete || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_Forever90_WindowOnly_percentile(test *testing.T) { + var expected int = 10 + series := newWindowSeriesForeverImpl[int, int]() + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 10) + series.Fill(9, 9) + series.Fill(8, 8) + series.Fill(7, 7) + series.Fill(6, 6) + series.Fill(5, 5) + series.Fill(4, 4) + series.Fill(3, 3) + series.Fill(2, 2) + series.Fill(1, 1) + + if complete, result := Percentile[int, int](series, 90); !complete || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_Forever90_percentile_reversed(test *testing.T) { + var expected int = 10 + series := newWindowSeriesForeverImpl[int, int]() + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 1) + series.Fill(9, 2) + series.Fill(8, 3) + series.Fill(7, 4) + series.Fill(6, 5) + series.Fill(5, 6) + series.Fill(4, 7) + series.Fill(3, 8) + series.Fill(2, 9) + series.Fill(1, 10) + + if complete, result := Percentile[int, int](series, 90); !complete || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_Forever50_percentile_jumbled(test *testing.T) { + var expected int64 = 15 + series := newWindowSeriesForeverImpl[int64, int]() + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(1, 7) + series.Fill(2, 2) + series.Fill(3, 15) + series.Fill(4, 27) + series.Fill(5, 5) + series.Fill(6, 52) + series.Fill(7, 18) + series.Fill(8, 23) + series.Fill(9, 11) + series.Fill(10, 12) + + if complete, result := Percentile[int64, int](series, 50); !complete || result != expected { + test.Fatalf( + "Series 50 percentile of a jumble of numbers failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_Forever90_partial_percentile(test *testing.T) { + var expected int = 10 + series := newWindowSeriesForeverImpl[int, int]() + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 10) + series.Fill(9, 9) + series.Fill(8, 8) + series.Fill(7, 7) + series.Fill(6, 6) + series.Fill(5, 5) + series.Fill(4, 4) + series.Fill(3, 3) + series.Fill(2, 2) + series.Fill(1, 1) + + if complete, result := Percentile[int, int](series, 90); !complete || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_Forever90_partial_percentile_reversed(test *testing.T) { + var expected int = 10 + series := newWindowSeriesForeverImpl[int, int]() + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 1) + series.Fill(9, 2) + series.Fill(8, 3) + series.Fill(7, 4) + series.Fill(6, 5) + series.Fill(5, 6) + series.Fill(4, 7) + series.Fill(3, 8) + series.Fill(2, 9) + series.Fill(1, 10) + + if complete, result := Percentile[int, int](series, 90); !complete || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_Forever50_partial_percentile_jumbled(test *testing.T) { + var expected int64 = 15 + series := newWindowSeriesForeverImpl[int64, int]() + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(1, 7) + series.Fill(2, 2) + series.Fill(3, 15) + series.Fill(4, 27) + series.Fill(5, 5) + series.Fill(6, 52) + series.Fill(7, 18) + series.Fill(8, 23) + series.Fill(9, 11) + series.Fill(10, 12) + + if complete, result := Percentile[int64, int](series, 50); !complete || result != expected { + test.Fatalf( + "Series 50 percentile of a jumble of numbers failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +/////////////////////// + +func Test_WindowOnlySequentialIncreasesAlwaysLessThanWithWraparound(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[float64, int](20) + previous := float64(1.0) + for i := range utilities.Iota(1, 21) { + previous *= 1.15 + series.Reserve(i) + series.Fill(1, float64(previous)) + } + + // All those measurements should be ejected by the following + // loop! + for i := range utilities.Iota(1, 21) { + previous *= 1.10 + series.Reserve(i + 20) + series.Fill(i+20, float64(previous)) + } + + if complete, islt, maxSeqIncrease := AllSequentialIncreasesLessThan[float64, int](series, + 11.0); !complete || !islt || !utilities.ApproximatelyEqual(maxSeqIncrease, 10, 0.1) { + test.Fatalf( + "Sequential increases are not always less than 11.0 in wraparound situation (%f v 11.0).", + maxSeqIncrease, + ) + } +} + +func Test_WindowOnlySequentialIncreasesAlwaysLessThanWithWraparoundInverse(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[float64, int](20) + previous := float64(1.0) + i := 0 + for i = range utilities.Iota(1, 21) { + previous *= 1.15 + series.Reserve(i) + series.Fill(i, float64(previous)) + } + + // *Not* all those measurements should be ejected by the following + // loop! + for j := range utilities.Iota(1, 16) { + previous *= 1.10 + series.Reserve(i + j) + series.Fill(i+j, float64(previous)) + } + + if complete, islt, maxSeqIncrease := AllSequentialIncreasesLessThan[float64, int](series, 11.0); complete == false || islt { + test.Fatalf( + "Sequential increases are (unexpectedly) always less than 11.0 in wraparound situation: %f v 11.0.", + maxSeqIncrease, + ) + } +} + +func Test_WindowOnlyStandardDeviationIncompleteCalculation(test *testing.T) { + expected := 2.93 + series := newWindowSeriesWindowOnlyImpl[float64, int](6) + // 5.7, 1.0, 8.6, 7.4, 2.2 + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Fill(1, 5.7) + series.Fill(2, 1.0) + series.Fill(3, 8.6) + series.Fill(4, 7.4) + series.Fill(5, 2.2) + + if complete, sd := SeriesStandardDeviation[float64, int](series); complete != false || + !utilities.ApproximatelyEqual(sd, expected, 0.01) { + test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd) + } else { + test.Logf("Standard deviation calculation result: %v", sd) + } +} + +func Test_WindowOnlyStandardDeviationCalculation(test *testing.T) { + expected := 2.93 + series := newWindowSeriesWindowOnlyImpl[float64, int](5) + // 5.7, 1.0, 8.6, 7.4, 2.2 + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + series.Reserve(11) + series.Reserve(12) + series.Reserve(13) + + series.Fill(1, 5.7) + series.Fill(2, 5.7) + series.Fill(3, 5.7) + series.Fill(4, 5.7) + series.Fill(5, 5.7) + series.Fill(6, 5.7) + series.Fill(7, 5.7) + series.Fill(8, 5.7) + series.Fill(9, 5.7) + series.Fill(10, 1.0) + series.Fill(11, 8.6) + series.Fill(12, 7.4) + series.Fill(13, 2.2) + + if complete, sd := SeriesStandardDeviation[float64, int](series); complete != true || + !utilities.ApproximatelyEqual(sd, expected, 0.01) { + test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd) + } +} + +func Test_WindowOnlyStandardDeviationCalculation2(test *testing.T) { + expected := 1.41 + series := newWindowSeriesWindowOnlyImpl[float64, int](5) + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Fill(1, 8) + series.Fill(2, 9) + series.Fill(3, 10) + series.Fill(4, 11) + series.Fill(5, 12) + + if _, sd := SeriesStandardDeviation[float64, int](series); !utilities.ApproximatelyEqual(sd, expected, 0.01) { + test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd) + } else { + test.Logf("Standard deviation calculation result: %v", sd) + } +} + +func Test_WindowOnlyRotatingValues(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[int, int](5) + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Reserve(6) + series.Reserve(7) + + series.Fill(1, 1) + series.Fill(2, 2) + series.Fill(3, 3) + series.Fill(4, 4) + series.Fill(5, 5) + + series.Fill(6, 6) + series.Fill(7, 7) + + actual := utilities.Fmap(series.GetValues(), func(i utilities.Optional[int]) int { + return utilities.GetSome[int](i) + }) + if !reflect.DeepEqual([]int{7, 6, 5, 4, 3}, actual) { + test.Fatalf("Adding values does not properly erase earlier values.") + } +} + +func Test_WindowOnly_degenerate_percentile_too_high(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[int, int](21) + if complete, p := Percentile[int, int](series, 101); complete != false || p != 0 { + test.Fatalf("Series percentile of 101 failed.") + } +} + +func Test_WindowOnly_degenerate_percentile_too_low(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[int, int](21) + if complete, p := Percentile[int, int](series, -1); complete != false || p != 0 { + test.Fatalf("Series percentile of -1 failed.") + } +} + +func Test_WindowOnly90_percentile(test *testing.T) { + var expected int = 10 + series := newWindowSeriesWindowOnlyImpl[int, int](10) + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 10) + series.Fill(9, 9) + series.Fill(8, 8) + series.Fill(7, 7) + series.Fill(6, 6) + series.Fill(5, 5) + series.Fill(4, 4) + series.Fill(3, 3) + series.Fill(2, 2) + series.Fill(1, 1) + + if complete, result := Percentile[int, int](series, 90); !complete || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_WindowOnly90_percentile_reversed(test *testing.T) { + var expected int = 10 + series := newWindowSeriesWindowOnlyImpl[int, int](10) + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 1) + series.Fill(9, 2) + series.Fill(8, 3) + series.Fill(7, 4) + series.Fill(6, 5) + series.Fill(5, 6) + series.Fill(4, 7) + series.Fill(3, 8) + series.Fill(2, 9) + series.Fill(1, 10) + + if complete, result := Percentile[int, int](series, 90); !complete || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_WindowOnly50_percentile_jumbled(test *testing.T) { + var expected int64 = 15 + series := newWindowSeriesWindowOnlyImpl[int64, int](10) + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(1, 7) + series.Fill(2, 2) + series.Fill(3, 15) + series.Fill(4, 27) + series.Fill(5, 5) + series.Fill(6, 52) + series.Fill(7, 18) + series.Fill(8, 23) + series.Fill(9, 11) + series.Fill(10, 12) + + if complete, result := Percentile[int64, int](series, 50); !complete || result != expected { + test.Fatalf( + "Series 50 percentile of a jumble of numbers failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_WindowOnly90_partial_percentile(test *testing.T) { + var expected int = 10 + series := newWindowSeriesWindowOnlyImpl[int, int](20) + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 10) + series.Fill(9, 9) + series.Fill(8, 8) + series.Fill(7, 7) + series.Fill(6, 6) + series.Fill(5, 5) + series.Fill(4, 4) + series.Fill(3, 3) + series.Fill(2, 2) + series.Fill(1, 1) + + if complete, result := Percentile[int, int](series, 90); complete != false || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_WindowOnly90_partial_percentile_reversed(test *testing.T) { + var expected int = 10 + series := newWindowSeriesWindowOnlyImpl[int, int](20) + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(10, 1) + series.Fill(9, 2) + series.Fill(8, 3) + series.Fill(7, 4) + series.Fill(6, 5) + series.Fill(5, 6) + series.Fill(4, 7) + series.Fill(3, 8) + series.Fill(2, 9) + series.Fill(1, 10) + + if complete, result := Percentile[int, int](series, 90); complete != false || result != expected { + test.Fatalf( + "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +func Test_WindowOnly50_partial_percentile_jumbled(test *testing.T) { + var expected int64 = 15 + series := newWindowSeriesWindowOnlyImpl[int64, int](20) + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + series.Reserve(7) + series.Reserve(8) + series.Reserve(9) + series.Reserve(10) + + series.Fill(1, 7) + series.Fill(2, 2) + series.Fill(3, 15) + series.Fill(4, 27) + series.Fill(5, 5) + series.Fill(6, 52) + series.Fill(7, 18) + series.Fill(8, 23) + series.Fill(9, 11) + series.Fill(10, 12) + + if complete, result := Percentile[int64, int](series, 50); complete != false || result != expected { + test.Fatalf( + "Series 50 percentile of a jumble of numbers failed: (Complete: %v) Expected %v got %v.", complete, expected, result) + } +} + +/* + +func Test_WindowOnlyDoubleSidedTrimmedMean_jumbled(test *testing.T) { + expected := 8 + series := newWindowSeriesWindowOnlyImpl[int64, int](10) + series.AddElement(7) + series.AddElement(2) + series.AddElement(15) + series.AddElement(27) + series.AddElement(5) + series.AddElement(5) + series.AddElement(52) + series.AddElement(18) + series.AddElement(23) + series.AddElement(11) + series.AddElement(12) + + trimmed := series.DoubleSidedTrim(10) + + if trimmed.Len() != expected { + test.Fatalf( + "WindowOnly series is not of the proper size. Expected %v and got %v", + expected, + trimmed.Len(), + ) + } + + prev := int64(0) + for _, v := range trimmed.Values() { + if !(prev <= v) { + test.Fatalf("Not sorted: %v is not less than or equal to %v\n", prev, v) + } + prev = v + } +} + +func Test_WindowOnlyAverage(test *testing.T) { + expected := 1.0082230220488836e+08 + series := newWindowSeriesWindowOnlyImpl[float64, int](4) + series.AddElement(9.94747772516195e+07) + series.AddElement(9.991286984703423e+07) + series.AddElement(1.0285437111086299e+08) + series.AddElement(1.0104719061003672e+08) + if average := series.CalculateAverage(); !utilities.ApproximatelyEqual(average, 0.01, expected) { + test.Fatalf( + "Expected: %v; Actual: %v.", average, expected, + ) + } +} +*/ + +func Test_ForeverStandardDeviationIncompleteCalculation(test *testing.T) { + foreverExpected := 2.90 + series := newWindowSeriesForeverImpl[float64, int]() + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + series.Reserve(6) + + series.Fill(1, 5.7) + series.Fill(2, 1.0) + series.Fill(3, 8.6) + series.Fill(4, 7.4) + series.Fill(5, 2.2) + series.Fill(6, 8.0) + + if complete, sd := SeriesStandardDeviation[float64, int](series); !complete || + !utilities.ApproximatelyEqual(sd, foreverExpected, 0.01) { + test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", foreverExpected, sd) + } +} + +func Test_ForeverStandardDeviationCalculation2(test *testing.T) { + expected := 1.41 + series := newWindowSeriesForeverImpl[float64, int]() + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Fill(1, 8) + series.Fill(2, 9) + series.Fill(3, 10) + series.Fill(4, 11) + series.Fill(5, 12) + + if _, sd := SeriesStandardDeviation[float64, int](series); !utilities.ApproximatelyEqual(sd, expected, 0.01) { + test.Fatalf("Standard deviation(series) max calculation(series) failed: Expected: %v; Actual: %v.", expected, sd) + } +} diff --git a/series/statistics.go b/series/statistics.go new file mode 100644 index 0000000..2742aa7 --- /dev/null +++ b/series/statistics.go @@ -0,0 +1,74 @@ +package series + +import ( + "github.com/network-quality/goresponsiveness/utilities" + "golang.org/x/exp/constraints" +) + +func SeriesStandardDeviation[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket]) (bool, float64) { + complete := s.Complete() + + inputValues := s.GetValues() + + actualValues := utilities.Filter(inputValues, func(d utilities.Optional[Data]) bool { + return utilities.IsSome[Data](d) + }) + values := utilities.Fmap(actualValues, func(d utilities.Optional[Data]) Data { return utilities.GetSome[Data](d) }) + + return complete, utilities.CalculateStandardDeviation[Data](values) +} + +func Percentile[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], p int) (bool, Data) { + complete := s.Complete() + + inputValues := s.GetValues() + + actualValues := utilities.Filter(inputValues, func(d utilities.Optional[Data]) bool { + return utilities.IsSome[Data](d) + }) + values := utilities.Fmap(actualValues, func(d utilities.Optional[Data]) Data { return utilities.GetSome[Data](d) }) + + return complete, utilities.CalculatePercentile(values, p) +} + +func AllSequentialIncreasesLessThan[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], limit float64, +) (bool, bool, float64) { + complete := s.Complete() + + inputValues := s.GetValues() + + actualValues := utilities.Filter(utilities.Reverse(inputValues), func(d utilities.Optional[Data]) bool { + return utilities.IsSome[Data](d) + }) + values := utilities.Fmap(actualValues, func(d utilities.Optional[Data]) Data { return utilities.GetSome[Data](d) }) + + result, actualLimit := utilities.AllSequentialIncreasesLessThan(values, limit) + return complete, result, actualLimit +} + +func CalculateAverage[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket]) (bool, float64) { + complete := s.Complete() + + inputValues := s.GetValues() + + actualValues := utilities.Filter(inputValues, func(d utilities.Optional[Data]) bool { + return utilities.IsSome[Data](d) + }) + values := utilities.Fmap(actualValues, func(d utilities.Optional[Data]) Data { return utilities.GetSome[Data](d) }) + + return complete, utilities.CalculateAverage(values) +} + +func TrimmedMean[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], trim int) (bool, float64, []Data) { + complete := s.Complete() + + inputValues := s.GetValues() + + actualValues := utilities.Filter(inputValues, func(d utilities.Optional[Data]) bool { + return utilities.IsSome[Data](d) + }) + values := utilities.Fmap(actualValues, func(d utilities.Optional[Data]) Data { return utilities.GetSome[Data](d) }) + + trimmedMean, trimmedElements := utilities.TrimmedMean(values, trim) + return complete, trimmedMean, trimmedElements +} diff --git a/stabilizer/algorithm.go b/stabilizer/algorithm.go index 45c34d9..86b4bb6 100644 --- a/stabilizer/algorithm.go +++ b/stabilizer/algorithm.go @@ -19,19 +19,23 @@ import ( "sync" "github.com/network-quality/goresponsiveness/debug" - "github.com/network-quality/goresponsiveness/ms" + "github.com/network-quality/goresponsiveness/series" + "github.com/network-quality/goresponsiveness/utilities" "golang.org/x/exp/constraints" ) -type MeasurementStablizer[T constraints.Float | constraints.Integer] struct { - instantaneousses ms.MathematicalSeries[T] - aggregates ms.MathematicalSeries[float64] +type MeasurementStablizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered] struct { + // The number of instantaneous measurements in the current interval could be infinite (Forever). + instantaneousses series.WindowSeries[Data, Bucket] + // There are a fixed, finite number of aggregates (WindowOnly). + aggregates series.WindowSeries[series.WindowSeries[Data, Bucket], int] stabilityStandardDeviation float64 trimmingLevel uint m sync.Mutex dbgLevel debug.DebugLevel dbgConfig *debug.DebugWithPrefix units string + currentInterval int } // Stabilizer parameters: @@ -54,77 +58,159 @@ type MeasurementStablizer[T constraints.Float | constraints.Integer] struct { // If the calculated standard deviation of *those* values is less than SDT, we declare // stability. -func NewStabilizer[T constraints.Float | constraints.Integer]( - mad uint, +func NewStabilizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered]( + mad int, sdt float64, trimmingLevel uint, units string, debugLevel debug.DebugLevel, debug *debug.DebugWithPrefix, -) MeasurementStablizer[T] { - return MeasurementStablizer[T]{ - instantaneousses: ms.NewCappedMathematicalSeries[T](mad), - aggregates: ms.NewCappedMathematicalSeries[float64](mad), +) MeasurementStablizer[Data, Bucket] { + return MeasurementStablizer[Data, Bucket]{ + instantaneousses: series.NewWindowSeries[Data, Bucket](series.Forever, 0), + aggregates: series.NewWindowSeries[ + series.WindowSeries[Data, Bucket], int](series.WindowOnly, mad), stabilityStandardDeviation: sdt, trimmingLevel: trimmingLevel, units: units, + currentInterval: 0, dbgConfig: debug, dbgLevel: debugLevel, } } -func (r3 *MeasurementStablizer[T]) AddMeasurement(measurement T) { +func (r3 *MeasurementStablizer[Data, Bucket]) Reserve(bucket Bucket) { + r3.m.Lock() + defer r3.m.Unlock() + r3.instantaneousses.Reserve(bucket) +} + +func (r3 *MeasurementStablizer[Data, Bucket]) AddMeasurement(bucket Bucket, measurement Data) { + r3.m.Lock() + defer r3.m.Unlock() + + // Fill in the bucket in the current interval. + if err := r3.instantaneousses.Fill(bucket, measurement); err != nil { + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf("%s: A bucket (with id %v) does not exist in the isntantaneousses.\n", + r3.dbgConfig.String(), + bucket) + } + } + + // The result may have been retired from the current interval. Look in the older series + // to fill it in there if it is. + r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) { + if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) { + md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md) + if err := md.Fill(bucket, measurement); err != nil { + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf("%s: A bucket (with id %v) does not exist in a historical window.\n", + r3.dbgConfig.String(), + bucket) + } + } + } + }) + + /* + // Add this instantaneous measurement to the mix of the MAD previous instantaneous measurements. + r3.instantaneousses.Fill(bucket, measurement) + // Calculate the moving average of the MAD previous instantaneous measurements (what the + // algorithm calls moving average aggregate throughput at interval p) and add it to + // the mix of MAD previous moving averages. + + r3.aggregates.AutoFill(r3.instantaneousses.CalculateAverage()) + + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf( + "%s: MA: %f Mbps (previous %d intervals).\n", + r3.dbgConfig.String(), + r3.aggregates.CalculateAverage(), + r3.aggregates.Len(), + ) + } + */ +} + +func (r3 *MeasurementStablizer[Data, Bucket]) Interval() { r3.m.Lock() defer r3.m.Unlock() - // Add this instantaneous measurement to the mix of the MAD previous instantaneous measurements. - r3.instantaneousses.AddElement(measurement) - // Calculate the moving average of the MAD previous instantaneous measurements (what the - // algorithm calls moving average aggregate throughput at interval p) and add it to - // the mix of MAD previous moving averages. - r3.aggregates.AddElement(r3.instantaneousses.CalculateAverage()) if debug.IsDebug(r3.dbgLevel) { fmt.Printf( - "%s: MA: %f Mbps (previous %d intervals).\n", + "%s: stability interval marked (transitioning from %d to %d).\n", r3.dbgConfig.String(), - r3.aggregates.CalculateAverage(), - r3.aggregates.Len(), + r3.currentInterval, + r3.currentInterval+1, ) } -} -func (r3 *MeasurementStablizer[T]) IsStable() bool { - // There are MAD number of measurements of the _moving average aggregate throughput - // at interval p_ in movingAverages. - isvalid, stddev := r3.aggregates.StandardDeviation() + // At the interval boundary, move the instantaneous series to + // the aggregates and start a new instantaneous series. + r3.aggregates.Reserve(r3.currentInterval) + r3.aggregates.Fill(r3.currentInterval, r3.instantaneousses) - if !isvalid { - // If there are not enough values in the series to be able to calculate a - // standard deviation, then we know that we are not yet stable. Vamoose. - return false - } + r3.instantaneousses = series.NewWindowSeries[Data, Bucket](series.Forever, 0) + r3.currentInterval++ +} - // Stability is determined by whether or not the standard deviation of the values - // is within some percentage of the average. - stabilityCutoff := r3.aggregates.CalculateAverage() * (r3.stabilityStandardDeviation / 100.0) - isStable := stddev <= stabilityCutoff +func (r3 *MeasurementStablizer[Data, Bucket]) IsStable() bool { + r3.m.Lock() + defer r3.m.Unlock() if debug.IsDebug(r3.dbgLevel) { fmt.Printf( - "%s: Is Stable? %v; Standard Deviation: %f %s; Is Normally Distributed? %v; Standard Deviation Cutoff: %v %s).\n", + "%s: Determining stability in the %d th interval.\n", r3.dbgConfig.String(), - isStable, - stddev, - r3.units, - r3.aggregates.IsNormallyDistributed(), - stabilityCutoff, - r3.units, + r3.currentInterval, ) - fmt.Printf("%s: Values: ", r3.dbgConfig.String()) - for _, v := range r3.aggregates.Values() { - fmt.Printf("%v, ", v) + } + // Determine if + // a) All the aggregates have values, + // b) All the aggregates are complete. + allComplete := true + r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) { + if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) { + md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md) + allComplete = md.Complete() + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf("%s\n", md.String()) + } + } else { + allComplete = false } - fmt.Printf("\n") + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf( + "%s: The aggregate for the %d th interval was %s.\n", + r3.dbgConfig.String(), + b, + utilities.Conditional(allComplete, "complete", "incomplete"), + ) + } + }) + + if !allComplete { + return false } + + // Calculate the averages of each of the aggregates. + averages := make([]float64, 0) + r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) { + if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) { + md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md) + _, average := series.CalculateAverage(md) + averages = append(averages, average) + } + }) + + // Calculate the standard deviation of the averages of the aggregates. + sd := utilities.CalculateStandardDeviation(averages) + + // Take a percentage of the average of the averages of the aggregates ... + stabilityCutoff := utilities.CalculateAverage(averages) * (r3.stabilityStandardDeviation / 100.0) + // and compare that to the standard deviation to determine stability. + isStable := sd <= stabilityCutoff + return isStable } diff --git a/stabilizer/stabilizer.go b/stabilizer/stabilizer.go index b582751..b157ce1 100644 --- a/stabilizer/stabilizer.go +++ b/stabilizer/stabilizer.go @@ -14,7 +14,12 @@ package stabilizer -type Stabilizer[T any] interface { - AddMeasurement(T) +import ( + "golang.org/x/exp/constraints" +) + +type Stabilizer[Data any, Bucket constraints.Ordered] interface { + Interval() + AddMeasurement(Data, Bucket) IsStable() bool } diff --git a/utilities/math.go b/utilities/math.go new file mode 100644 index 0000000..1ecca61 --- /dev/null +++ b/utilities/math.go @@ -0,0 +1,144 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package utilities + +import ( + "math" + "sort" + + "golang.org/x/exp/constraints" + "golang.org/x/exp/slices" +) + +type Number interface { + constraints.Float | constraints.Integer +} + +func CalculateAverage[T Number](elements []T) float64 { + total := T(0) + for i := 0; i < len(elements); i++ { + total += elements[i] + } + return float64(total) / float64(len(elements)) +} + +func CalculatePercentile[T Number]( + elements []T, + p int, +) (result T) { + result = T(0) + if p < 0 || p > 100 { + return + } + + sort.Slice(elements, func(l int, r int) bool { return elements[l] < elements[r] }) + pindex := int64((float64(p) / float64(100)) * float64(len(elements))) + result = elements[pindex] + return +} + +func Max(x, y uint64) uint64 { + if x > y { + return x + } + return y +} + +func SignedPercentDifference[T constraints.Float | constraints.Integer]( + current T, + previous T, +) (difference float64) { + fCurrent := float64(current) + fPrevious := float64(previous) + return ((fCurrent - fPrevious) / fPrevious) * 100.0 +} + +func AbsPercentDifference( + current float64, + previous float64, +) (difference float64) { + return (math.Abs(current-previous) / (float64(current+previous) / 2.0)) * float64( + 100, + ) +} + +func CalculateStandardDeviation[T constraints.Float | constraints.Integer](elements []T) float64 { + // From https://www.mathsisfun.com/data/standard-deviation-calculator.html + // Yes, for real! + + // Calculate the average of the numbers ... + average := CalculateAverage(elements) + + // Calculate the square of each of the elements' differences from the mean. + differences_squared := make([]float64, len(elements)) + for index, value := range elements { + differences_squared[index] = math.Pow(float64(value-T(average)), 2) + } + + // The variance is the average of the squared differences. + // So, we need to ... + + // Accumulate all those squared differences. + sds := float64(0) + for _, dss := range differences_squared { + sds += dss + } + + // And then divide that total by the number of elements + variance := sds / float64(len(elements)) + + // Finally, the standard deviation is the square root + // of the variance. + sd := float64(math.Sqrt(variance)) + // sd := T(variance) + + return sd +} + +func AllSequentialIncreasesLessThan[T Number](elements []T, limit float64) (bool, float64) { + if len(elements) < 2 { + return false, 0.0 + } + + maximumSequentialIncrease := float64(0) + for i := 1; i < len(elements); i++ { + current := elements[i] + previous := elements[i-1] + percentChange := SignedPercentDifference(current, previous) + if percentChange > limit { + return false, percentChange + } + if percentChange > float64(maximumSequentialIncrease) { + maximumSequentialIncrease = percentChange + } + } + return true, maximumSequentialIncrease +} + +// elements must already be sorted! +func TrimBy[T Number](elements []T, trim int) []T { + numberToKeep := int(float32(len(elements)) * (float32(trim) / 100.0)) + + return elements[:numberToKeep] +} + +func TrimmedMean[T Number](elements []T, trim int) (float64, []T) { + sortedElements := make([]T, len(elements)) + copy(sortedElements, elements) + slices.Sort(sortedElements) + + trimmedElements := TrimBy(sortedElements, trim) + return CalculateAverage(trimmedElements), trimmedElements +} diff --git a/utilities/utilities.go b/utilities/utilities.go index ff04023..b976f77 100644 --- a/utilities/utilities.go +++ b/utilities/utilities.go @@ -21,13 +21,10 @@ import ( "math/rand" "os" "reflect" - "sort" "strings" "sync" "sync/atomic" "time" - - "golang.org/x/exp/constraints" ) // GitVersion is the Git revision hash @@ -46,24 +43,6 @@ func IsInterfaceNil(ifc interface{}) bool { (reflect.ValueOf(ifc).Kind() == reflect.Ptr && reflect.ValueOf(ifc).IsNil()) } -func SignedPercentDifference[T constraints.Float | constraints.Integer]( - current T, - previous T, -) (difference float64) { - fCurrent := float64(current) - fPrevious := float64(previous) - return ((fCurrent - fPrevious) / fPrevious) * 100.0 -} - -func AbsPercentDifference( - current float64, - previous float64, -) (difference float64) { - return (math.Abs(current-previous) / (float64(current+previous) / 2.0)) * float64( - 100, - ) -} - func Conditional[T any](condition bool, t T, f T) T { if condition { return t @@ -137,13 +116,6 @@ func RandBetween(max int) int { return rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % max } -func Max(x, y uint64) uint64 { - if x > y { - return x - } - return y -} - func ChannelToSlice[S any](channel <-chan S) (slice []S) { slice = make([]S, 0) for element := range channel { @@ -152,6 +124,26 @@ func ChannelToSlice[S any](channel <-chan S) (slice []S) { return } +func Reverse[T any](elements []T) []T { + result := make([]T, len(elements)) + iterator := len(elements) - 1 + for _, v := range elements { + result[iterator] = v + iterator-- + } + return result +} + +func Filter[S any](elements []S, filterer func(S) bool) []S { + result := make([]S, 0) + for _, s := range elements { + if filterer(s) { + result = append(result, s) + } + } + return result +} + func Fmap[S any, F any](elements []S, mapper func(S) F) []F { result := make([]F, 0) for _, s := range elements { @@ -160,13 +152,6 @@ func Fmap[S any, F any](elements []S, mapper func(S) F) []F { return result } -func CalculatePercentile[S float32 | int32 | float64 | int64](elements []S, percentile int) S { - sort.Slice(elements, func(a, b int) bool { return elements[a] < elements[b] }) - elementsCount := len(elements) - percentileIdx := elementsCount * (percentile / 100) - return elements[percentileIdx] -} - func OrTimeout(f func(), timeout time.Duration) { completeChannel := func() chan interface{} { completed := make(chan interface{}) @@ -227,9 +212,9 @@ func ContextSignaler(ctxt context.Context, st time.Duration, condition *func() b } } -type Pair[T any] struct { - First T - Second T +type Pair[T1, T2 any] struct { + First T1 + Second T2 } func PerSecondToInterval(rate int64) time.Duration { diff --git a/utilities/utilities_test.go b/utilities/utilities_test.go index 9cd4ef0..7f3d83a 100644 --- a/utilities/utilities_test.go +++ b/utilities/utilities_test.go @@ -126,3 +126,65 @@ func TestPerSecondToInterval(t *testing.T) { t.Fatalf("Something that happens twice per second should happen every 5000ns.") } } + +func TestTrim(t *testing.T) { + elements := Iota(1, 101) + + trimmedElements := TrimBy(elements, 75) + + trimmedLength := len(trimmedElements) + trimmedLast := trimmedElements[trimmedLength-1] + + if trimmedLength != 75 || trimmedLast != 75 { + t.Fatalf("When trimming, the length should be 75 but it is %d and/or the last element should be 75 but it is %d", trimmedLength, trimmedLast) + } +} + +func TestTrim2(t *testing.T) { + elements := Iota(1, 11) + + trimmedElements := TrimBy(elements, 75) + + trimmedLength := len(trimmedElements) + trimmedLast := trimmedElements[trimmedLength-1] + + if trimmedLength != 7 || trimmedLast != 7 { + t.Fatalf("When trimming, the length should be 7 but it is %d and/or the last element should be 7 but it is %d", trimmedLength, trimmedLast) + } +} + +func TestTrim3(t *testing.T) { + elements := Iota(1, 6) + + trimmedElements := TrimBy(elements, 101) + + trimmedLength := len(trimmedElements) + trimmedLast := trimmedElements[trimmedLength-1] + + if trimmedLength != 5 || trimmedLast != 5 { + t.Fatalf("When trimming, the length should be 5 but it is %d and/or the last element should be 5 but it is %d", trimmedLength, trimmedLast) + } +} + +func TestTrim4(t *testing.T) { + elements := Iota(1, 11) + + trimmedElements := TrimBy(elements, 81) + + trimmedLength := len(trimmedElements) + trimmedLast := trimmedElements[trimmedLength-1] + + if trimmedLength != 8 || trimmedLast != 8 { + t.Fatalf("When trimming, the length should be 8 but it is %d and/or the last element should be 8 but it is %d", trimmedLength, trimmedLast) + } +} + +func TestTrimmedMean(t *testing.T) { + expected := 2.5 + elements := []int{5, 4, 3, 2, 1} + + result, elements := TrimmedMean(elements, 80) + if result != expected || len(elements) != 4 || elements[len(elements)-1] != 4 { + t.Fatalf("The trimmed mean result %v does not match the expected value %v", result, expected) + } +} |
