From e60066f572464da3a6f55999a48b1d38cf6423d6 Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Fri, 26 Jan 2024 20:46:05 -0500 Subject: [Bugfix] Only probes collected during stable MAD count For calculating the final RPM, only those probes that are sent/received during the stable MAD should count. Signed-off-by: Will Hawkins --- series/series.go | 232 +++++++++++++++++++++++++++++++++++++++++++++----- series/series_test.go | 180 ++++++++++++++++++++++++++++++++++++++- series/statistics.go | 11 ++- 3 files changed, 393 insertions(+), 30 deletions(-) (limited to 'series') diff --git a/series/series.go b/series/series.go index ef133d2..de0d031 100644 --- a/series/series.go +++ b/series/series.go @@ -14,11 +14,12 @@ package series import ( + "cmp" "fmt" + "slices" "sync" "github.com/network-quality/goresponsiveness/utilities" - "golang.org/x/exp/constraints" ) type WindowSeriesDuration int @@ -28,7 +29,7 @@ const ( WindowOnly WindowSeriesDuration = iota ) -type WindowSeries[Data any, Bucket constraints.Ordered] interface { +type WindowSeries[Data any, Bucket utilities.Number] interface { fmt.Stringer Reserve(b Bucket) error @@ -42,15 +43,25 @@ type WindowSeries[Data any, Bucket constraints.Ordered] interface { Complete() bool GetType() WindowSeriesDuration + ExtractBoundedSeries() WindowSeries[Data, Bucket] + Append(appended *WindowSeries[Data, Bucket]) + BoundedAppend(appended *WindowSeries[Data, Bucket]) + + GetBucketBounds() (Bucket, Bucket) + + SetTrimmingBucketBounds(Bucket, Bucket) + ResetTrimmingBucketBounds() } -type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct { - windowSize int - data []utilities.Pair[Bucket, utilities.Optional[Data]] - latestIndex int - empty bool - lock sync.RWMutex +type windowSeriesWindowOnlyImpl[Data any, Bucket utilities.Number] struct { + windowSize int + data []utilities.Pair[Bucket, utilities.Optional[Data]] + latestIndex int // invariant: newest data is there. + empty bool + lock sync.RWMutex + lowerTrimmingBound utilities.Optional[Bucket] + upperTrimmingBound utilities.Optional[Bucket] } /* @@ -80,6 +91,13 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error { return nil } +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) BucketBounds() (Bucket, Bucket) { + newestBucket := wsi.data[wsi.latestIndex].First + oldestBucket := wsi.data[wsi.nextIndex(wsi.latestIndex)].First + + return oldestBucket, newestBucket +} + func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) error { wsi.lock.Lock() defer wsi.lock.Unlock() @@ -150,15 +168,28 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio } result := make([]utilities.Optional[Data], wsi.windowSize) + var lowerTrimmingBound, upperTrimmingBound Bucket + hasBounds := false + if utilities.IsSome(wsi.lowerTrimmingBound) { + hasBounds = true + lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound) + upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound) + result = make([]utilities.Optional[Data], + int(upperTrimmingBound-lowerTrimmingBound)+1) + } + if wsi.empty { return result } iterator := wsi.latestIndex parallelIterator := 0 for { - result[parallelIterator] = wsi.data[iterator].Second + if !hasBounds || (lowerTrimmingBound <= wsi.data[iterator].First && + wsi.data[iterator].First <= upperTrimmingBound) { + result[parallelIterator] = wsi.data[iterator].Second + parallelIterator++ + } iterator = wsi.previousIndex(iterator) - parallelIterator++ if iterator == wsi.latestIndex { break } @@ -166,6 +197,32 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio return result } +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ExtractBoundedSeries() WindowSeries[Data, Bucket] { + wsi.lock.Lock() + defer wsi.lock.Unlock() + + result := NewWindowSeries[Data, Bucket](WindowOnly, wsi.windowSize) + + var lowerTrimmingBound, upperTrimmingBound Bucket + hasBounds := false + if utilities.IsSome(wsi.lowerTrimmingBound) { + hasBounds = true + lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound) + upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound) + } + + for _, v := range wsi.data { + if hasBounds && (v.First < lowerTrimmingBound || v.First > upperTrimmingBound) { + continue + } + result.Reserve(v.First) + if utilities.IsSome(v.Second) { + result.Fill(v.First, utilities.GetSome(v.Second)) + } + } + return result +} + func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] { wsi.lock.Lock() defer wsi.lock.Unlock() @@ -198,11 +255,31 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string { return result } +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) SetTrimmingBucketBounds(lower Bucket, upper Bucket) { + wsi.lowerTrimmingBound = utilities.Some[Bucket](lower) + wsi.upperTrimmingBound = utilities.Some[Bucket](upper) +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ResetTrimmingBucketBounds() { + wsi.lowerTrimmingBound = utilities.None[Bucket]() + wsi.upperTrimmingBound = utilities.None[Bucket]() +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetBucketBounds() (Bucket, Bucket) { + wsi.lock.Lock() + defer wsi.lock.Unlock() + return wsi.data[wsi.nextIndex(wsi.latestIndex)].First, wsi.data[wsi.latestIndex].First +} + func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) { - panic("") + panic("Append is unimplemented on a window-only Window Series") +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) BoundedAppend(appended *WindowSeries[Data, Bucket]) { + panic("BoundedAppend is unimplemented on a window-only Window Series") } -func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered]( +func newWindowSeriesWindowOnlyImpl[Data any, Bucket utilities.Number]( windowSize int, ) *windowSeriesWindowOnlyImpl[Data, Bucket] { result := windowSeriesWindowOnlyImpl[Data, Bucket]{windowSize: windowSize, latestIndex: 0, empty: true} @@ -216,17 +293,19 @@ func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered]( * End of WindowSeries interface methods. */ -type windowSeriesForeverImpl[Data any, Bucket constraints.Ordered] struct { - data []utilities.Pair[Bucket, utilities.Optional[Data]] - empty bool - lock sync.RWMutex +type windowSeriesForeverImpl[Data any, Bucket utilities.Number] struct { + data []utilities.Pair[Bucket, utilities.Optional[Data]] + empty bool + lock sync.RWMutex + lowerTrimmingBound utilities.Optional[Bucket] + upperTrimmingBound utilities.Optional[Bucket] } func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error { wsi.lock.Lock() defer wsi.lock.Unlock() if !wsi.empty && b <= wsi.data[len(wsi.data)-1].First { - fmt.Printf("reserving must be monotonically increasing") + fmt.Printf("reserving must be monotonically increasing: %v vs %v", b, wsi.data[len(wsi.data)-1].First) return fmt.Errorf("reserving must be monotonically increasing") } @@ -247,13 +326,53 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error { return fmt.Errorf("attempting to fill a bucket that does not exist") } +func (wsi *windowSeriesForeverImpl[Data, Bucket]) ExtractBoundedSeries() WindowSeries[Data, Bucket] { + wsi.lock.Lock() + defer wsi.lock.Unlock() + + var lowerTrimmingBound, upperTrimmingBound Bucket + hasBounds := false + if utilities.IsSome(wsi.lowerTrimmingBound) { + hasBounds = true + lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound) + upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound) + } + + result := NewWindowSeries[Data, Bucket](Forever, 0) + + for _, v := range wsi.data { + if hasBounds && (v.First < lowerTrimmingBound || v.First > upperTrimmingBound) { + continue + } + result.Reserve(v.First) + if utilities.IsSome(v.Second) { + result.Fill(v.First, utilities.GetSome(v.Second)) + } + } + return result +} + func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] { wsi.lock.Lock() defer wsi.lock.Unlock() result := make([]utilities.Optional[Data], len(wsi.data)) - for i, v := range utilities.Reverse(wsi.data) { - result[i] = v.Second + var lowerTrimmingBound, upperTrimmingBound Bucket + hasBounds := false + if utilities.IsSome(wsi.lowerTrimmingBound) { + hasBounds = true + lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound) + upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound) + result = make([]utilities.Optional[Data], + int(upperTrimmingBound-lowerTrimmingBound)+1) + } + + index := 0 + for _, v := range wsi.data { + if !hasBounds || (lowerTrimmingBound <= v.First && v.First <= upperTrimmingBound) { + result[index] = v.Second + index++ + } } return result @@ -289,8 +408,12 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetType() WindowSeriesDuration return Forever } -func newWindowSeriesForeverImpl[Data any, Bucket constraints.Ordered]() *windowSeriesForeverImpl[Data, Bucket] { - result := windowSeriesForeverImpl[Data, Bucket]{empty: true} +func newWindowSeriesForeverImpl[Data any, Bucket utilities.Number]() *windowSeriesForeverImpl[Data, Bucket] { + result := windowSeriesForeverImpl[Data, Bucket]{ + empty: true, + lowerTrimmingBound: utilities.None[Bucket](), + upperTrimmingBound: utilities.None[Bucket](), + } result.data = nil @@ -319,6 +442,32 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string { return result } +func (wsi *windowSeriesForeverImpl[Data, Bucket]) SetTrimmingBucketBounds(lower Bucket, upper Bucket) { + wsi.lowerTrimmingBound = utilities.Some[Bucket](lower) + wsi.upperTrimmingBound = utilities.Some[Bucket](upper) +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) ResetTrimmingBucketBounds() { + wsi.lowerTrimmingBound = utilities.None[Bucket]() + wsi.upperTrimmingBound = utilities.None[Bucket]() +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetBucketBounds() (Bucket, Bucket) { + wsi.lock.Lock() + defer wsi.lock.Unlock() + if wsi.empty { + return 0, 0 + } + return wsi.data[0].First, wsi.data[len(wsi.data)-1].First +} + +// Sort the data according to the bucket ids (ascending) +func (wsi *windowSeriesForeverImpl[Data, Bucket]) sort() { + slices.SortFunc(wsi.data, func(left, right utilities.Pair[Bucket, utilities.Optional[Data]]) int { + return cmp.Compare(left.First, right.First) + }) +} + func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) { result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket]) if !ok { @@ -330,19 +479,56 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[ defer result.lock.Unlock() wsi.data = append(wsi.data, result.data...) + // Because the series that we are appending in may have overlapping buckets, + // we will sort them to maintain the invariant that the data items are sorted + // by bucket ids (increasing). + wsi.sort() +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) BoundedAppend(appended *WindowSeries[Data, Bucket]) { + result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket]) + if !ok { + panic("Cannot merge a forever window series with a non-forever window series.") + } + wsi.lock.Lock() + defer wsi.lock.Unlock() + result.lock.Lock() + defer result.lock.Unlock() + + if utilities.IsNone(result.lowerTrimmingBound) || + utilities.IsNone(result.upperTrimmingBound) { + wsi.sort() + wsi.data = append(wsi.data, result.data...) + return + } else { + lowerTrimmingBound := utilities.GetSome(result.lowerTrimmingBound) + upperTrimmingBound := utilities.GetSome(result.upperTrimmingBound) + + toAppend := utilities.Filter(result.data, func( + element utilities.Pair[Bucket, utilities.Optional[Data]], + ) bool { + bucket := element.First + return lowerTrimmingBound <= bucket && bucket <= upperTrimmingBound + }) + wsi.data = append(wsi.data, toAppend...) + } + // Because the series that we are appending in may have overlapping buckets, + // we will sort them to maintain the invariant that the data items are sorted + // by bucket ids (increasing). + wsi.sort() } /* * End of WindowSeries interface methods. */ -func NewWindowSeries[Data any, Bucket constraints.Ordered](tipe WindowSeriesDuration, windowSize int) WindowSeries[Data, Bucket] { +func NewWindowSeries[Data any, Bucket utilities.Number](tipe WindowSeriesDuration, windowSize int) WindowSeries[Data, Bucket] { if tipe == WindowOnly { return newWindowSeriesWindowOnlyImpl[Data, Bucket](windowSize) } else if tipe == Forever { return newWindowSeriesForeverImpl[Data, Bucket]() } - panic("") + panic("Attempting to create a new window series with an invalid type.") } type NumericBucketGenerator[T utilities.Number] struct { diff --git a/series/series_test.go b/series/series_test.go index 54ddfcc..82778fc 100644 --- a/series/series_test.go +++ b/series/series_test.go @@ -109,7 +109,7 @@ func Test_ForeverValues(test *testing.T) { shouldMatch = append(shouldMatch, utilities.Some[float64](previous)) } - if !reflect.DeepEqual(utilities.Reverse(shouldMatch), series.GetValues()) { + if !reflect.DeepEqual(shouldMatch, series.GetValues()) { test.Fatalf("Values() on infinite mathematical series does not work.") } } @@ -941,3 +941,181 @@ func Test_ForeverLocking(test *testing.T) { test.Fatalf("Mutual exclusion checks did not properly lock out parallel ForEach operations.") } } + +func Test_ForeverGetBucketBounds(test *testing.T) { + series := newWindowSeriesForeverImpl[float64, int]() + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Fill(1, 8) + series.Fill(2, 9) + series.Fill(3, 10) + series.Fill(4, 11) + series.Fill(5, 12) + + lower, upper := series.GetBucketBounds() + if lower != 1 || upper != 5 { + test.Fatalf("expected a lower of 1 and upper of 5; got %v and %v, respectively!\n", lower, upper) + } +} + +func Test_WindowGetBucketBounds(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[float64, int](3) + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Fill(3, 10) + series.Fill(4, 11) + series.Fill(5, 12) + + lower, upper := series.GetBucketBounds() + if lower != 3 || upper != 5 { + test.Fatalf("expected a lower of 3 and upper of 5; got %v and %v, respectively!\n", lower, upper) + } +} + +func Test_ForeverBucketBoundsEmpty(test *testing.T) { + series := newWindowSeriesForeverImpl[float64, int]() + + lower, upper := series.GetBucketBounds() + if lower != 0 || upper != 0 { + test.Fatalf("expected a lower of 0 and upper of 0; got %v and %v, respectively!\n", lower, upper) + } +} + +func Test_ForeverTrimmingBucketBounds(test *testing.T) { + series := newWindowSeriesForeverImpl[float64, int]() + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Fill(1, 8) + series.Fill(2, 9) + series.Fill(3, 10) + series.Fill(4, 11) + series.Fill(5, 12) + + series.SetTrimmingBucketBounds(3, 5) + + trimmedValues := series.GetValues() + if len(trimmedValues) != 3 { + test.Fatalf("Expected that the list would have 3 elements but it only had %v!\n", len(trimmedValues)) + } + if utilities.GetSome(trimmedValues[0]) != 10 || utilities.GetSome(trimmedValues[1]) != 11 || + utilities.GetSome(trimmedValues[2]) != 12 { + test.Fatalf("Expected values are not the actual values.\n") + } +} + +func Test_WindowTrimmingBucketBounds(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[float64, int](5) + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Fill(1, 8) + series.Fill(2, 9) + series.Fill(3, 10) + series.Fill(4, 11) + series.Fill(5, 12) + + series.SetTrimmingBucketBounds(3, 5) + + trimmedValues := series.GetValues() + if len(trimmedValues) != 3 { + test.Fatalf("Expected that the list would have 3 elements but it only had %v!\n", len(trimmedValues)) + } + if utilities.GetSome(trimmedValues[0]) != 12 || utilities.GetSome(trimmedValues[1]) != 11 || + utilities.GetSome(trimmedValues[2]) != 10 { + test.Fatalf("Expected values are not the actual values.\n") + } +} + +func Test_ForeverBoundedAppend(test *testing.T) { + appending_series := NewWindowSeries[float64, int](Forever, 0) + baseSeries := NewWindowSeries[float64, int](Forever, 0) + + baseSeries.Reserve(1) + baseSeries.Fill(1, 1) + baseSeries.Reserve(2) + baseSeries.Fill(2, 2) + baseSeries.Reserve(3) + baseSeries.Fill(3, 3) + + appending_series.Reserve(4) + appending_series.Reserve(5) + appending_series.Reserve(6) + appending_series.Reserve(7) + appending_series.Reserve(8) + + appending_series.Fill(4, 8) + appending_series.Fill(5, 9) + appending_series.Fill(6, 10) + appending_series.Fill(7, 11) + appending_series.Fill(8, 12) + + appending_series.SetTrimmingBucketBounds(6, 8) + + baseSeries.BoundedAppend(&appending_series) + + if len(baseSeries.GetValues()) != 6 { + test.Fatalf("The base series should have 6 values, but it actually has %v (bounded test)", len(baseSeries.GetValues())) + } + + baseSeriesValues := baseSeries.GetValues() + if utilities.GetSome(baseSeriesValues[0]) != 1 || + utilities.GetSome(baseSeriesValues[1]) != 2 || + utilities.GetSome(baseSeriesValues[2]) != 3 || + utilities.GetSome(baseSeriesValues[3]) != 10 || + utilities.GetSome(baseSeriesValues[4]) != 11 || + utilities.GetSome(baseSeriesValues[5]) != 12 { + test.Fatalf("The values that should be in a series with bounded append are not there.") + } + baseSeries = NewWindowSeries[float64, int](Forever, 0) + baseSeries.Append(&appending_series) + if len(baseSeries.GetValues()) != 5 { + test.Fatalf("The base series should have 5 values, but it actually has %v (unbounded test)", len(baseSeries.GetValues())) + } +} + +func Test_ForeverExtractBounded(test *testing.T) { + series := NewWindowSeries[float64, int](Forever, 0) + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Fill(1, 8) + series.Fill(2, 9) + series.Fill(3, 10) + series.Fill(4, 11) + series.Fill(5, 12) + + extracted := series.ExtractBoundedSeries() + + if len(extracted.GetValues()) != 5 { + test.Fatalf("Expected the extracted list to have 5 values but it really has %v", len(extracted.GetValues())) + } + + series.SetTrimmingBucketBounds(3, 5) + extracted = series.ExtractBoundedSeries() + if len(extracted.GetValues()) != 3 { + test.Fatalf("Expected the extracted list to have 3 values but it really has %v", len(extracted.GetValues())) + } +} diff --git a/series/statistics.go b/series/statistics.go index 5d11164..5ff4559 100644 --- a/series/statistics.go +++ b/series/statistics.go @@ -16,10 +16,9 @@ package series import ( "github.com/network-quality/goresponsiveness/utilities" - "golang.org/x/exp/constraints" ) -func SeriesStandardDeviation[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket]) (bool, float64) { +func SeriesStandardDeviation[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket]) (bool, float64) { complete := s.Complete() inputValues := s.GetValues() @@ -32,7 +31,7 @@ func SeriesStandardDeviation[Data utilities.Number, Bucket constraints.Ordered]( return complete, utilities.CalculateStandardDeviation[Data](values) } -func Percentile[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], p uint) (bool, Data) { +func Percentile[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket], p uint) (bool, Data) { complete := s.Complete() inputValues := s.GetValues() @@ -45,7 +44,7 @@ func Percentile[Data utilities.Number, Bucket constraints.Ordered](s WindowSerie return complete, utilities.CalculatePercentile(values, p) } -func AllSequentialIncreasesLessThan[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], limit float64, +func AllSequentialIncreasesLessThan[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket], limit float64, ) (bool, bool, float64) { complete := s.Complete() @@ -60,7 +59,7 @@ func AllSequentialIncreasesLessThan[Data utilities.Number, Bucket constraints.Or return complete, result, actualLimit } -func CalculateAverage[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket]) (bool, float64) { +func CalculateAverage[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket]) (bool, float64) { complete := s.Complete() inputValues := s.GetValues() @@ -73,7 +72,7 @@ func CalculateAverage[Data utilities.Number, Bucket constraints.Ordered](s Windo return complete, utilities.CalculateAverage(values) } -func TrimmedMean[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], trim int) (bool, float64, []Data) { +func TrimmedMean[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket], trim int) (bool, float64, []Data) { complete := s.Complete() inputValues := s.GetValues() -- cgit v1.2.3