summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile2
-rw-r--r--direction/direction.go29
-rw-r--r--ms/ms.go371
-rw-r--r--ms/ms_test.go431
-rw-r--r--networkQuality.go227
-rw-r--r--rpm/calculations.go62
-rw-r--r--rpm/rpm.go37
-rw-r--r--series/message.go19
-rw-r--r--series/series.go280
-rw-r--r--series/series_test.go846
-rw-r--r--series/statistics.go74
-rw-r--r--stabilizer/algorithm.go176
-rw-r--r--stabilizer/stabilizer.go9
-rw-r--r--utilities/math.go144
-rw-r--r--utilities/utilities.go61
-rw-r--r--utilities/utilities_test.go62
16 files changed, 1835 insertions, 995 deletions
diff --git a/Makefile b/Makefile
index e573dc3..23de0af 100644
--- a/Makefile
+++ b/Makefile
@@ -6,7 +6,7 @@ all: build test
build:
go build $(LDFLAGS) networkQuality.go
test:
- go test ./timeoutat/ ./traceable/ ./ms/ ./utilities/ ./lgc ./qualityattenuation ./rpm
+ go test ./timeoutat/ ./traceable/ ./utilities/ ./lgc ./qualityattenuation ./rpm ./series
golines:
find . -name '*.go' -exec ~/go/bin/golines -w {} \;
clean:
diff --git a/direction/direction.go b/direction/direction.go
index 3ef6585..865a273 100644
--- a/direction/direction.go
+++ b/direction/direction.go
@@ -23,19 +23,18 @@ import (
)
type Direction struct {
- DirectionLabel string
- SelfProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint]
- ForeignProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint]
- ThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint]
- GranularThroughputDataLogger datalogger.DataLogger[rpm.GranularThroughputDataPoint]
- CreateLgdc func() lgc.LoadGeneratingConnection
- Lgcc lgc.LoadGeneratingConnectionCollection
- DirectionDebugging *debug.DebugWithPrefix
- ProbeDebugging *debug.DebugWithPrefix
- ThroughputStabilizerDebugging *debug.DebugWithPrefix
- ResponsivenessStabilizerDebugging *debug.DebugWithPrefix
- LgStabilizationCommunicationChannel chan rpm.ThroughputDataPoint
- ExtendedStatsEligible bool
- StableThroughput bool
- StableResponsiveness bool
+ DirectionLabel string
+ SelfProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint]
+ ForeignProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint]
+ ThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint]
+ GranularThroughputDataLogger datalogger.DataLogger[rpm.GranularThroughputDataPoint]
+ CreateLgdc func() lgc.LoadGeneratingConnection
+ Lgcc lgc.LoadGeneratingConnectionCollection
+ DirectionDebugging *debug.DebugWithPrefix
+ ProbeDebugging *debug.DebugWithPrefix
+ ThroughputStabilizerDebugging *debug.DebugWithPrefix
+ ResponsivenessStabilizerDebugging *debug.DebugWithPrefix
+ ExtendedStatsEligible bool
+ StableThroughput bool
+ StableResponsiveness bool
}
diff --git a/ms/ms.go b/ms/ms.go
deleted file mode 100644
index 06c7340..0000000
--- a/ms/ms.go
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * This file is part of Go Responsiveness.
- *
- * Go Responsiveness is free software: you can redistribute it and/or modify it under
- * the terms of the GNU General Public License as published by the Free Software Foundation,
- * either version 2 of the License, or (at your option) any later version.
- * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
- * PARTICULAR PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
- */
-
-package ms
-
-import (
- "fmt"
- "math"
- "sort"
-
- "github.com/network-quality/goresponsiveness/saturating"
- "github.com/network-quality/goresponsiveness/utilities"
- "golang.org/x/exp/constraints"
-)
-
-type MathematicalSeries[T constraints.Float | constraints.Integer] interface {
- AddElement(T)
- CalculateAverage() float64
- AllSequentialIncreasesLessThan(float64) (bool, float64)
- StandardDeviation() (bool, T)
- IsNormallyDistributed() bool
- Len() int
- Values() []T
- Percentile(int) T
- DoubleSidedTrim(uint) MathematicalSeries[T]
- Less(int, int) bool
- Swap(int, int)
-}
-
-func calculateAverage[T constraints.Integer | constraints.Float](elements []T) float64 {
- total := T(0)
- for i := 0; i < len(elements); i++ {
- total += elements[i]
- }
- return float64(total) / float64(len(elements))
-}
-
-func calculatePercentile[T constraints.Integer | constraints.Float](
- elements []T,
- p int,
-) (result T) {
- result = T(0)
- if p < 0 || p > 100 {
- return
- }
-
- sort.Slice(elements, func(l int, r int) bool { return elements[l] < elements[r] })
- pindex := int64((float64(p) / float64(100)) * float64(len(elements)))
- result = elements[pindex]
- return
-}
-
-type InfiniteMathematicalSeries[T constraints.Float | constraints.Integer] struct {
- elements []T
-}
-
-func NewInfiniteMathematicalSeries[T constraints.Float | constraints.Integer]() MathematicalSeries[T] {
- return &InfiniteMathematicalSeries[T]{}
-}
-
-func (ims *InfiniteMathematicalSeries[T]) Swap(i, j int) {
- ims.elements[i], ims.elements[j] = ims.elements[j], ims.elements[i]
-}
-
-func (ims *InfiniteMathematicalSeries[T]) Less(i, j int) bool {
- return ims.elements[i] < ims.elements[j]
-}
-
-func (ims *InfiniteMathematicalSeries[T]) DoubleSidedTrim(percent uint) MathematicalSeries[T] {
- if percent >= 100 {
- panic(
- fmt.Sprintf("Cannot perform double-sided trim for an invalid percentage: %d", percent),
- )
- }
-
- trimmed := &InfiniteMathematicalSeries[T]{}
- trimmed.elements = make([]T, ims.Len())
- copy(trimmed.elements, ims.elements)
-
- sort.Sort(trimmed)
-
- elementsToTrim := uint64(float32(ims.Len()) * ((float32(percent)) / float32(100.0)))
- trimmed.elements = trimmed.elements[elementsToTrim : len(trimmed.elements)-int(elementsToTrim)]
-
- return trimmed
-}
-
-func (ims *InfiniteMathematicalSeries[T]) Copy() MathematicalSeries[T] {
- newIms := InfiniteMathematicalSeries[T]{}
- newIms.elements = make([]T, ims.Len())
- copy(newIms.elements, ims.elements)
- return &newIms
-}
-
-func (ims *InfiniteMathematicalSeries[T]) AddElement(element T) {
- ims.elements = append(ims.elements, element)
-}
-
-func (ims *InfiniteMathematicalSeries[T]) CalculateAverage() float64 {
- return calculateAverage(ims.elements)
-}
-
-func (ims *InfiniteMathematicalSeries[T]) AllSequentialIncreasesLessThan(
- limit float64,
-) (bool, float64) {
- if len(ims.elements) < 2 {
- return false, 0.0
- }
-
- maximumSequentialIncrease := float64(0)
- for i := 1; i < len(ims.elements); i++ {
- current := ims.elements[i]
- previous := ims.elements[i-1]
- percentChange := utilities.SignedPercentDifference(current, previous)
- if percentChange > limit {
- return false, percentChange
- }
- if percentChange > float64(maximumSequentialIncrease) {
- maximumSequentialIncrease = percentChange
- }
- }
- return true, maximumSequentialIncrease
-}
-
-/*
- * N.B.: Overflow is possible -- use at your discretion!
- */
-func (ims *InfiniteMathematicalSeries[T]) StandardDeviation() (bool, T) {
- // From https://www.mathsisfun.com/data/standard-deviation-calculator.html
- // Yes, for real!
-
- // Calculate the average of the numbers ...
- average := ims.CalculateAverage()
-
- // Calculate the square of each of the elements' differences from the mean.
- differences_squared := make([]float64, len(ims.elements))
- for index, value := range ims.elements {
- differences_squared[index] = math.Pow(float64(value-T(average)), 2)
- }
-
- // The variance is the average of the squared differences.
- // So, we need to ...
-
- // Accumulate all those squared differences.
- sds := float64(0)
- for _, dss := range differences_squared {
- sds += dss
- }
-
- // And then divide that total by the number of elements
- variance := sds / float64(len(ims.elements))
-
- // Finally, the standard deviation is the square root
- // of the variance.
- sd := T(math.Sqrt(variance))
- // sd := T(variance)
-
- return true, sd
-}
-
-func (ims *InfiniteMathematicalSeries[T]) IsNormallyDistributed() bool {
- return false
-}
-
-func (ims *InfiniteMathematicalSeries[T]) Len() int {
- return len(ims.elements)
-}
-
-func (ims *InfiniteMathematicalSeries[T]) Values() []T {
- return ims.elements
-}
-
-func (ims *InfiniteMathematicalSeries[T]) Percentile(p int) T {
- return calculatePercentile(ims.elements, p)
-}
-
-type CappedMathematicalSeries[T constraints.Float | constraints.Integer] struct {
- elements_count uint
- elements []T
- index uint
- divisor *saturating.Saturating[uint]
-}
-
-func NewCappedMathematicalSeries[T constraints.Float | constraints.Integer](
- instants_count uint,
-) MathematicalSeries[T] {
- return &CappedMathematicalSeries[T]{
- elements: make([]T, instants_count),
- elements_count: instants_count,
- divisor: saturating.NewSaturating(instants_count),
- index: 0,
- }
-}
-
-func (ma *CappedMathematicalSeries[T]) AddElement(measurement T) {
- ma.elements[ma.index] = measurement
- ma.divisor.Add(1)
- // Invariant: ma.index always points to the oldest measurement
- ma.index = (ma.index + 1) % ma.elements_count
-}
-
-func (ma *CappedMathematicalSeries[T]) CalculateAverage() float64 {
- // If we do not yet have all the values, then we know that the values
- // exist between 0 and ma.divisor.Value(). If we do have all the values,
- // we know that they, too, exist between 0 and ma.divisor.Value().
- return calculateAverage(ma.elements[0:ma.divisor.Value()])
-}
-
-func (ma *CappedMathematicalSeries[T]) AllSequentialIncreasesLessThan(
- limit float64,
-) (_ bool, maximumSequentialIncrease float64) {
- // If we have not yet accumulated a complete set of intervals,
- // this is false.
- if ma.divisor.Value() != ma.elements_count {
- return false, 0
- }
-
- // Invariant: ma.index always points to the oldest (see AddMeasurement
- // above)
- oldestIndex := ma.index
- previous := ma.elements[oldestIndex]
- maximumSequentialIncrease = 0
- for i := uint(1); i < ma.elements_count; i++ {
- currentIndex := (oldestIndex + i) % ma.elements_count
- current := ma.elements[currentIndex]
- percentChange := utilities.SignedPercentDifference(current, previous)
- previous = current
- if percentChange > limit {
- return false, percentChange
- }
- }
- return true, maximumSequentialIncrease
-}
-
-/*
- * N.B.: Overflow is possible -- use at your discretion!
- */
-func (ma *CappedMathematicalSeries[T]) StandardDeviation() (bool, T) {
- // If we have not yet accumulated a complete set of intervals,
- // we are always false.
- if ma.divisor.Value() != ma.elements_count {
- return false, T(0)
- }
-
- // From https://www.mathsisfun.com/data/standard-deviation-calculator.html
- // Yes, for real!
-
- // Calculate the average of the numbers ...
- average := ma.CalculateAverage()
-
- // Calculate the square of each of the elements' differences from the mean.
- differences_squared := make([]float64, ma.elements_count)
- for index, value := range ma.elements {
- differences_squared[index] = math.Pow(float64(value-T(average)), 2)
- }
-
- // The variance is the average of the squared differences.
- // So, we need to ...
-
- // Accumulate all those squared differences.
- sds := float64(0)
- for _, dss := range differences_squared {
- sds += dss
- }
-
- // And then divide that total by the number of elements
- variance := sds / float64(ma.divisor.Value())
-
- // Finally, the standard deviation is the square root
- // of the variance.
- sd := T(math.Sqrt(variance))
- // sd := T(variance)
-
- return true, sd
-}
-
-func (ma *CappedMathematicalSeries[T]) IsNormallyDistributed() bool {
- valid, stddev := ma.StandardDeviation()
- // If there are not enough values in our series to generate a standard
- // deviation, then we cannot do this calculation either.
- if !valid {
- return false
- }
- avg := float64(ma.CalculateAverage())
-
- fstddev := float64(stddev)
- within := float64(0)
- for _, v := range ma.Values() {
- if (avg-fstddev) <= float64(v) && float64(v) <= (avg+fstddev) {
- within++
- }
- }
- return within/float64(ma.divisor.Value()) >= 0.68
-}
-
-func (ma *CappedMathematicalSeries[T]) Values() []T {
- return ma.elements
-}
-
-func (ma *CappedMathematicalSeries[T]) Len() int {
- if uint(len(ma.elements)) != ma.elements_count {
- panic(
- fmt.Sprintf(
- "Error: A capped mathematical series' metadata is invalid: the length of its element array/slice does not match element_count! (%v vs %v)",
- ma.elements_count,
- len(ma.elements),
- ),
- )
- }
- return len(ma.elements)
-}
-
-func (ma *CappedMathematicalSeries[T]) Percentile(p int) T {
- if p < 0 || p > 100 {
- return 0
- }
-
- // Because we need to sort the list to perform the percentile calculation,
- // we have to make a copy of the list so that we don't disturb
- // the time-relative ordering of the elements.
-
- kopy := make([]T, len(ma.elements))
- copy(kopy, ma.elements)
- return calculatePercentile(kopy, p)
-}
-
-func (ims *CappedMathematicalSeries[T]) Swap(i, j int) {
- ims.elements[i], ims.elements[j] = ims.elements[j], ims.elements[i]
-}
-
-func (ims *CappedMathematicalSeries[T]) Less(i, j int) bool {
- return ims.elements[i] < ims.elements[j]
-}
-
-func (ims *CappedMathematicalSeries[T]) DoubleSidedTrim(percent uint) MathematicalSeries[T] {
- if percent >= 100 {
- panic(
- fmt.Sprintf("Cannot perform double-sided trim for an invalid percentage: %d", percent),
- )
- }
-
- trimmed := &CappedMathematicalSeries[T]{elements_count: uint(ims.Len())}
- trimmed.elements = make([]T, ims.Len())
- copy(trimmed.elements, ims.elements)
- sort.Sort(trimmed)
-
- elementsToTrim := uint(float32(ims.Len()) * ((float32(percent)) / float32(100.0)))
- trimmed.elements = trimmed.elements[elementsToTrim : len(trimmed.elements)-int(elementsToTrim)]
-
- trimmed.elements_count -= (elementsToTrim * 2)
-
- return trimmed
-}
-
-func (ims *CappedMathematicalSeries[T]) Copy() MathematicalSeries[T] {
- newCms := CappedMathematicalSeries[T]{}
- newCms.elements = make([]T, ims.Len())
- copy(newCms.elements, ims.elements)
- return &newCms
-}
diff --git a/ms/ms_test.go b/ms/ms_test.go
deleted file mode 100644
index 34817d0..0000000
--- a/ms/ms_test.go
+++ /dev/null
@@ -1,431 +0,0 @@
-package ms
-
-import (
- "reflect"
- "testing"
-
- "github.com/network-quality/goresponsiveness/utilities"
-)
-
-func Test_InfiniteValues(test *testing.T) {
- series := NewInfiniteMathematicalSeries[float64]()
- shouldMatch := make([]float64, 0)
- previous := float64(1.0)
- for range utilities.Iota(1, 80) {
- previous *= 1.059
- series.AddElement(float64(previous))
- shouldMatch = append(shouldMatch, previous)
- }
-
- if !reflect.DeepEqual(shouldMatch, series.Values()) {
- test.Fatalf("Values() on infinite mathematical series does not work.")
- }
-}
-
-func Test_InfiniteSequentialIncreasesAlwaysLessThan(test *testing.T) {
- series := NewInfiniteMathematicalSeries[float64]()
- previous := float64(1.0)
- for range utilities.Iota(1, 80) {
- previous *= 1.059
- series.AddElement(float64(previous))
- }
- if islt, maxSeqIncrease := series.AllSequentialIncreasesLessThan(6.0); !islt {
- test.Fatalf(
- "(infinite) Sequential increases are not always less than 6.0 (%f).",
- maxSeqIncrease,
- )
- }
-}
-
-func Test_CappedTooFewInstantsSequentialIncreasesLessThanAlwaysFalse(test *testing.T) {
- series := NewCappedMathematicalSeries[float64](500)
- series.AddElement(0.0)
- if islt, _ := series.AllSequentialIncreasesLessThan(6.0); islt {
- test.Fatalf(
- "(infinite) 0 elements in a series should always yield false when asking if sequential increases are less than a value.",
- )
- }
-}
-
-func Test_Infinite_degenerate_percentile_too_high(test *testing.T) {
- series := NewInfiniteMathematicalSeries[int]()
- if series.Percentile(101) != 0 {
- test.Fatalf("(infinite) Series percentile of 101 failed.")
- }
-}
-
-func Test_Infinite_degenerate_percentile_too_low(test *testing.T) {
- series := NewInfiniteMathematicalSeries[int]()
- if series.Percentile(-1) != 0 {
- test.Fatalf("(infinite) Series percentile of -1 failed.")
- }
-}
-
-func Test_Infinite90_percentile(test *testing.T) {
- var expected int64 = 10
- series := NewInfiniteMathematicalSeries[int64]()
- series.AddElement(10)
- series.AddElement(9)
- series.AddElement(8)
- series.AddElement(7)
- series.AddElement(6)
- series.AddElement(5)
- series.AddElement(4)
- series.AddElement(3)
- series.AddElement(2)
- series.AddElement(1)
-
- if series.Percentile(90) != expected {
- test.Fatalf(
- "(infinite) Series 90th percentile of 0 ... 10 failed: Expected: %v; Actual: %v.", expected,
- series.Percentile(90),
- )
- }
-}
-
-func Test_Infinite90_percentile_reversed(test *testing.T) {
- var expected int64 = 10
- series := NewInfiniteMathematicalSeries[int64]()
- series.AddElement(1)
- series.AddElement(2)
- series.AddElement(3)
- series.AddElement(4)
- series.AddElement(5)
- series.AddElement(6)
- series.AddElement(7)
- series.AddElement(8)
- series.AddElement(9)
- series.AddElement(10)
-
- if series.Percentile(90) != expected {
- test.Fatalf(
- "(infinite) Series 90th percentile of 0 ... 10 failed: Expected %v; Actual: %v.", expected,
- series.Percentile(90),
- )
- }
-}
-
-func Test_Infinite50_percentile_jumbled(test *testing.T) {
- var expected int64 = 15
- series := NewInfiniteMathematicalSeries[int64]()
- series.AddElement(7)
- series.AddElement(2)
- series.AddElement(15)
- series.AddElement(27)
- series.AddElement(5)
- series.AddElement(52)
- series.AddElement(18)
- series.AddElement(23)
- series.AddElement(11)
- series.AddElement(12)
-
- if series.Percentile(50) != expected {
- test.Fatalf(
- "(infinite) Series 50 percentile of a jumble of numbers failed: Expected %v; Actual: %v.", expected,
- series.Percentile(50),
- )
- }
-}
-
-func Test_InfiniteDoubleSidedTrimmedMean_jumbled(test *testing.T) {
- expected := 16
- series := NewInfiniteMathematicalSeries[int64]()
- series.AddElement(7)
- series.AddElement(2)
- series.AddElement(15)
- series.AddElement(27)
- series.AddElement(5)
- series.AddElement(5)
- series.AddElement(52)
- series.AddElement(18)
- series.AddElement(23)
- series.AddElement(11)
- series.AddElement(22)
- series.AddElement(17)
- series.AddElement(14)
- series.AddElement(9)
- series.AddElement(100)
- series.AddElement(72)
- series.AddElement(91)
- series.AddElement(43)
- series.AddElement(37)
- series.AddElement(62)
-
- trimmed := series.DoubleSidedTrim(10)
-
- if trimmed.Len() != expected {
- test.Fatalf(
- "Capped series is not of the proper size. Expected %v and got %v",
- expected,
- trimmed.Len(),
- )
- }
-
- prev := int64(0)
- for _, v := range trimmed.Values() {
- if !(prev <= v) {
- test.Fatalf("Not sorted: %v is not less than or equal to %v\n", prev, v)
- }
- prev = v
- }
-}
-
-func Test_CappedSequentialIncreasesAlwaysLessThan(test *testing.T) {
- series := NewCappedMathematicalSeries[float64](40)
- previous := float64(1.0)
- for range utilities.Iota(1, 80) {
- previous *= 1.059
- series.AddElement(float64(previous))
- }
- if islt, maxSeqIncrease := series.AllSequentialIncreasesLessThan(6.0); !islt {
- test.Fatalf("Sequential increases are not always less than 6.0 (%f).", maxSeqIncrease)
- }
-}
-
-func Test_CappedSequentialIncreasesAlwaysLessThanWithWraparound(test *testing.T) {
- series := NewCappedMathematicalSeries[float64](20)
- previous := float64(1.0)
- for range utilities.Iota(1, 20) {
- previous *= 1.15
- series.AddElement(float64(previous))
- }
-
- // All those measurements should be ejected by the following
- // loop!
- for range utilities.Iota(1, 20) {
- previous *= 1.10
- series.AddElement(float64(previous))
- }
-
- if islt, maxSeqIncrease := series.AllSequentialIncreasesLessThan(11.0); !islt {
- test.Fatalf(
- "Sequential increases are not always less than 11.0 in wraparound situation (%f v 11.0).",
- maxSeqIncrease,
- )
- }
-}
-
-func Test_CappedSequentialIncreasesAlwaysLessThanWithWraparoundInverse(test *testing.T) {
- series := NewCappedMathematicalSeries[float64](20)
- previous := float64(1.0)
- for range utilities.Iota(1, 20) {
- previous *= 1.15
- series.AddElement(float64(previous))
- }
-
- // *Not* all those measurements should be ejected by the following
- // loop!
- for range utilities.Iota(1, 15) {
- previous *= 1.10
- series.AddElement(float64(previous))
- }
-
- if islt, maxSeqIncrease := series.AllSequentialIncreasesLessThan(11.0); islt {
- test.Fatalf(
- "Sequential increases are (unexpectedly) always less than 11.0 in wraparound situation: %f v 11.0.",
- maxSeqIncrease,
- )
- }
-}
-
-func Test_CappedStandardDeviationCalculation(test *testing.T) {
- expected := 2.93
- series := NewCappedMathematicalSeries[float64](5)
- // 5.7, 1.0, 8.6, 7.4, 2.2
- series.AddElement(5.7)
- series.AddElement(5.7)
- series.AddElement(5.7)
- series.AddElement(5.7)
- series.AddElement(5.7)
- series.AddElement(5.7)
- series.AddElement(5.7)
- series.AddElement(5.7)
- series.AddElement(5.7)
- series.AddElement(1.0)
- series.AddElement(8.6)
- series.AddElement(7.4)
- series.AddElement(2.2)
-
- if _, sd := series.StandardDeviation(); !utilities.ApproximatelyEqual(sd, expected, 0.01) {
- test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd)
- } else {
- test.Logf("Standard deviation calculation result: %v", sd)
- }
-}
-
-func Test_CappedStandardDeviationCalculation2(test *testing.T) {
- expected := 1.41
- series := NewCappedMathematicalSeries[float64](5)
- series.AddElement(8)
- series.AddElement(9)
- series.AddElement(10)
- series.AddElement(11)
- series.AddElement(12)
-
- if _, sd := series.StandardDeviation(); !utilities.ApproximatelyEqual(sd, expected, 0.01) {
- test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd)
- } else {
- test.Logf("Standard deviation calculation result: %v", sd)
- }
-}
-
-func Test_CappedRotatingValues(test *testing.T) {
- series := NewCappedMathematicalSeries[int](5)
-
- series.AddElement(1)
- series.AddElement(2)
- series.AddElement(3)
- series.AddElement(4)
- series.AddElement(5)
-
- series.AddElement(6)
- series.AddElement(7)
-
- if !reflect.DeepEqual([]int{6, 7, 3, 4, 5}, series.Values()) {
- test.Fatalf("Adding values does not properly erase earlier values.")
- }
-}
-
-func Test_CappedLen(test *testing.T) {
- series := NewCappedMathematicalSeries[int](5)
-
- series.AddElement(1)
- series.AddElement(2)
- series.AddElement(3)
- series.AddElement(4)
- series.AddElement(5)
-
- series.AddElement(6)
- series.AddElement(7)
-
- if series.Len() != 5 {
- test.Fatalf("Series size calculations failed.")
- }
-}
-
-func Test_Capped_degenerate_percentile_too_high(test *testing.T) {
- series := NewCappedMathematicalSeries[int](21)
- if series.Percentile(101) != 0 {
- test.Fatalf("Series percentile of 101 failed.")
- }
-}
-
-func Test_Capped_degenerate_percentile_too_low(test *testing.T) {
- series := NewCappedMathematicalSeries[int](21)
- if series.Percentile(-1) != 0 {
- test.Fatalf("Series percentile of -1 failed.")
- }
-}
-
-func Test_Capped90_percentile(test *testing.T) {
- var expected int = 10
- series := NewCappedMathematicalSeries[int](10)
- series.AddElement(10)
- series.AddElement(9)
- series.AddElement(8)
- series.AddElement(7)
- series.AddElement(6)
- series.AddElement(5)
- series.AddElement(4)
- series.AddElement(3)
- series.AddElement(2)
- series.AddElement(1)
-
- if series.Percentile(90) != expected {
- test.Fatalf(
- "Series 90th percentile of 0 ... 10 failed: Expected %v got %v.", expected,
- series.Percentile(90),
- )
- }
-}
-
-func Test_Capped90_percentile_reversed(test *testing.T) {
- series := NewCappedMathematicalSeries[int64](10)
- series.AddElement(1)
- series.AddElement(2)
- series.AddElement(3)
- series.AddElement(4)
- series.AddElement(5)
- series.AddElement(6)
- series.AddElement(7)
- series.AddElement(8)
- series.AddElement(9)
- series.AddElement(10)
-
- if series.Percentile(90) != 10 {
- test.Fatalf(
- "Series 90th percentile of 0 ... 10 failed: Expected 10 got %v.",
- series.Percentile(90),
- )
- }
-}
-
-func Test_Capped50_percentile_jumbled(test *testing.T) {
- var expected int64 = 15
- series := NewCappedMathematicalSeries[int64](10)
- series.AddElement(7)
- series.AddElement(2)
- series.AddElement(15)
- series.AddElement(27)
- series.AddElement(5)
- series.AddElement(52)
- series.AddElement(18)
- series.AddElement(23)
- series.AddElement(11)
- series.AddElement(12)
-
- if series.Percentile(50) != expected {
- test.Fatalf(
- "Series 50 percentile of a jumble of numbers failed: Expected %v got %v.", expected,
- series.Percentile(50),
- )
- }
-}
-
-func Test_CappedDoubleSidedTrimmedMean_jumbled(test *testing.T) {
- expected := 8
- series := NewCappedMathematicalSeries[int64](10)
- series.AddElement(7)
- series.AddElement(2)
- series.AddElement(15)
- series.AddElement(27)
- series.AddElement(5)
- series.AddElement(5)
- series.AddElement(52)
- series.AddElement(18)
- series.AddElement(23)
- series.AddElement(11)
- series.AddElement(12)
-
- trimmed := series.DoubleSidedTrim(10)
-
- if trimmed.Len() != expected {
- test.Fatalf(
- "Capped series is not of the proper size. Expected %v and got %v",
- expected,
- trimmed.Len(),
- )
- }
-
- prev := int64(0)
- for _, v := range trimmed.Values() {
- if !(prev <= v) {
- test.Fatalf("Not sorted: %v is not less than or equal to %v\n", prev, v)
- }
- prev = v
- }
-}
-
-func Test_CappedAverage(test *testing.T) {
- expected := 1.0082230220488836e+08
- series := NewCappedMathematicalSeries[float64](4)
- series.AddElement(9.94747772516195e+07)
- series.AddElement(9.991286984703423e+07)
- series.AddElement(1.0285437111086299e+08)
- series.AddElement(1.0104719061003672e+08)
- if average := series.CalculateAverage(); !utilities.ApproximatelyEqual(average, 0.01, expected) {
- test.Fatalf(
- "Expected: %v; Actual: %v.", average, expected,
- )
- }
-}
diff --git a/networkQuality.go b/networkQuality.go
index 776c0e7..aa8d854 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -33,10 +33,10 @@ import (
"github.com/network-quality/goresponsiveness/direction"
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
- "github.com/network-quality/goresponsiveness/ms"
"github.com/network-quality/goresponsiveness/probe"
"github.com/network-quality/goresponsiveness/qualityattenuation"
"github.com/network-quality/goresponsiveness/rpm"
+ "github.com/network-quality/goresponsiveness/series"
"github.com/network-quality/goresponsiveness/stabilizer"
"github.com/network-quality/goresponsiveness/timeoutat"
"github.com/network-quality/goresponsiveness/utilities"
@@ -471,8 +471,8 @@ func main() {
}
// All tests will accumulate data to these series because it will all matter for RPM calculation!
- selfRtts := ms.NewInfiniteMathematicalSeries[float64]()
- foreignRtts := ms.NewInfiniteMathematicalSeries[float64]()
+ selfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0)
+ foreignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0)
var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil
if *printQualityAttenuation {
@@ -521,8 +521,8 @@ func main() {
if *debugCliFlag {
downloadThroughputStabilizerDebugLevel = debug.Debug
}
- throughputStabilizer := stabilizer.NewStabilizer[float64](
- uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, 0, "bytes",
+ throughputStabilizer := stabilizer.NewStabilizer[float64, uint64](
+ specParameters.MovingAvgDist, specParameters.StdDevTolerance, 0, "bytes",
downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig)
responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug,
@@ -531,8 +531,8 @@ func main() {
if *debugCliFlag {
responsivenessStabilizerDebugLevel = debug.Debug
}
- responsivenessStabilizer := stabilizer.NewStabilizer[int64](
- uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance,
+ responsivenessStabilizer := stabilizer.NewStabilizer[int64, uint](
+ specParameters.MovingAvgDist, specParameters.StdDevTolerance,
specParameters.TrimmedMeanPct, "milliseconds",
responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig)
@@ -541,32 +541,70 @@ func main() {
lastThroughputRate := float64(0)
lastThroughputOpenConnectionCount := int(0)
+ stabilityCheckTime := time.Now().Add(specParameters.EvalInterval)
+ stabilityCheckTimeChannel := timeoutat.TimeoutAt(
+ operatingCtx,
+ stabilityCheckTime,
+ debugLevel,
+ )
+
lg_timeout:
for !direction.StableThroughput {
select {
case throughputMeasurement := <-lgStabilizationCommunicationChannel:
{
- throughputStabilizer.AddMeasurement(
- throughputMeasurement.Throughput)
+ switch throughputMeasurement.Type {
+ case series.SeriesMessageReserve:
+ {
+ throughputStabilizer.Reserve(throughputMeasurement.Bucket)
+ if *debugCliFlag {
+ fmt.Printf(
+ "%s: Reserving a throughput bucket with id %v.\n",
+ direction.DirectionLabel, throughputMeasurement.Bucket)
+ }
+ }
+ case series.SeriesMessageMeasure:
+ {
+ bucket := throughputMeasurement.Bucket
+ measurement := utilities.GetSome(throughputMeasurement.Measure)
+
+ throughputStabilizer.AddMeasurement(bucket, measurement.Throughput)
+
+ direction.ThroughputDataLogger.LogRecord(measurement)
+ for _, v := range measurement.GranularThroughputDataPoints {
+ v.Direction = "Download"
+ direction.GranularThroughputDataLogger.LogRecord(v)
+ }
+
+ lastThroughputRate = measurement.Throughput
+ lastThroughputOpenConnectionCount = measurement.Connections
+ }
+ }
+ }
+ case <-stabilityCheckTimeChannel:
+ {
+ if *debugCliFlag {
+ fmt.Printf(
+ "%v throughput stability interval is complete.\n", direction.DirectionLabel)
+ }
+ stabilityCheckTime = time.Now().Add(specParameters.EvalInterval)
+ stabilityCheckTimeChannel = timeoutat.TimeoutAt(
+ operatingCtx,
+ stabilityCheckTime,
+ debugLevel,
+ )
+
direction.StableThroughput = throughputStabilizer.IsStable()
if *debugCliFlag {
fmt.Printf(
- "################# %v is instantaneously %s.\n", direction.DirectionLabel,
+ "%v is instantaneously %s.\n", direction.DirectionLabel,
utilities.Conditional(direction.StableThroughput, "stable", "unstable"))
}
- direction.ThroughputDataLogger.LogRecord(throughputMeasurement)
- for i := range throughputMeasurement.GranularThroughputDataPoints {
- datapoint := throughputMeasurement.GranularThroughputDataPoints[i]
- datapoint.Direction = "Download"
- direction.GranularThroughputDataLogger.LogRecord(datapoint)
- }
-
- lastThroughputRate = throughputMeasurement.Throughput
- lastThroughputOpenConnectionCount = throughputMeasurement.Connections
if direction.StableThroughput {
throughputGeneratorCtxCancel()
}
+ throughputStabilizer.Interval()
}
case <-timeoutChannel:
{
@@ -577,7 +615,7 @@ func main() {
if direction.StableThroughput {
if *debugCliFlag {
- fmt.Printf("################# Throughput is stable; beginning responsiveness testing.\n")
+ fmt.Printf("Throughput is stable; beginning responsiveness testing.\n")
}
} else {
fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Adding 15 seconds to calculate speculative RPM results.\n")
@@ -590,8 +628,8 @@ func main() {
)
}
- perDirectionSelfRtts := ms.NewInfiniteMathematicalSeries[float64]()
- perDirectionForeignRtts := ms.NewInfiniteMathematicalSeries[float64]()
+ perDirectionSelfRtts := series.NewWindowSeries[float64, uint](series.Forever, 0)
+ perDirectionForeignRtts := series.NewWindowSeries[float64, uint](series.Forever, 0)
responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber(
proberOperatorCtx,
@@ -611,61 +649,122 @@ func main() {
select {
case probeMeasurement := <-responsivenessStabilizationCommunicationChannel:
{
- foreignDataPoint := probeMeasurement.First
- selfDataPoint := probeMeasurement.Second
+ switch probeMeasurement.Type {
+ case series.SeriesMessageReserve:
+ {
+ bucket := probeMeasurement.Bucket
+ if *debugCliFlag {
+ fmt.Printf(
+ "%s: Reserving a responsiveness bucket with id %v.\n", direction.DirectionLabel, bucket)
+ }
+ responsivenessStabilizer.Reserve(bucket)
+ selfRtts.Reserve(bucket)
+ foreignRtts.Reserve(bucket)
+ perDirectionForeignRtts.Reserve(bucket)
+ perDirectionSelfRtts.Reserve(bucket)
+ }
+ case series.SeriesMessageMeasure:
+ {
+ bucket := probeMeasurement.Bucket
+ measurement := utilities.GetSome(probeMeasurement.Measure)
+ foreignDataPoint := measurement.Foreign
+ selfDataPoint := measurement.Self
- responsivenessStabilizer.AddMeasurement(
- (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds())
+ if *debugCliFlag {
+ fmt.Printf(
+ "%s: Filling a responsiveness bucket with id %v with value %v.\n",
+ direction.DirectionLabel, bucket, measurement)
+ }
+ responsivenessStabilizer.AddMeasurement(bucket,
+ (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds())
- // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy
- // is *actually* important, but it can't hurt?
- direction.StableResponsiveness = responsivenessStabilizer.IsStable()
+ if err := selfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (selfRtts)\n", bucket)
+ }
+ if err := perDirectionSelfRtts.Fill(bucket, selfDataPoint.Duration.Seconds()); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionSelfRtts)\n", bucket)
+ }
- if *debugCliFlag {
- fmt.Printf(
- "################# Responsiveness is instantaneously %s.\n",
- utilities.Conditional(direction.StableResponsiveness, "stable", "unstable"))
- }
- // There may be more than one round trip accumulated together. If that is the case,
- // we will blow them apart in to three separate measurements and each one will just
- // be 1 / measurement.RoundTripCount of the total length.
- for range utilities.Iota(0, int(foreignDataPoint.RoundTripCount)) {
- foreignRtts.AddElement(foreignDataPoint.Duration.Seconds() /
- float64(foreignDataPoint.RoundTripCount))
- perDirectionForeignRtts.AddElement(foreignDataPoint.Duration.Seconds() /
- float64(foreignDataPoint.RoundTripCount))
- }
- selfRtts.AddElement(selfDataPoint.Duration.Seconds())
- perDirectionSelfRtts.AddElement(selfDataPoint.Duration.Seconds())
+ if err := foreignRtts.Fill(bucket, foreignDataPoint.Duration.Seconds()); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (foreignRtts)\n", bucket)
+ }
- if selfRttsQualityAttenuation != nil {
- selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds())
- }
+ if err := perDirectionForeignRtts.Fill(bucket,
+ foreignDataPoint.Duration.Seconds()); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (perDirectionForeignRtts)\n", bucket)
+ }
+
+ if selfRttsQualityAttenuation != nil {
+ selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds())
+ }
- direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint)
- direction.SelfProbeDataLogger.LogRecord(*selfDataPoint)
+ direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint)
+ direction.SelfProbeDataLogger.LogRecord(*selfDataPoint)
+
+ }
+ }
}
case throughputMeasurement := <-lgStabilizationCommunicationChannel:
{
- if *debugCliFlag {
- fmt.Printf("Adding a throughput measurement.\n")
- }
- // There may be more than one round trip accumulated together. If that is the case,
- direction.ThroughputDataLogger.LogRecord(throughputMeasurement)
- for i := range throughputMeasurement.GranularThroughputDataPoints {
- datapoint := throughputMeasurement.GranularThroughputDataPoints[i]
- datapoint.Direction = direction.DirectionLabel
- direction.GranularThroughputDataLogger.LogRecord(datapoint)
- }
+ switch throughputMeasurement.Type {
+ case series.SeriesMessageReserve:
+ {
+ // We are no longer tracking stability, so reservation messages are useless!
+ if *debugCliFlag {
+ fmt.Printf(
+ "%s: Discarding a throughput bucket with id %v when ascertaining responsiveness.\n",
+ direction.DirectionLabel, throughputMeasurement.Bucket)
+ }
+ }
+ case series.SeriesMessageMeasure:
+ {
+ measurement := utilities.GetSome(throughputMeasurement.Measure)
- lastThroughputRate = throughputMeasurement.Throughput
- lastThroughputOpenConnectionCount = throughputMeasurement.Connections
+ if *debugCliFlag {
+ fmt.Printf("Adding a throughput measurement (while ascertaining responsiveness).\n")
+ }
+ // There may be more than one round trip accumulated together. If that is the case,
+ direction.ThroughputDataLogger.LogRecord(measurement)
+ for _, v := range measurement.GranularThroughputDataPoints {
+ v.Direction = direction.DirectionLabel
+ direction.GranularThroughputDataLogger.LogRecord(v)
+ }
+ lastThroughputRate = measurement.Throughput
+ lastThroughputOpenConnectionCount = measurement.Connections
+ }
+ }
}
case <-timeoutChannel:
{
break responsiveness_timeout
}
+ case <-stabilityCheckTimeChannel:
+ {
+ if *debugCliFlag {
+ fmt.Printf(
+ "%v responsiveness stability interval is complete.\n", direction.DirectionLabel)
+ }
+
+ stabilityCheckTime = time.Now().Add(specParameters.EvalInterval)
+ stabilityCheckTimeChannel = timeoutat.TimeoutAt(
+ operatingCtx,
+ stabilityCheckTime,
+ debugLevel,
+ )
+
+ // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy
+ // is *actually* important, but it can't hurt?
+ direction.StableResponsiveness = responsivenessStabilizer.IsStable()
+
+ if *debugCliFlag {
+ fmt.Printf(
+ "Responsiveness is instantaneously %s.\n",
+ utilities.Conditional(direction.StableResponsiveness, "stable", "unstable"))
+ }
+
+ responsivenessStabilizer.Interval()
+ }
}
}
@@ -753,7 +852,7 @@ func main() {
}
fmt.Printf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, directionResult.PNRpm, 90)
- fmt.Printf("%s RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel,
+ fmt.Printf("%s RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel,
directionResult.MeanRpm, specParameters.TrimmedMeanPct)
if len(*prometheusStatsFilename) > 0 {
@@ -808,7 +907,7 @@ func main() {
}
fmt.Printf("Final RPM: %5.0f (P%d)\n", result.PNRpm, 90)
- fmt.Printf("Final RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n",
+ fmt.Printf("Final RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n",
result.MeanRpm, specParameters.TrimmedMeanPct)
// Stop the world.
diff --git a/rpm/calculations.go b/rpm/calculations.go
index 5387aa7..4b61413 100644
--- a/rpm/calculations.go
+++ b/rpm/calculations.go
@@ -17,40 +17,60 @@ package rpm
import (
"fmt"
- "github.com/network-quality/goresponsiveness/ms"
+ "github.com/network-quality/goresponsiveness/series"
+ "github.com/network-quality/goresponsiveness/utilities"
+ "golang.org/x/exp/constraints"
)
-type Rpm struct {
+type Rpm[Data utilities.Number] struct {
SelfRttsTotal int
ForeignRttsTotal int
SelfRttsTrimmed int
ForeignRttsTrimmed int
- SelfProbeRttPN float64
- ForeignProbeRttPN float64
+ SelfProbeRttPN Data
+ ForeignProbeRttPN Data
SelfProbeRttMean float64
ForeignProbeRttMean float64
PNRpm float64
MeanRpm float64
}
-func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.MathematicalSeries[float64], trimming uint, percentile int) Rpm {
- // First, let's do a double-sided trim of the top/bottom 10% of our measurements.
- selfRttsTotalCount := selfRtts.Len()
- foreignRttsTotalCount := foreignRtts.Len()
+func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered](
+ selfRtts series.WindowSeries[Data, Bucket], aggregatedForeignRtts series.WindowSeries[Data, Bucket], trimming uint, percentile int,
+) Rpm[Data] {
+ // There may be more than one round trip accumulated together. If that is the case,
+ // we will blow them apart into three separate measurements and each one will just
+ // be 1 / 3.
+ foreignRtts := series.NewWindowSeries[Data, int](series.Forever, 0)
+ foreignBuckets := 0
+ for _, v := range aggregatedForeignRtts.GetValues() {
+ if utilities.IsSome(v) {
+ v := utilities.GetSome(v)
+ foreignRtts.Reserve(foreignBuckets)
+ foreignRtts.Reserve(foreignBuckets + 1)
+ foreignRtts.Reserve(foreignBuckets + 2)
+ foreignRtts.Fill(foreignBuckets, v/3)
+ foreignRtts.Fill(foreignBuckets+1, v/3)
+ foreignRtts.Fill(foreignBuckets+2, v/3)
+ foreignBuckets += 3
+ }
+ }
- selfRttsTrimmed := selfRtts.DoubleSidedTrim(trimming)
- foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(trimming)
+ // First, let's do a double-sided trim of the top/bottom 10% of our measurements.
+ selfRttsTotalCount, _ := selfRtts.Count()
+ foreignRttsTotalCount, _ := foreignRtts.Count()
- selfRttsTrimmedCount := selfRttsTrimmed.Len()
- foreignRttsTrimmedCount := foreignRttsTrimmed.Len()
+ _, selfProbeRoundTripTimeMean, selfRttsTrimmed :=
+ series.TrimmedMean(selfRtts, int(trimming))
+ _, foreignProbeRoundTripTimeMean, foreignRttsTrimmed :=
+ series.TrimmedMean(foreignRtts, int(trimming))
- // Then, let's take the mean of those ...
- selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage()
- foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage()
+ selfRttsTrimmedCount := len(selfRttsTrimmed)
+ foreignRttsTrimmedCount := len(foreignRttsTrimmed)
// Second, let's do the P90 calculations.
- selfProbeRoundTripTimePN := selfRtts.Percentile(percentile)
- foreignProbeRoundTripTimePN := foreignRtts.Percentile(percentile)
+ _, selfProbeRoundTripTimePN := series.Percentile(selfRtts, percentile)
+ _, foreignProbeRoundTripTimePN := series.Percentile(foreignRtts, percentile)
// Note: The specification indicates that we want to calculate the foreign probes as such:
// 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign
@@ -62,7 +82,7 @@ func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.Mathem
pnRpm := 60.0 / (float64(selfProbeRoundTripTimePN+foreignProbeRoundTripTimePN) / 2.0)
meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0)
- return Rpm{
+ return Rpm[Data]{
SelfRttsTotal: selfRttsTotalCount, ForeignRttsTotal: foreignRttsTotalCount,
SelfRttsTrimmed: selfRttsTrimmedCount, ForeignRttsTrimmed: foreignRttsTrimmedCount,
SelfProbeRttPN: selfProbeRoundTripTimePN, ForeignProbeRttPN: foreignProbeRoundTripTimePN,
@@ -71,14 +91,14 @@ func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.Mathem
}
}
-func (rpm *Rpm) ToString() string {
+func (rpm *Rpm[Data]) ToString() string {
return fmt.Sprintf(
`Total Self Probes: %d
Total Foreign Probes: %d
Trimmed Self Probes Count: %d
Trimmed Foreign Probes Count: %d
-P90 Self RTT: %f
-P90 Foreign RTT: %f
+P90 Self RTT: %v
+P90 Foreign RTT: %v
Trimmed Mean Self RTT: %f
Trimmed Mean Foreign RTT: %f
`,
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 23ed9f4..b1b8ba4 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -29,6 +29,7 @@ import (
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
"github.com/network-quality/goresponsiveness/probe"
+ "github.com/network-quality/goresponsiveness/series"
"github.com/network-quality/goresponsiveness/utilities"
)
@@ -84,6 +85,11 @@ type SelfDataCollectionResult struct {
LoggingContinuation func()
}
+type ResponsivenessProbeResult struct {
+ Foreign *probe.ProbeDataPoint
+ Self *probe.ProbeDataPoint
+}
+
func ResponsivenessProber(
proberCtx context.Context,
networkActivityCtx context.Context,
@@ -95,7 +101,7 @@ func ResponsivenessProber(
keyLogger io.Writer,
captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
-) (dataPoints chan utilities.Pair[*probe.ProbeDataPoint]) {
+) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, uint]) {
if debug.IsDebug(debugging.Level) {
fmt.Printf(
"(%s) Starting to collect responsiveness information at an interval of %v!\n",
@@ -106,7 +112,7 @@ func ResponsivenessProber(
// Make a channel to send back all the generated data points
// when we are probing.
- dataPoints = make(chan utilities.Pair[*probe.ProbeDataPoint])
+ dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, uint])
go func() {
wg := sync.WaitGroup{}
@@ -141,6 +147,11 @@ func ResponsivenessProber(
)
}
+ dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{
+ Type: series.SeriesMessageReserve, Bucket: probeCount,
+ Measure: utilities.None[ResponsivenessProbeResult](),
+ }
+
// The presence of a custom TLSClientConfig in a *generic* `transport`
// means that go will default to HTTP/1.1 and cowardly avoid HTTP/2:
// https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278
@@ -262,8 +273,13 @@ func ResponsivenessProber(
dataPointsLock.Lock()
// Now we have our four data points (three in the foreign probe data point and one in the self probe data point)
if dataPoints != nil {
- dataPoints <- utilities.Pair[*probe.ProbeDataPoint]{
- First: foreignProbeDataPoint, Second: selfProbeDataPoint,
+ measurement := ResponsivenessProbeResult{
+ Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint,
+ }
+
+ dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{
+ Type: series.SeriesMessageMeasure, Bucket: probeCount,
+ Measure: utilities.Some[ResponsivenessProbeResult](measurement),
}
}
dataPointsLock.Unlock()
@@ -300,8 +316,8 @@ func LoadGenerator(
mnp int,
captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections?
debugging *debug.DebugWithPrefix, // How can we forget debugging?
-) (stabilizerCommunicationChannel chan ThroughputDataPoint) { // Send back all the instantaneous throughputs that we generate.
- stabilizerCommunicationChannel = make(chan ThroughputDataPoint)
+) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, uint64]) { // Send back all the instantaneous throughputs that we generate.
+ seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, uint64])
go func() {
flowsCreated := uint64(0)
@@ -454,7 +470,14 @@ func LoadGenerator(
len(*loadGeneratingConnectionsCollection.LGCs),
granularThroughputDatapoints,
}
- stabilizerCommunicationChannel <- throughputDataPoint
+
+ seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{
+ Type: series.SeriesMessageReserve, Bucket: currentInterval,
+ }
+ seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{
+ Type: series.SeriesMessageMeasure, Bucket: currentInterval,
+ Measure: utilities.Some[ThroughputDataPoint](throughputDataPoint),
+ }
if generateLoadCtx.Err() != nil {
// No need to add additional data points because the controller told us
diff --git a/series/message.go b/series/message.go
new file mode 100644
index 0000000..fa0b096
--- /dev/null
+++ b/series/message.go
@@ -0,0 +1,19 @@
+package series
+
+import (
+ "github.com/network-quality/goresponsiveness/utilities"
+ "golang.org/x/exp/constraints"
+)
+
+type SeriesMessageType int
+
+const (
+ SeriesMessageReserve SeriesMessageType = iota
+ SeriesMessageMeasure SeriesMessageType = iota
+)
+
+type SeriesMessage[Data any, BucketType constraints.Ordered] struct {
+ Type SeriesMessageType
+ Bucket BucketType
+ Measure utilities.Optional[Data]
+}
diff --git a/series/series.go b/series/series.go
new file mode 100644
index 0000000..0084007
--- /dev/null
+++ b/series/series.go
@@ -0,0 +1,280 @@
+/*
+ * This file is part of Go Responsiveness.
+ *
+ * Go Responsiveness is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software Foundation,
+ * either version 2 of the License, or (at your option) any later version.
+ * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
+ */
+package series
+
+import (
+ "fmt"
+
+ "github.com/network-quality/goresponsiveness/utilities"
+ "golang.org/x/exp/constraints"
+)
+
+type WindowSeriesDuration int
+
+const (
+ Forever WindowSeriesDuration = iota
+ WindowOnly WindowSeriesDuration = iota
+)
+
+type WindowSeries[Data any, Bucket constraints.Ordered] interface {
+ fmt.Stringer
+
+ Reserve(b Bucket) error
+ Fill(b Bucket, d Data) error
+
+ Count() (some int, none int)
+
+ ForEach(func(Bucket, *utilities.Optional[Data]))
+
+ GetValues() []utilities.Optional[Data]
+ Complete() bool
+ GetType() WindowSeriesDuration
+}
+
+type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct {
+ windowSize int
+ data []utilities.Pair[Bucket, utilities.Optional[Data]]
+ latestIndex int
+ empty bool
+}
+
+/*
+ * Beginning of WindowSeries interface methods.
+ */
+
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error {
+ if !wsi.empty && b <= wsi.data[wsi.latestIndex].First {
+ return fmt.Errorf("reserving must be monotonically increasing")
+ }
+
+ if wsi.empty {
+ /* Special case if we are empty: The latestIndex is where we want this value to go! */
+ wsi.data[wsi.latestIndex] = utilities.Pair[Bucket, utilities.Optional[Data]]{
+ First: b, Second: utilities.None[Data](),
+ }
+ } else {
+ /* Otherwise, bump ourselves forward and place the new reservation there. */
+ wsi.latestIndex = wsi.nextIndex(wsi.latestIndex)
+ wsi.data[wsi.latestIndex] = utilities.Pair[Bucket, utilities.Optional[Data]]{
+ First: b, Second: utilities.None[Data](),
+ }
+ }
+ wsi.empty = false
+ return nil
+}
+
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) error {
+ iterator := wsi.latestIndex
+ for {
+ if wsi.data[iterator].First == b {
+ wsi.data[iterator].Second = utilities.Some[Data](d)
+ return nil
+ }
+ iterator = wsi.nextIndex(iterator)
+ if iterator == wsi.latestIndex {
+ break
+ }
+ }
+ return fmt.Errorf("attempting to fill a bucket that does not exist")
+}
+
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Count() (some int, none int) {
+ some = 0
+ none = 0
+ for _, v := range wsi.data {
+ if utilities.IsSome[Data](v.Second) {
+ some++
+ } else {
+ none++
+ }
+ }
+ return
+}
+
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Complete() bool {
+ for _, v := range wsi.data {
+ if utilities.IsNone(v.Second) {
+ return false
+ }
+ }
+ return true
+}
+
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) nextIndex(currentIndex int) int {
+ return (currentIndex + 1) % wsi.windowSize
+}
+
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) previousIndex(currentIndex int) int {
+ nextIndex := currentIndex - 1
+ if nextIndex < 0 {
+ nextIndex += wsi.windowSize
+ }
+ return nextIndex
+}
+
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optional[Data] {
+ result := make([]utilities.Optional[Data], wsi.windowSize)
+
+ iterator := wsi.latestIndex
+ parallelIterator := 0
+ for {
+ result[parallelIterator] = wsi.data[iterator].Second
+ iterator = wsi.previousIndex(iterator)
+ parallelIterator++
+ if iterator == wsi.latestIndex {
+ break
+ }
+ }
+ return result
+}
+
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] {
+ return wsi.toArray()
+}
+
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetType() WindowSeriesDuration {
+ return WindowOnly
+}
+
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) {
+ for _, v := range wsi.data {
+ eacher(v.First, &v.Second)
+ }
+}
+
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string {
+ result := fmt.Sprintf("Window series (window (%d) only, latest index: %v): ", wsi.windowSize, wsi.latestIndex)
+ for _, v := range wsi.data {
+ valueString := "None"
+ if utilities.IsSome[Data](v.Second) {
+ valueString = fmt.Sprintf("%v", utilities.GetSome[Data](v.Second))
+ }
+ result += fmt.Sprintf("%v: %v; ", v.First, valueString)
+ }
+ return result
+}
+
+func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered](
+ windowSize int,
+) *windowSeriesWindowOnlyImpl[Data, Bucket] {
+ result := windowSeriesWindowOnlyImpl[Data, Bucket]{windowSize: windowSize, latestIndex: 0, empty: true}
+
+ result.data = make([]utilities.Pair[Bucket, utilities.Optional[Data]], windowSize)
+
+ return &result
+}
+
+/*
+ * End of WindowSeries interface methods.
+ */
+
+type windowSeriesForeverImpl[Data any, Bucket constraints.Ordered] struct {
+ data []utilities.Pair[Bucket, utilities.Optional[Data]]
+ empty bool
+}
+
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error {
+ if !wsi.empty && b <= wsi.data[len(wsi.data)-1].First {
+ return fmt.Errorf("reserving must be monotonically increasing")
+ }
+
+ wsi.empty = false
+ wsi.data = append(wsi.data, utilities.Pair[Bucket, utilities.Optional[Data]]{First: b, Second: utilities.None[Data]()})
+ return nil
+}
+
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error {
+ for i := range wsi.data {
+ if wsi.data[i].First == b {
+ wsi.data[i].Second = utilities.Some[Data](d)
+ return nil
+ }
+ }
+ return fmt.Errorf("attempting to fill a bucket that does not exist")
+}
+
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] {
+ result := make([]utilities.Optional[Data], len(wsi.data))
+
+ for i, v := range utilities.Reverse(wsi.data) {
+ result[i] = v.Second
+ }
+
+ return result
+}
+
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) Count() (some int, none int) {
+ some = 0
+ none = 0
+ for _, v := range wsi.data {
+ if utilities.IsSome[Data](v.Second) {
+ some++
+ } else {
+ none++
+ }
+ }
+ return
+}
+
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) Complete() bool {
+ for _, v := range wsi.data {
+ if utilities.IsNone(v.Second) {
+ return false
+ }
+ }
+ return true
+}
+
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetType() WindowSeriesDuration {
+ return Forever
+}
+
+func newWindowSeriesForeverImpl[Data any, Bucket constraints.Ordered]() *windowSeriesForeverImpl[Data, Bucket] {
+ result := windowSeriesForeverImpl[Data, Bucket]{empty: true}
+
+ result.data = nil
+
+ return &result
+}
+
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) {
+ for _, v := range wsi.data {
+ eacher(v.First, &v.Second)
+ }
+}
+
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string {
+ result := "Window series (forever): "
+ for _, v := range wsi.data {
+ valueString := "None"
+ if utilities.IsSome[Data](v.Second) {
+ valueString = fmt.Sprintf("%v", utilities.GetSome[Data](v.Second))
+ }
+ result += fmt.Sprintf("%v: %v; ", v.First, valueString)
+ }
+ return result
+}
+
+/*
+ * End of WindowSeries interface methods.
+ */
+
+func NewWindowSeries[Data any, Bucket constraints.Ordered](tipe WindowSeriesDuration, windowSize int) WindowSeries[Data, Bucket] {
+ if tipe == WindowOnly {
+ return newWindowSeriesWindowOnlyImpl[Data, Bucket](windowSize)
+ } else if tipe == Forever {
+ return newWindowSeriesForeverImpl[Data, Bucket]()
+ }
+ panic("NewWindowSeries: unknown WindowSeriesDuration")
+}
diff --git a/series/series_test.go b/series/series_test.go
new file mode 100644
index 0000000..5d2ab57
--- /dev/null
+++ b/series/series_test.go
@@ -0,0 +1,846 @@
+/*
+ * This file is part of Go Responsiveness.
+ *
+ * Go Responsiveness is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software Foundation,
+ * either version 2 of the License, or (at your option) any later version.
+ * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
+ */
+package series
+
+import (
+ "reflect"
+ "testing"
+
+ "github.com/network-quality/goresponsiveness/utilities"
+)
+
+func TestNextIndex(t *testing.T) {
+ wsi := newWindowSeriesWindowOnlyImpl[int, int](4)
+
+ idx := wsi.nextIndex(wsi.latestIndex)
+ if idx != 1 {
+ t.Fatalf("nextIndex is wrong (1)")
+ }
+ wsi.latestIndex = idx
+
+ idx = wsi.nextIndex(wsi.latestIndex)
+ if idx != 2 {
+ t.Fatalf("nextIndex is wrong (2)")
+ }
+ wsi.latestIndex = idx
+
+ idx = wsi.nextIndex(wsi.latestIndex)
+ if idx != 3 {
+ t.Fatalf("nextIndex is wrong (3)")
+ }
+ wsi.latestIndex = idx
+
+ idx = wsi.nextIndex(wsi.latestIndex)
+ if idx != 0 {
+ t.Fatalf("nextIndex is wrong (0)")
+ }
+ wsi.latestIndex = idx
+
+ idx = wsi.nextIndex(wsi.latestIndex)
+ if idx != 1 {
+ t.Fatalf("nextIndex is wrong (1)")
+ }
+ wsi.latestIndex = idx
+}
+
+func TestSimpleWindowComplete(t *testing.T) {
+ wsi := newWindowSeriesWindowOnlyImpl[int, int](4)
+ if wsi.Complete() {
+ t.Fatalf("Window should not be complete.")
+ }
+ wsfImpl := newWindowSeriesForeverImpl[int, int]()
+ wsfImpl.Reserve(1)
+ if wsfImpl.Complete() {
+ t.Fatalf("Window should not be complete.")
+ }
+}
+
+func TestSimpleReserve(t *testing.T) {
+ wswoImpl := newWindowSeriesWindowOnlyImpl[int, int](4)
+ result := wswoImpl.Reserve(0)
+ if result != nil {
+ t.Fatalf("Reserving 1 should be a-ok!")
+ }
+ wsfImpl := newWindowSeriesForeverImpl[int, int]()
+ result = wsfImpl.Reserve(0)
+ if result != nil {
+ t.Fatalf("Reserving 1 should be a-ok!")
+ }
+}
+
+func Test_ForeverValues(test *testing.T) {
+ series := newWindowSeriesForeverImpl[float64, int]()
+ shouldMatch := make([]utilities.Optional[float64], 0)
+ previous := float64(1.0)
+ for i := range utilities.Iota(1, 81) {
+ previous *= 1.059
+ series.Reserve(i)
+ series.Fill(i, float64(previous))
+ shouldMatch = append(shouldMatch, utilities.Some[float64](previous))
+ }
+
+ if !reflect.DeepEqual(utilities.Reverse(shouldMatch), series.GetValues()) {
+ test.Fatalf("Values() on infinite mathematical series does not work.")
+ }
+}
+
+func Test_WindowOnlySequentialIncreasesAlwaysLessThan(test *testing.T) {
+ series := newWindowSeriesWindowOnlyImpl[float64, int](10)
+ previous := float64(1.0)
+ for i := range utilities.Iota(1, 11) {
+ previous *= 1.5
+ series.Reserve(i)
+ series.Fill(i, float64(previous))
+ }
+ if complete, islt, maxSeqIncrease := AllSequentialIncreasesLessThan[float64, int](series,
+ 100); !complete || maxSeqIncrease != 50.0 || !islt {
+ test.Fatalf(
+ "(Window Only) Sequential increases are not always less than 100 (%v, %v, %f ).",
+ complete, islt, maxSeqIncrease,
+ )
+ }
+}
+
+func Test_WindowOnlyTooFewInstantsSequentialIncreasesLessThanAlwaysFalse(test *testing.T) {
+ series := newWindowSeriesWindowOnlyImpl[float64, int](500)
+ series.Reserve(1)
+ series.Fill(1, 0.0)
+ if complete, islt, _ := AllSequentialIncreasesLessThan[float64, int](series, 0.0); complete || islt {
+ test.Fatalf(
+ "(Window Only) 0 elements in a series should always yield false when asking if sequential increases are less than a value.",
+ )
+ }
+}
+
+func Test_Forever_Complete(test *testing.T) {
+ series := newWindowSeriesForeverImpl[int, int]()
+ series.Reserve(1)
+ series.Fill(1, 10)
+ if !series.Complete() {
+ test.Fatalf("(infinite) Series with one element and a window size of 1 is not complete.")
+ }
+}
+
+func Test_WindowOnly_CompleteN(test *testing.T) {
+ series := newWindowSeriesWindowOnlyImpl[float64, int](10)
+ previous := float64(1.0)
+ for i := range utilities.Iota(1, 11) {
+ previous *= 1.5
+ series.Reserve(i)
+ series.Fill(i, float64(previous))
+ }
+ if !series.Complete() {
+ test.Fatalf("(window only) Series with ten elements and a window size of 10 is not complete.")
+ }
+}
+
+func Test_Forever_degenerate_percentile_too_high(test *testing.T) {
+ series := newWindowSeriesForeverImpl[int, int]()
+ if complete, result := Percentile[int, int](series, 101); !complete || result != 0.0 {
+ test.Fatalf("(infinite) Series percentile of 101 failed.")
+ }
+}
+
+func Test_Forever_degenerate_percentile_too_low(test *testing.T) {
+ series := newWindowSeriesForeverImpl[int, int]()
+ if complete, result := Percentile[int, int](series, -1); !complete || result != 0.0 {
+ test.Fatalf("(infinite) Series percentile of -1 failed.")
+ }
+}
+
+///////////
+
+func Test_Forever90_percentile(test *testing.T) {
+ var expected int = 10
+ series := newWindowSeriesForeverImpl[int, int]()
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+ series.Reserve(6)
+ series.Reserve(7)
+ series.Reserve(8)
+ series.Reserve(9)
+ series.Reserve(10)
+
+ series.Fill(10, 10)
+ series.Fill(9, 9)
+ series.Fill(8, 8)
+ series.Fill(7, 7)
+ series.Fill(6, 6)
+ series.Fill(5, 5)
+ series.Fill(4, 4)
+ series.Fill(3, 3)
+ series.Fill(2, 2)
+ series.Fill(1, 1)
+
+ if complete, result := Percentile[int, int](series, 90); !complete || result != expected {
+ test.Fatalf(
+ "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result)
+ }
+}
+
+func Test_Forever90_WindowOnly_percentile(test *testing.T) {
+ var expected int = 10
+ series := newWindowSeriesWindowOnlyImpl[int, int](10)
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+ series.Reserve(6)
+ series.Reserve(7)
+ series.Reserve(8)
+ series.Reserve(9)
+ series.Reserve(10)
+
+ series.Fill(10, 10)
+ series.Fill(9, 9)
+ series.Fill(8, 8)
+ series.Fill(7, 7)
+ series.Fill(6, 6)
+ series.Fill(5, 5)
+ series.Fill(4, 4)
+ series.Fill(3, 3)
+ series.Fill(2, 2)
+ series.Fill(1, 1)
+
+ if complete, result := Percentile[int, int](series, 90); !complete || result != expected {
+ test.Fatalf(
+ "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result)
+ }
+}
+
+func Test_Forever90_percentile_reversed(test *testing.T) {
+ var expected int = 10
+ series := newWindowSeriesForeverImpl[int, int]()
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+ series.Reserve(6)
+ series.Reserve(7)
+ series.Reserve(8)
+ series.Reserve(9)
+ series.Reserve(10)
+
+ series.Fill(10, 1)
+ series.Fill(9, 2)
+ series.Fill(8, 3)
+ series.Fill(7, 4)
+ series.Fill(6, 5)
+ series.Fill(5, 6)
+ series.Fill(4, 7)
+ series.Fill(3, 8)
+ series.Fill(2, 9)
+ series.Fill(1, 10)
+
+ if complete, result := Percentile[int, int](series, 90); !complete || result != expected {
+ test.Fatalf(
+ "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result)
+ }
+}
+
+func Test_Forever50_percentile_jumbled(test *testing.T) {
+ var expected int64 = 15
+ series := newWindowSeriesForeverImpl[int64, int]()
+
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+ series.Reserve(6)
+ series.Reserve(7)
+ series.Reserve(8)
+ series.Reserve(9)
+ series.Reserve(10)
+
+ series.Fill(1, 7)
+ series.Fill(2, 2)
+ series.Fill(3, 15)
+ series.Fill(4, 27)
+ series.Fill(5, 5)
+ series.Fill(6, 52)
+ series.Fill(7, 18)
+ series.Fill(8, 23)
+ series.Fill(9, 11)
+ series.Fill(10, 12)
+
+ if complete, result := Percentile[int64, int](series, 50); !complete || result != expected {
+ test.Fatalf(
+ "Series 50 percentile of a jumble of numbers failed: (Complete: %v) Expected %v got %v.", complete, expected, result)
+ }
+}
+
+func Test_Forever90_partial_percentile(test *testing.T) {
+ var expected int = 10
+ series := newWindowSeriesForeverImpl[int, int]()
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+ series.Reserve(6)
+ series.Reserve(7)
+ series.Reserve(8)
+ series.Reserve(9)
+ series.Reserve(10)
+
+ series.Fill(10, 10)
+ series.Fill(9, 9)
+ series.Fill(8, 8)
+ series.Fill(7, 7)
+ series.Fill(6, 6)
+ series.Fill(5, 5)
+ series.Fill(4, 4)
+ series.Fill(3, 3)
+ series.Fill(2, 2)
+ series.Fill(1, 1)
+
+ if complete, result := Percentile[int, int](series, 90); !complete || result != expected {
+ test.Fatalf(
+ "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result)
+ }
+}
+
+func Test_Forever90_partial_percentile_reversed(test *testing.T) {
+ var expected int = 10
+ series := newWindowSeriesForeverImpl[int, int]()
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+ series.Reserve(6)
+ series.Reserve(7)
+ series.Reserve(8)
+ series.Reserve(9)
+ series.Reserve(10)
+
+ series.Fill(10, 1)
+ series.Fill(9, 2)
+ series.Fill(8, 3)
+ series.Fill(7, 4)
+ series.Fill(6, 5)
+ series.Fill(5, 6)
+ series.Fill(4, 7)
+ series.Fill(3, 8)
+ series.Fill(2, 9)
+ series.Fill(1, 10)
+
+ if complete, result := Percentile[int, int](series, 90); !complete || result != expected {
+ test.Fatalf(
+ "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result)
+ }
+}
+
+func Test_Forever50_partial_percentile_jumbled(test *testing.T) {
+ var expected int64 = 15
+ series := newWindowSeriesForeverImpl[int64, int]()
+
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+ series.Reserve(6)
+ series.Reserve(7)
+ series.Reserve(8)
+ series.Reserve(9)
+ series.Reserve(10)
+
+ series.Fill(1, 7)
+ series.Fill(2, 2)
+ series.Fill(3, 15)
+ series.Fill(4, 27)
+ series.Fill(5, 5)
+ series.Fill(6, 52)
+ series.Fill(7, 18)
+ series.Fill(8, 23)
+ series.Fill(9, 11)
+ series.Fill(10, 12)
+
+ if complete, result := Percentile[int64, int](series, 50); !complete || result != expected {
+ test.Fatalf(
+ "Series 50 percentile of a jumble of numbers failed: (Complete: %v) Expected %v got %v.", complete, expected, result)
+ }
+}
+
+///////////////////////
+
+func Test_WindowOnlySequentialIncreasesAlwaysLessThanWithWraparound(test *testing.T) {
+ series := newWindowSeriesWindowOnlyImpl[float64, int](20)
+ previous := float64(1.0)
+ for i := range utilities.Iota(1, 21) {
+ previous *= 1.15
+ series.Reserve(i)
+		series.Fill(i, float64(previous))
+ }
+
+ // All those measurements should be ejected by the following
+ // loop!
+ for i := range utilities.Iota(1, 21) {
+ previous *= 1.10
+ series.Reserve(i + 20)
+ series.Fill(i+20, float64(previous))
+ }
+
+ if complete, islt, maxSeqIncrease := AllSequentialIncreasesLessThan[float64, int](series,
+ 11.0); !complete || !islt || !utilities.ApproximatelyEqual(maxSeqIncrease, 10, 0.1) {
+ test.Fatalf(
+ "Sequential increases are not always less than 11.0 in wraparound situation (%f v 11.0).",
+ maxSeqIncrease,
+ )
+ }
+}
+
+func Test_WindowOnlySequentialIncreasesAlwaysLessThanWithWraparoundInverse(test *testing.T) {
+ series := newWindowSeriesWindowOnlyImpl[float64, int](20)
+ previous := float64(1.0)
+ i := 0
+ for i = range utilities.Iota(1, 21) {
+ previous *= 1.15
+ series.Reserve(i)
+ series.Fill(i, float64(previous))
+ }
+
+ // *Not* all those measurements should be ejected by the following
+ // loop!
+ for j := range utilities.Iota(1, 16) {
+ previous *= 1.10
+ series.Reserve(i + j)
+ series.Fill(i+j, float64(previous))
+ }
+
+ if complete, islt, maxSeqIncrease := AllSequentialIncreasesLessThan[float64, int](series, 11.0); complete == false || islt {
+ test.Fatalf(
+ "Sequential increases are (unexpectedly) always less than 11.0 in wraparound situation: %f v 11.0.",
+ maxSeqIncrease,
+ )
+ }
+}
+
+func Test_WindowOnlyStandardDeviationIncompleteCalculation(test *testing.T) {
+ expected := 2.93
+ series := newWindowSeriesWindowOnlyImpl[float64, int](6)
+ // 5.7, 1.0, 8.6, 7.4, 2.2
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+
+ series.Fill(1, 5.7)
+ series.Fill(2, 1.0)
+ series.Fill(3, 8.6)
+ series.Fill(4, 7.4)
+ series.Fill(5, 2.2)
+
+ if complete, sd := SeriesStandardDeviation[float64, int](series); complete != false ||
+ !utilities.ApproximatelyEqual(sd, expected, 0.01) {
+ test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd)
+ } else {
+ test.Logf("Standard deviation calculation result: %v", sd)
+ }
+}
+
+func Test_WindowOnlyStandardDeviationCalculation(test *testing.T) {
+ expected := 2.93
+ series := newWindowSeriesWindowOnlyImpl[float64, int](5)
+ // 5.7, 1.0, 8.6, 7.4, 2.2
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+ series.Reserve(6)
+ series.Reserve(7)
+ series.Reserve(8)
+ series.Reserve(9)
+ series.Reserve(10)
+ series.Reserve(11)
+ series.Reserve(12)
+ series.Reserve(13)
+
+ series.Fill(1, 5.7)
+ series.Fill(2, 5.7)
+ series.Fill(3, 5.7)
+ series.Fill(4, 5.7)
+ series.Fill(5, 5.7)
+ series.Fill(6, 5.7)
+ series.Fill(7, 5.7)
+ series.Fill(8, 5.7)
+ series.Fill(9, 5.7)
+ series.Fill(10, 1.0)
+ series.Fill(11, 8.6)
+ series.Fill(12, 7.4)
+ series.Fill(13, 2.2)
+
+ if complete, sd := SeriesStandardDeviation[float64, int](series); complete != true ||
+ !utilities.ApproximatelyEqual(sd, expected, 0.01) {
+ test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd)
+ }
+}
+
+func Test_WindowOnlyStandardDeviationCalculation2(test *testing.T) {
+ expected := 1.41
+ series := newWindowSeriesWindowOnlyImpl[float64, int](5)
+
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+
+ series.Fill(1, 8)
+ series.Fill(2, 9)
+ series.Fill(3, 10)
+ series.Fill(4, 11)
+ series.Fill(5, 12)
+
+ if _, sd := SeriesStandardDeviation[float64, int](series); !utilities.ApproximatelyEqual(sd, expected, 0.01) {
+ test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd)
+ } else {
+ test.Logf("Standard deviation calculation result: %v", sd)
+ }
+}
+
+func Test_WindowOnlyRotatingValues(test *testing.T) {
+ series := newWindowSeriesWindowOnlyImpl[int, int](5)
+
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+
+ series.Reserve(6)
+ series.Reserve(7)
+
+ series.Fill(1, 1)
+ series.Fill(2, 2)
+ series.Fill(3, 3)
+ series.Fill(4, 4)
+ series.Fill(5, 5)
+
+ series.Fill(6, 6)
+ series.Fill(7, 7)
+
+ actual := utilities.Fmap(series.GetValues(), func(i utilities.Optional[int]) int {
+ return utilities.GetSome[int](i)
+ })
+ if !reflect.DeepEqual([]int{7, 6, 5, 4, 3}, actual) {
+ test.Fatalf("Adding values does not properly erase earlier values.")
+ }
+}
+
+func Test_WindowOnly_degenerate_percentile_too_high(test *testing.T) {
+ series := newWindowSeriesWindowOnlyImpl[int, int](21)
+ if complete, p := Percentile[int, int](series, 101); complete != false || p != 0 {
+ test.Fatalf("Series percentile of 101 failed.")
+ }
+}
+
+func Test_WindowOnly_degenerate_percentile_too_low(test *testing.T) {
+ series := newWindowSeriesWindowOnlyImpl[int, int](21)
+ if complete, p := Percentile[int, int](series, -1); complete != false || p != 0 {
+ test.Fatalf("Series percentile of -1 failed.")
+ }
+}
+
+func Test_WindowOnly90_percentile(test *testing.T) {
+ var expected int = 10
+ series := newWindowSeriesWindowOnlyImpl[int, int](10)
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+ series.Reserve(6)
+ series.Reserve(7)
+ series.Reserve(8)
+ series.Reserve(9)
+ series.Reserve(10)
+
+ series.Fill(10, 10)
+ series.Fill(9, 9)
+ series.Fill(8, 8)
+ series.Fill(7, 7)
+ series.Fill(6, 6)
+ series.Fill(5, 5)
+ series.Fill(4, 4)
+ series.Fill(3, 3)
+ series.Fill(2, 2)
+ series.Fill(1, 1)
+
+ if complete, result := Percentile[int, int](series, 90); !complete || result != expected {
+ test.Fatalf(
+ "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result)
+ }
+}
+
+func Test_WindowOnly90_percentile_reversed(test *testing.T) {
+ var expected int = 10
+ series := newWindowSeriesWindowOnlyImpl[int, int](10)
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+ series.Reserve(6)
+ series.Reserve(7)
+ series.Reserve(8)
+ series.Reserve(9)
+ series.Reserve(10)
+
+ series.Fill(10, 1)
+ series.Fill(9, 2)
+ series.Fill(8, 3)
+ series.Fill(7, 4)
+ series.Fill(6, 5)
+ series.Fill(5, 6)
+ series.Fill(4, 7)
+ series.Fill(3, 8)
+ series.Fill(2, 9)
+ series.Fill(1, 10)
+
+ if complete, result := Percentile[int, int](series, 90); !complete || result != expected {
+ test.Fatalf(
+ "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result)
+ }
+}
+
+func Test_WindowOnly50_percentile_jumbled(test *testing.T) {
+ var expected int64 = 15
+ series := newWindowSeriesWindowOnlyImpl[int64, int](10)
+
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+ series.Reserve(6)
+ series.Reserve(7)
+ series.Reserve(8)
+ series.Reserve(9)
+ series.Reserve(10)
+
+ series.Fill(1, 7)
+ series.Fill(2, 2)
+ series.Fill(3, 15)
+ series.Fill(4, 27)
+ series.Fill(5, 5)
+ series.Fill(6, 52)
+ series.Fill(7, 18)
+ series.Fill(8, 23)
+ series.Fill(9, 11)
+ series.Fill(10, 12)
+
+ if complete, result := Percentile[int64, int](series, 50); !complete || result != expected {
+ test.Fatalf(
+ "Series 50 percentile of a jumble of numbers failed: (Complete: %v) Expected %v got %v.", complete, expected, result)
+ }
+}
+
+func Test_WindowOnly90_partial_percentile(test *testing.T) {
+ var expected int = 10
+ series := newWindowSeriesWindowOnlyImpl[int, int](20)
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+ series.Reserve(6)
+ series.Reserve(7)
+ series.Reserve(8)
+ series.Reserve(9)
+ series.Reserve(10)
+
+ series.Fill(10, 10)
+ series.Fill(9, 9)
+ series.Fill(8, 8)
+ series.Fill(7, 7)
+ series.Fill(6, 6)
+ series.Fill(5, 5)
+ series.Fill(4, 4)
+ series.Fill(3, 3)
+ series.Fill(2, 2)
+ series.Fill(1, 1)
+
+ if complete, result := Percentile[int, int](series, 90); complete != false || result != expected {
+ test.Fatalf(
+ "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result)
+ }
+}
+
+func Test_WindowOnly90_partial_percentile_reversed(test *testing.T) {
+ var expected int = 10
+ series := newWindowSeriesWindowOnlyImpl[int, int](20)
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+ series.Reserve(6)
+ series.Reserve(7)
+ series.Reserve(8)
+ series.Reserve(9)
+ series.Reserve(10)
+
+ series.Fill(10, 1)
+ series.Fill(9, 2)
+ series.Fill(8, 3)
+ series.Fill(7, 4)
+ series.Fill(6, 5)
+ series.Fill(5, 6)
+ series.Fill(4, 7)
+ series.Fill(3, 8)
+ series.Fill(2, 9)
+ series.Fill(1, 10)
+
+ if complete, result := Percentile[int, int](series, 90); complete != false || result != expected {
+ test.Fatalf(
+ "Series 90th percentile of 0 ... 10 failed: (Complete: %v) Expected %v got %v.", complete, expected, result)
+ }
+}
+
+func Test_WindowOnly50_partial_percentile_jumbled(test *testing.T) {
+ var expected int64 = 15
+ series := newWindowSeriesWindowOnlyImpl[int64, int](20)
+
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+ series.Reserve(6)
+ series.Reserve(7)
+ series.Reserve(8)
+ series.Reserve(9)
+ series.Reserve(10)
+
+ series.Fill(1, 7)
+ series.Fill(2, 2)
+ series.Fill(3, 15)
+ series.Fill(4, 27)
+ series.Fill(5, 5)
+ series.Fill(6, 52)
+ series.Fill(7, 18)
+ series.Fill(8, 23)
+ series.Fill(9, 11)
+ series.Fill(10, 12)
+
+ if complete, result := Percentile[int64, int](series, 50); complete != false || result != expected {
+ test.Fatalf(
+ "Series 50 percentile of a jumble of numbers failed: (Complete: %v) Expected %v got %v.", complete, expected, result)
+ }
+}
+
+/*
+
+func Test_WindowOnlyDoubleSidedTrimmedMean_jumbled(test *testing.T) {
+ expected := 8
+ series := newWindowSeriesWindowOnlyImpl[int64, int](10)
+ series.AddElement(7)
+ series.AddElement(2)
+ series.AddElement(15)
+ series.AddElement(27)
+ series.AddElement(5)
+ series.AddElement(5)
+ series.AddElement(52)
+ series.AddElement(18)
+ series.AddElement(23)
+ series.AddElement(11)
+ series.AddElement(12)
+
+ trimmed := series.DoubleSidedTrim(10)
+
+ if trimmed.Len() != expected {
+ test.Fatalf(
+ "WindowOnly series is not of the proper size. Expected %v and got %v",
+ expected,
+ trimmed.Len(),
+ )
+ }
+
+ prev := int64(0)
+ for _, v := range trimmed.Values() {
+ if !(prev <= v) {
+ test.Fatalf("Not sorted: %v is not less than or equal to %v\n", prev, v)
+ }
+ prev = v
+ }
+}
+
+func Test_WindowOnlyAverage(test *testing.T) {
+ expected := 1.0082230220488836e+08
+ series := newWindowSeriesWindowOnlyImpl[float64, int](4)
+ series.AddElement(9.94747772516195e+07)
+ series.AddElement(9.991286984703423e+07)
+ series.AddElement(1.0285437111086299e+08)
+ series.AddElement(1.0104719061003672e+08)
+ if average := series.CalculateAverage(); !utilities.ApproximatelyEqual(average, 0.01, expected) {
+ test.Fatalf(
+ "Expected: %v; Actual: %v.", average, expected,
+ )
+ }
+}
+*/
+
+func Test_ForeverStandardDeviationIncompleteCalculation(test *testing.T) {
+ foreverExpected := 2.90
+ series := newWindowSeriesForeverImpl[float64, int]()
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+ series.Reserve(6)
+
+ series.Fill(1, 5.7)
+ series.Fill(2, 1.0)
+ series.Fill(3, 8.6)
+ series.Fill(4, 7.4)
+ series.Fill(5, 2.2)
+ series.Fill(6, 8.0)
+
+ if complete, sd := SeriesStandardDeviation[float64, int](series); !complete ||
+ !utilities.ApproximatelyEqual(sd, foreverExpected, 0.01) {
+ test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", foreverExpected, sd)
+ }
+}
+
+func Test_ForeverStandardDeviationCalculation2(test *testing.T) {
+ expected := 1.41
+ series := newWindowSeriesForeverImpl[float64, int]()
+
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+
+ series.Fill(1, 8)
+ series.Fill(2, 9)
+ series.Fill(3, 10)
+ series.Fill(4, 11)
+ series.Fill(5, 12)
+
+ if _, sd := SeriesStandardDeviation[float64, int](series); !utilities.ApproximatelyEqual(sd, expected, 0.01) {
+		test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd)
+ }
+}
diff --git a/series/statistics.go b/series/statistics.go
new file mode 100644
index 0000000..2742aa7
--- /dev/null
+++ b/series/statistics.go
@@ -0,0 +1,74 @@
+package series
+
+import (
+ "github.com/network-quality/goresponsiveness/utilities"
+ "golang.org/x/exp/constraints"
+)
+
+func SeriesStandardDeviation[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket]) (bool, float64) {
+ complete := s.Complete()
+
+ inputValues := s.GetValues()
+
+ actualValues := utilities.Filter(inputValues, func(d utilities.Optional[Data]) bool {
+ return utilities.IsSome[Data](d)
+ })
+ values := utilities.Fmap(actualValues, func(d utilities.Optional[Data]) Data { return utilities.GetSome[Data](d) })
+
+ return complete, utilities.CalculateStandardDeviation[Data](values)
+}
+
+func Percentile[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], p int) (bool, Data) {
+ complete := s.Complete()
+
+ inputValues := s.GetValues()
+
+ actualValues := utilities.Filter(inputValues, func(d utilities.Optional[Data]) bool {
+ return utilities.IsSome[Data](d)
+ })
+ values := utilities.Fmap(actualValues, func(d utilities.Optional[Data]) Data { return utilities.GetSome[Data](d) })
+
+ return complete, utilities.CalculatePercentile(values, p)
+}
+
+func AllSequentialIncreasesLessThan[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], limit float64,
+) (bool, bool, float64) {
+ complete := s.Complete()
+
+ inputValues := s.GetValues()
+
+ actualValues := utilities.Filter(utilities.Reverse(inputValues), func(d utilities.Optional[Data]) bool {
+ return utilities.IsSome[Data](d)
+ })
+ values := utilities.Fmap(actualValues, func(d utilities.Optional[Data]) Data { return utilities.GetSome[Data](d) })
+
+ result, actualLimit := utilities.AllSequentialIncreasesLessThan(values, limit)
+ return complete, result, actualLimit
+}
+
+func CalculateAverage[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket]) (bool, float64) {
+ complete := s.Complete()
+
+ inputValues := s.GetValues()
+
+ actualValues := utilities.Filter(inputValues, func(d utilities.Optional[Data]) bool {
+ return utilities.IsSome[Data](d)
+ })
+ values := utilities.Fmap(actualValues, func(d utilities.Optional[Data]) Data { return utilities.GetSome[Data](d) })
+
+ return complete, utilities.CalculateAverage(values)
+}
+
+func TrimmedMean[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], trim int) (bool, float64, []Data) {
+ complete := s.Complete()
+
+ inputValues := s.GetValues()
+
+ actualValues := utilities.Filter(inputValues, func(d utilities.Optional[Data]) bool {
+ return utilities.IsSome[Data](d)
+ })
+ values := utilities.Fmap(actualValues, func(d utilities.Optional[Data]) Data { return utilities.GetSome[Data](d) })
+
+ trimmedMean, trimmedElements := utilities.TrimmedMean(values, trim)
+ return complete, trimmedMean, trimmedElements
+}
diff --git a/stabilizer/algorithm.go b/stabilizer/algorithm.go
index 45c34d9..86b4bb6 100644
--- a/stabilizer/algorithm.go
+++ b/stabilizer/algorithm.go
@@ -19,19 +19,23 @@ import (
"sync"
"github.com/network-quality/goresponsiveness/debug"
- "github.com/network-quality/goresponsiveness/ms"
+ "github.com/network-quality/goresponsiveness/series"
+ "github.com/network-quality/goresponsiveness/utilities"
"golang.org/x/exp/constraints"
)
-type MeasurementStablizer[T constraints.Float | constraints.Integer] struct {
- instantaneousses ms.MathematicalSeries[T]
- aggregates ms.MathematicalSeries[float64]
+type MeasurementStablizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered] struct {
+ // The number of instantaneous measurements in the current interval could be infinite (Forever).
+ instantaneousses series.WindowSeries[Data, Bucket]
+ // There are a fixed, finite number of aggregates (WindowOnly).
+ aggregates series.WindowSeries[series.WindowSeries[Data, Bucket], int]
stabilityStandardDeviation float64
trimmingLevel uint
m sync.Mutex
dbgLevel debug.DebugLevel
dbgConfig *debug.DebugWithPrefix
units string
+ currentInterval int
}
// Stabilizer parameters:
@@ -54,77 +58,159 @@ type MeasurementStablizer[T constraints.Float | constraints.Integer] struct {
// If the calculated standard deviation of *those* values is less than SDT, we declare
// stability.
-func NewStabilizer[T constraints.Float | constraints.Integer](
- mad uint,
+func NewStabilizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered](
+ mad int,
sdt float64,
trimmingLevel uint,
units string,
debugLevel debug.DebugLevel,
debug *debug.DebugWithPrefix,
-) MeasurementStablizer[T] {
- return MeasurementStablizer[T]{
- instantaneousses: ms.NewCappedMathematicalSeries[T](mad),
- aggregates: ms.NewCappedMathematicalSeries[float64](mad),
+) MeasurementStablizer[Data, Bucket] {
+ return MeasurementStablizer[Data, Bucket]{
+ instantaneousses: series.NewWindowSeries[Data, Bucket](series.Forever, 0),
+ aggregates: series.NewWindowSeries[
+ series.WindowSeries[Data, Bucket], int](series.WindowOnly, mad),
stabilityStandardDeviation: sdt,
trimmingLevel: trimmingLevel,
units: units,
+ currentInterval: 0,
dbgConfig: debug,
dbgLevel: debugLevel,
}
}
-func (r3 *MeasurementStablizer[T]) AddMeasurement(measurement T) {
+func (r3 *MeasurementStablizer[Data, Bucket]) Reserve(bucket Bucket) {
+ r3.m.Lock()
+ defer r3.m.Unlock()
+ r3.instantaneousses.Reserve(bucket)
+}
+
+func (r3 *MeasurementStablizer[Data, Bucket]) AddMeasurement(bucket Bucket, measurement Data) {
+ r3.m.Lock()
+ defer r3.m.Unlock()
+
+ // Fill in the bucket in the current interval.
+ if err := r3.instantaneousses.Fill(bucket, measurement); err != nil {
+ if debug.IsDebug(r3.dbgLevel) {
+			fmt.Printf("%s: A bucket (with id %v) does not exist in the instantaneousses.\n",
+ r3.dbgConfig.String(),
+ bucket)
+ }
+ }
+
+ // The result may have been retired from the current interval. Look in the older series
+ // to fill it in there if it is.
+ r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) {
+ if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) {
+ md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md)
+ if err := md.Fill(bucket, measurement); err != nil {
+ if debug.IsDebug(r3.dbgLevel) {
+ fmt.Printf("%s: A bucket (with id %v) does not exist in a historical window.\n",
+ r3.dbgConfig.String(),
+ bucket)
+ }
+ }
+ }
+ })
+
+ /*
+ // Add this instantaneous measurement to the mix of the MAD previous instantaneous measurements.
+ r3.instantaneousses.Fill(bucket, measurement)
+ // Calculate the moving average of the MAD previous instantaneous measurements (what the
+ // algorithm calls moving average aggregate throughput at interval p) and add it to
+ // the mix of MAD previous moving averages.
+
+ r3.aggregates.AutoFill(r3.instantaneousses.CalculateAverage())
+
+ if debug.IsDebug(r3.dbgLevel) {
+ fmt.Printf(
+ "%s: MA: %f Mbps (previous %d intervals).\n",
+ r3.dbgConfig.String(),
+ r3.aggregates.CalculateAverage(),
+ r3.aggregates.Len(),
+ )
+ }
+ */
+}
+
+func (r3 *MeasurementStablizer[Data, Bucket]) Interval() {
r3.m.Lock()
defer r3.m.Unlock()
- // Add this instantaneous measurement to the mix of the MAD previous instantaneous measurements.
- r3.instantaneousses.AddElement(measurement)
- // Calculate the moving average of the MAD previous instantaneous measurements (what the
- // algorithm calls moving average aggregate throughput at interval p) and add it to
- // the mix of MAD previous moving averages.
- r3.aggregates.AddElement(r3.instantaneousses.CalculateAverage())
if debug.IsDebug(r3.dbgLevel) {
fmt.Printf(
- "%s: MA: %f Mbps (previous %d intervals).\n",
+ "%s: stability interval marked (transitioning from %d to %d).\n",
r3.dbgConfig.String(),
- r3.aggregates.CalculateAverage(),
- r3.aggregates.Len(),
+ r3.currentInterval,
+ r3.currentInterval+1,
)
}
-}
-func (r3 *MeasurementStablizer[T]) IsStable() bool {
- // There are MAD number of measurements of the _moving average aggregate throughput
- // at interval p_ in movingAverages.
- isvalid, stddev := r3.aggregates.StandardDeviation()
+ // At the interval boundary, move the instantaneous series to
+ // the aggregates and start a new instantaneous series.
+ r3.aggregates.Reserve(r3.currentInterval)
+ r3.aggregates.Fill(r3.currentInterval, r3.instantaneousses)
- if !isvalid {
- // If there are not enough values in the series to be able to calculate a
- // standard deviation, then we know that we are not yet stable. Vamoose.
- return false
- }
+ r3.instantaneousses = series.NewWindowSeries[Data, Bucket](series.Forever, 0)
+ r3.currentInterval++
+}
- // Stability is determined by whether or not the standard deviation of the values
- // is within some percentage of the average.
- stabilityCutoff := r3.aggregates.CalculateAverage() * (r3.stabilityStandardDeviation / 100.0)
- isStable := stddev <= stabilityCutoff
+func (r3 *MeasurementStablizer[Data, Bucket]) IsStable() bool {
+ r3.m.Lock()
+ defer r3.m.Unlock()
if debug.IsDebug(r3.dbgLevel) {
fmt.Printf(
- "%s: Is Stable? %v; Standard Deviation: %f %s; Is Normally Distributed? %v; Standard Deviation Cutoff: %v %s).\n",
+ "%s: Determining stability in the %d th interval.\n",
r3.dbgConfig.String(),
- isStable,
- stddev,
- r3.units,
- r3.aggregates.IsNormallyDistributed(),
- stabilityCutoff,
- r3.units,
+ r3.currentInterval,
)
- fmt.Printf("%s: Values: ", r3.dbgConfig.String())
- for _, v := range r3.aggregates.Values() {
- fmt.Printf("%v, ", v)
+ }
+ // Determine if
+ // a) All the aggregates have values,
+ // b) All the aggregates are complete.
+ allComplete := true
+ r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) {
+ if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) {
+ md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md)
+			allComplete = allComplete && md.Complete()
+ if debug.IsDebug(r3.dbgLevel) {
+ fmt.Printf("%s\n", md.String())
+ }
+ } else {
+ allComplete = false
}
- fmt.Printf("\n")
+ if debug.IsDebug(r3.dbgLevel) {
+ fmt.Printf(
+ "%s: The aggregate for the %d th interval was %s.\n",
+ r3.dbgConfig.String(),
+ b,
+ utilities.Conditional(allComplete, "complete", "incomplete"),
+ )
+ }
+ })
+
+ if !allComplete {
+ return false
}
+
+ // Calculate the averages of each of the aggregates.
+ averages := make([]float64, 0)
+ r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) {
+ if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) {
+ md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md)
+ _, average := series.CalculateAverage(md)
+ averages = append(averages, average)
+ }
+ })
+
+ // Calculate the standard deviation of the averages of the aggregates.
+ sd := utilities.CalculateStandardDeviation(averages)
+
+ // Take a percentage of the average of the averages of the aggregates ...
+ stabilityCutoff := utilities.CalculateAverage(averages) * (r3.stabilityStandardDeviation / 100.0)
+ // and compare that to the standard deviation to determine stability.
+ isStable := sd <= stabilityCutoff
+
return isStable
}
diff --git a/stabilizer/stabilizer.go b/stabilizer/stabilizer.go
index b582751..b157ce1 100644
--- a/stabilizer/stabilizer.go
+++ b/stabilizer/stabilizer.go
@@ -14,7 +14,12 @@
package stabilizer
-type Stabilizer[T any] interface {
- AddMeasurement(T)
+import (
+ "golang.org/x/exp/constraints"
+)
+
+type Stabilizer[Data any, Bucket constraints.Ordered] interface {
+ Interval()
+	AddMeasurement(Bucket, Data)
IsStable() bool
}
diff --git a/utilities/math.go b/utilities/math.go
new file mode 100644
index 0000000..1ecca61
--- /dev/null
+++ b/utilities/math.go
@@ -0,0 +1,144 @@
+/*
+ * This file is part of Go Responsiveness.
+ *
+ * Go Responsiveness is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software Foundation,
+ * either version 2 of the License, or (at your option) any later version.
+ * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package utilities
+
+import (
+ "math"
+ "sort"
+
+ "golang.org/x/exp/constraints"
+ "golang.org/x/exp/slices"
+)
+
+type Number interface {
+ constraints.Float | constraints.Integer
+}
+
+func CalculateAverage[T Number](elements []T) float64 {
+ total := T(0)
+ for i := 0; i < len(elements); i++ {
+ total += elements[i]
+ }
+ return float64(total) / float64(len(elements))
+}
+
+func CalculatePercentile[T Number](
+ elements []T,
+ p int,
+) (result T) {
+ result = T(0)
+ if p < 0 || p > 100 {
+ return
+ }
+
+ sort.Slice(elements, func(l int, r int) bool { return elements[l] < elements[r] })
+ pindex := int64((float64(p) / float64(100)) * float64(len(elements)))
+ result = elements[pindex]
+ return
+}
+
+func Max(x, y uint64) uint64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+
+func SignedPercentDifference[T constraints.Float | constraints.Integer](
+ current T,
+ previous T,
+) (difference float64) {
+ fCurrent := float64(current)
+ fPrevious := float64(previous)
+ return ((fCurrent - fPrevious) / fPrevious) * 100.0
+}
+
+func AbsPercentDifference(
+ current float64,
+ previous float64,
+) (difference float64) {
+ return (math.Abs(current-previous) / (float64(current+previous) / 2.0)) * float64(
+ 100,
+ )
+}
+
+func CalculateStandardDeviation[T constraints.Float | constraints.Integer](elements []T) float64 {
+ // From https://www.mathsisfun.com/data/standard-deviation-calculator.html
+ // Yes, for real!
+
+ // Calculate the average of the numbers ...
+ average := CalculateAverage(elements)
+
+ // Calculate the square of each of the elements' differences from the mean.
+ differences_squared := make([]float64, len(elements))
+ for index, value := range elements {
+ differences_squared[index] = math.Pow(float64(value-T(average)), 2)
+ }
+
+ // The variance is the average of the squared differences.
+ // So, we need to ...
+
+ // Accumulate all those squared differences.
+ sds := float64(0)
+ for _, dss := range differences_squared {
+ sds += dss
+ }
+
+ // And then divide that total by the number of elements
+ variance := sds / float64(len(elements))
+
+ // Finally, the standard deviation is the square root
+ // of the variance.
+ sd := float64(math.Sqrt(variance))
+ // sd := T(variance)
+
+ return sd
+}
+
+func AllSequentialIncreasesLessThan[T Number](elements []T, limit float64) (bool, float64) {
+ if len(elements) < 2 {
+ return false, 0.0
+ }
+
+ maximumSequentialIncrease := float64(0)
+ for i := 1; i < len(elements); i++ {
+ current := elements[i]
+ previous := elements[i-1]
+ percentChange := SignedPercentDifference(current, previous)
+ if percentChange > limit {
+ return false, percentChange
+ }
+ if percentChange > float64(maximumSequentialIncrease) {
+ maximumSequentialIncrease = percentChange
+ }
+ }
+ return true, maximumSequentialIncrease
+}
+
+// elements must already be sorted!
+func TrimBy[T Number](elements []T, trim int) []T {
+ numberToKeep := int(float32(len(elements)) * (float32(trim) / 100.0))
+
+ return elements[:numberToKeep]
+}
+
+func TrimmedMean[T Number](elements []T, trim int) (float64, []T) {
+ sortedElements := make([]T, len(elements))
+ copy(sortedElements, elements)
+ slices.Sort(sortedElements)
+
+ trimmedElements := TrimBy(sortedElements, trim)
+ return CalculateAverage(trimmedElements), trimmedElements
+}
diff --git a/utilities/utilities.go b/utilities/utilities.go
index ff04023..b976f77 100644
--- a/utilities/utilities.go
+++ b/utilities/utilities.go
@@ -21,13 +21,10 @@ import (
"math/rand"
"os"
"reflect"
- "sort"
"strings"
"sync"
"sync/atomic"
"time"
-
- "golang.org/x/exp/constraints"
)
// GitVersion is the Git revision hash
@@ -46,24 +43,6 @@ func IsInterfaceNil(ifc interface{}) bool {
(reflect.ValueOf(ifc).Kind() == reflect.Ptr && reflect.ValueOf(ifc).IsNil())
}
-func SignedPercentDifference[T constraints.Float | constraints.Integer](
- current T,
- previous T,
-) (difference float64) {
- fCurrent := float64(current)
- fPrevious := float64(previous)
- return ((fCurrent - fPrevious) / fPrevious) * 100.0
-}
-
-func AbsPercentDifference(
- current float64,
- previous float64,
-) (difference float64) {
- return (math.Abs(current-previous) / (float64(current+previous) / 2.0)) * float64(
- 100,
- )
-}
-
func Conditional[T any](condition bool, t T, f T) T {
if condition {
return t
@@ -137,13 +116,6 @@ func RandBetween(max int) int {
return rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % max
}
-func Max(x, y uint64) uint64 {
- if x > y {
- return x
- }
- return y
-}
-
func ChannelToSlice[S any](channel <-chan S) (slice []S) {
slice = make([]S, 0)
for element := range channel {
@@ -152,6 +124,26 @@ func ChannelToSlice[S any](channel <-chan S) (slice []S) {
return
}
+func Reverse[T any](elements []T) []T {
+ result := make([]T, len(elements))
+ iterator := len(elements) - 1
+ for _, v := range elements {
+ result[iterator] = v
+ iterator--
+ }
+ return result
+}
+
+func Filter[S any](elements []S, filterer func(S) bool) []S {
+ result := make([]S, 0)
+ for _, s := range elements {
+ if filterer(s) {
+ result = append(result, s)
+ }
+ }
+ return result
+}
+
func Fmap[S any, F any](elements []S, mapper func(S) F) []F {
result := make([]F, 0)
for _, s := range elements {
@@ -160,13 +152,6 @@ func Fmap[S any, F any](elements []S, mapper func(S) F) []F {
return result
}
-func CalculatePercentile[S float32 | int32 | float64 | int64](elements []S, percentile int) S {
- sort.Slice(elements, func(a, b int) bool { return elements[a] < elements[b] })
- elementsCount := len(elements)
- percentileIdx := elementsCount * (percentile / 100)
- return elements[percentileIdx]
-}
-
func OrTimeout(f func(), timeout time.Duration) {
completeChannel := func() chan interface{} {
completed := make(chan interface{})
@@ -227,9 +212,9 @@ func ContextSignaler(ctxt context.Context, st time.Duration, condition *func() b
}
}
-type Pair[T any] struct {
- First T
- Second T
+type Pair[T1, T2 any] struct {
+ First T1
+ Second T2
}
func PerSecondToInterval(rate int64) time.Duration {
diff --git a/utilities/utilities_test.go b/utilities/utilities_test.go
index 9cd4ef0..7f3d83a 100644
--- a/utilities/utilities_test.go
+++ b/utilities/utilities_test.go
@@ -126,3 +126,65 @@ func TestPerSecondToInterval(t *testing.T) {
t.Fatalf("Something that happens twice per second should happen every 5000ns.")
}
}
+
+func TestTrim(t *testing.T) {
+ elements := Iota(1, 101)
+
+ trimmedElements := TrimBy(elements, 75)
+
+ trimmedLength := len(trimmedElements)
+ trimmedLast := trimmedElements[trimmedLength-1]
+
+ if trimmedLength != 75 || trimmedLast != 75 {
+ t.Fatalf("When trimming, the length should be 75 but it is %d and/or the last element should be 75 but it is %d", trimmedLength, trimmedLast)
+ }
+}
+
+func TestTrim2(t *testing.T) {
+ elements := Iota(1, 11)
+
+ trimmedElements := TrimBy(elements, 75)
+
+ trimmedLength := len(trimmedElements)
+ trimmedLast := trimmedElements[trimmedLength-1]
+
+ if trimmedLength != 7 || trimmedLast != 7 {
+ t.Fatalf("When trimming, the length should be 7 but it is %d and/or the last element should be 7 but it is %d", trimmedLength, trimmedLast)
+ }
+}
+
+func TestTrim3(t *testing.T) {
+ elements := Iota(1, 6)
+
+ trimmedElements := TrimBy(elements, 101)
+
+ trimmedLength := len(trimmedElements)
+ trimmedLast := trimmedElements[trimmedLength-1]
+
+ if trimmedLength != 5 || trimmedLast != 5 {
+ t.Fatalf("When trimming, the length should be 5 but it is %d and/or the last element should be 5 but it is %d", trimmedLength, trimmedLast)
+ }
+}
+
+func TestTrim4(t *testing.T) {
+ elements := Iota(1, 11)
+
+ trimmedElements := TrimBy(elements, 81)
+
+ trimmedLength := len(trimmedElements)
+ trimmedLast := trimmedElements[trimmedLength-1]
+
+ if trimmedLength != 8 || trimmedLast != 8 {
+ t.Fatalf("When trimming, the length should be 8 but it is %d and/or the last element should be 8 but it is %d", trimmedLength, trimmedLast)
+ }
+}
+
+func TestTrimmedMean(t *testing.T) {
+ expected := 2.5
+ elements := []int{5, 4, 3, 2, 1}
+
+ result, elements := TrimmedMean(elements, 80)
+ if result != expected || len(elements) != 4 || elements[len(elements)-1] != 4 {
+		t.Fatalf("The trimmed mean result %v does not match the expected value %v, or the trimmed slice %v is not [1 2 3 4]", result, expected, elements)
+ }
+}