From e60066f572464da3a6f55999a48b1d38cf6423d6 Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Fri, 26 Jan 2024 20:46:05 -0500 Subject: [Bugfix] Only probes collected during stable MAD count For calculating the final RPM, only those probes that are sent/received during the stable MAD should count. Signed-off-by: Will Hawkins --- direction/direction.go | 2 + networkQuality.go | 91 +++++++++++++++---- rpm/calculations.go | 11 +-- series/series.go | 232 +++++++++++++++++++++++++++++++++++++++++++----- series/series_test.go | 180 ++++++++++++++++++++++++++++++++++++- series/statistics.go | 11 ++- stabilizer/algorithm.go | 32 ++++++- 7 files changed, 503 insertions(+), 56 deletions(-) diff --git a/direction/direction.go b/direction/direction.go index 9b41b26..2cc9fa9 100644 --- a/direction/direction.go +++ b/direction/direction.go @@ -45,4 +45,6 @@ type Direction struct { ThroughputActivityCtx *context.Context ThroughputActivityCtxCancel *context.CancelFunc FormattedResults string + LowerBucketBound uint64 + UpperBucketBound uint64 } diff --git a/networkQuality.go b/networkQuality.go index f22cc32..e88a000 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -568,6 +568,7 @@ func main() { baselineProbeDebugging, ) + lowerBucketBound, upperBucketBound := uint64(0), uint64(0) baseline_responsiveness_timeout: for !baselineStableResponsiveness { select { @@ -642,13 +643,23 @@ func main() { baselineNetworkActivityCtxCancel() baselineProberOperatorCtxCancel() - baselineRpm = rpm.CalculateRpm(baselineFauxSelfDownloadRtts, - baselineForeignDownloadRtts, specParameters.TrimmedMeanPct, specParameters.Percentile) + lowerBucketBound, upperBucketBound = baselineResponsivenessStabilizer.GetBounds() - fmt.Printf("Baseline RPM: %5.0f (P%d)\n", baselineRpm.PNRpm, specParameters.Percentile) - fmt.Printf("Baseline RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", - baselineRpm.MeanRpm, specParameters.TrimmedMeanPct) + if *debugCliFlag { + fmt.Printf("Baseline responsiveness stablizer bucket bounds: (%v, %v)\n", lowerBucketBound, upperBucketBound) + } + + for _, label := range []string{"Unbounded ", ""} { + baselineRpm = rpm.CalculateRpm(baselineFauxSelfDownloadRtts, + baselineForeignDownloadRtts, specParameters.TrimmedMeanPct, specParameters.Percentile) + fmt.Printf("%vBaseline RPM: %5.0f (P%d)\n", label, baselineRpm.PNRpm, specParameters.Percentile) + fmt.Printf("%vBaseline RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", + label, baselineRpm.MeanRpm, specParameters.TrimmedMeanPct) + + baselineFauxSelfDownloadRtts.SetTrimmingBucketBounds(lowerBucketBound, upperBucketBound) + baselineForeignDownloadRtts.SetTrimmingBucketBounds(lowerBucketBound, upperBucketBound) + } } var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil @@ -911,6 +922,9 @@ func main() { } case <-timeoutChannel: { + if *debugCliFlag { + fmt.Printf("%v responsiveness seeking interval has expired.\n", direction.DirectionLabel) + } break responsiveness_timeout } case <-stabilityCheckTimeChannel: @@ -933,7 +947,7 @@ func main() { if *debugCliFlag { fmt.Printf( - "Responsiveness is instantaneously %s.\n", + "%v responsiveness is instantaneously %s.\n", direction.DirectionLabel, utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) } @@ -1019,6 +1033,8 @@ func main() { (*direction.ThroughputActivityCtxCancel)() } + direction.LowerBucketBound, direction.UpperBucketBound = responsivenessStabilizer.GetBounds() + // Add a header to the results direction.FormattedResults += fmt.Sprintf("%v:\n", direction.DirectionLabel) @@ -1037,13 +1053,29 @@ func main() { direction.FormattedResults += utilities.IndentOutput( fmt.Sprintf("%v", extendedStats.Repr()), 1, "\t") } - directionResult := rpm.CalculateRpm(direction.SelfRtts, direction.ForeignRtts, - specParameters.TrimmedMeanPct, specParameters.Percentile) + + var directionResult *rpm.Rpm[float64] = nil + for _, label := range []string{"Unbounded ", ""} { + directionResult = rpm.CalculateRpm(direction.SelfRtts, direction.ForeignRtts, + specParameters.TrimmedMeanPct, specParameters.Percentile) + if *debugCliFlag { + direction.FormattedResults += utilities.IndentOutput( + fmt.Sprintf("%vRPM Calculation Statistics:\n", label), 1, "\t") + direction.FormattedResults += utilities.IndentOutput(directionResult.ToString(), 2, "\t") + } + + direction.SelfRtts.SetTrimmingBucketBounds( + direction.LowerBucketBound, direction.UpperBucketBound) + direction.ForeignRtts.SetTrimmingBucketBounds( + direction.LowerBucketBound, direction.UpperBucketBound) + } + if *debugCliFlag { direction.FormattedResults += utilities.IndentOutput( - "RPM Calculation Statistics:\n", 1, "\t") - direction.FormattedResults += utilities.IndentOutput(directionResult.ToString(), 2, "\t") + fmt.Sprintf("Bucket bounds: (%v, %v)\n", + direction.LowerBucketBound, direction.UpperBucketBound), 1, "\t") } + if *printQualityAttenuation { direction.FormattedResults += utilities.IndentOutput( "Quality Attenuation Statistics:\n", 1, "\t") @@ -1121,7 +1153,7 @@ func main() { direction.GranularThroughputDataLogger.Close() if *debugCliFlag { - fmt.Printf("In debugging mode, we will cool down between tests.\n") + fmt.Printf("In debugging mode, we will cool down after tests.\n") time.Sleep(constants.CooldownPeriod) fmt.Printf("Done cooling down.\n") } @@ -1168,18 +1200,39 @@ func main() { fmt.Printf("========\n") } - allSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) - allForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) + if *debugCliFlag { + unboundedAllSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) + unboundedAllForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) + + unboundedAllSelfRtts.Append(&downloadDirection.SelfRtts) + unboundedAllSelfRtts.Append(&uploadDirection.SelfRtts) + unboundedAllForeignRtts.Append(&downloadDirection.ForeignRtts) + unboundedAllForeignRtts.Append(&uploadDirection.ForeignRtts) + + result := rpm.CalculateRpm(unboundedAllSelfRtts, unboundedAllForeignRtts, + specParameters.TrimmedMeanPct, specParameters.Percentile) + + fmt.Printf("(Unbounded Final RPM Calculation stats):\n%v\n", result.ToString()) + + fmt.Printf("Unbounded Final RPM: %.0f (P%d)\n", result.PNRpm, specParameters.Percentile) + fmt.Printf("Unbounded Final RPM: %.0f (Single-Sided %v%% Trimmed Mean)\n", + result.MeanRpm, specParameters.TrimmedMeanPct) + fmt.Printf("\n") + } + + boundedAllSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) + boundedAllForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) - allSelfRtts.Append(&downloadDirection.SelfRtts) - allSelfRtts.Append(&uploadDirection.SelfRtts) - allForeignRtts.Append(&downloadDirection.ForeignRtts) - allForeignRtts.Append(&uploadDirection.ForeignRtts) + boundedAllSelfRtts.BoundedAppend(&downloadDirection.SelfRtts) + boundedAllSelfRtts.BoundedAppend(&uploadDirection.SelfRtts) + boundedAllForeignRtts.BoundedAppend(&downloadDirection.ForeignRtts) + boundedAllForeignRtts.BoundedAppend(&uploadDirection.ForeignRtts) - result := rpm.CalculateRpm(allSelfRtts, allForeignRtts, specParameters.TrimmedMeanPct, specParameters.Percentile) + result := rpm.CalculateRpm(boundedAllSelfRtts, boundedAllForeignRtts, + specParameters.TrimmedMeanPct, specParameters.Percentile) if *debugCliFlag { - fmt.Printf("(Final RPM Calculation stats): %v\n", result.ToString()) + fmt.Printf("(Final RPM Calculation stats):\n%v\n", result.ToString()) } fmt.Printf("Final RPM: %.0f (P%d)\n", result.PNRpm, specParameters.Percentile) diff --git a/rpm/calculations.go b/rpm/calculations.go index f3619dc..851e168 100644 --- a/rpm/calculations.go +++ b/rpm/calculations.go @@ -19,7 +19,6 @@ import ( "github.com/network-quality/goresponsiveness/series" "github.com/network-quality/goresponsiveness/utilities" - "golang.org/x/exp/constraints" ) type Rpm[Data utilities.Number] struct { @@ -35,7 +34,7 @@ type Rpm[Data utilities.Number] struct { MeanRpm float64 } -func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered]( +func CalculateRpm[Data utilities.Number, Bucket utilities.Number]( selfRtts series.WindowSeries[Data, Bucket], aggregatedForeignRtts series.WindowSeries[Data, Bucket], trimming uint, percentile uint, ) *Rpm[Data] { // There may be more than one round trip accumulated together. If that is the case, @@ -56,12 +55,14 @@ func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered]( } } + boundedSelfRtts := selfRtts.ExtractBoundedSeries() + // First, let's do a double-sided trim of the top/bottom 10% of our measurements. - selfRttsTotalCount, _ := selfRtts.Count() + selfRttsTotalCount, _ := boundedSelfRtts.Count() foreignRttsTotalCount, _ := foreignRtts.Count() _, selfProbeRoundTripTimeMean, selfRttsTrimmed := - series.TrimmedMean(selfRtts, int(trimming)) + series.TrimmedMean(boundedSelfRtts, int(trimming)) _, foreignProbeRoundTripTimeMean, foreignRttsTrimmed := series.TrimmedMean(foreignRtts, int(trimming)) @@ -69,7 +70,7 @@ func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered]( foreignRttsTrimmedCount := len(foreignRttsTrimmed) // Second, let's do the P90 calculations. - _, selfProbeRoundTripTimePN := series.Percentile(selfRtts, percentile) + _, selfProbeRoundTripTimePN := series.Percentile(boundedSelfRtts, percentile) _, foreignProbeRoundTripTimePN := series.Percentile(foreignRtts, percentile) // Note: The specification indicates that we want to calculate the foreign probes as such: diff --git a/series/series.go b/series/series.go index ef133d2..de0d031 100644 --- a/series/series.go +++ b/series/series.go @@ -14,11 +14,12 @@ package series import ( + "cmp" "fmt" + "slices" "sync" "github.com/network-quality/goresponsiveness/utilities" - "golang.org/x/exp/constraints" ) type WindowSeriesDuration int @@ -28,7 +29,7 @@ const ( WindowOnly WindowSeriesDuration = iota ) -type WindowSeries[Data any, Bucket constraints.Ordered] interface { +type WindowSeries[Data any, Bucket utilities.Number] interface { fmt.Stringer Reserve(b Bucket) error @@ -42,15 +43,25 @@ type WindowSeries[Data any, Bucket constraints.Ordered] interface { Complete() bool GetType() WindowSeriesDuration + ExtractBoundedSeries() WindowSeries[Data, Bucket] + Append(appended *WindowSeries[Data, Bucket]) + BoundedAppend(appended *WindowSeries[Data, Bucket]) + + GetBucketBounds() (Bucket, Bucket) + + SetTrimmingBucketBounds(Bucket, Bucket) + ResetTrimmingBucketBounds() } -type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct { - windowSize int - data []utilities.Pair[Bucket, utilities.Optional[Data]] - latestIndex int - empty bool - lock sync.RWMutex +type windowSeriesWindowOnlyImpl[Data any, Bucket utilities.Number] struct { + windowSize int + data []utilities.Pair[Bucket, utilities.Optional[Data]] + latestIndex int // invariant: newest data is there. + empty bool + lock sync.RWMutex + lowerTrimmingBound utilities.Optional[Bucket] + upperTrimmingBound utilities.Optional[Bucket] } /* @@ -80,6 +91,13 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error { return nil } +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) BucketBounds() (Bucket, Bucket) { + newestBucket := wsi.data[wsi.latestIndex].First + oldestBucket := wsi.data[wsi.nextIndex(wsi.latestIndex)].First + + return oldestBucket, newestBucket +} + func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) error { wsi.lock.Lock() defer wsi.lock.Unlock() @@ -150,15 +168,28 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio } result := make([]utilities.Optional[Data], wsi.windowSize) + var lowerTrimmingBound, upperTrimmingBound Bucket + hasBounds := false + if utilities.IsSome(wsi.lowerTrimmingBound) { + hasBounds = true + lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound) + upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound) + result = make([]utilities.Optional[Data], + int(upperTrimmingBound-lowerTrimmingBound)+1) + } + if wsi.empty { return result } iterator := wsi.latestIndex parallelIterator := 0 for { - result[parallelIterator] = wsi.data[iterator].Second + if !hasBounds || (lowerTrimmingBound <= wsi.data[iterator].First && + wsi.data[iterator].First <= upperTrimmingBound) { + result[parallelIterator] = wsi.data[iterator].Second + parallelIterator++ + } iterator = wsi.previousIndex(iterator) - parallelIterator++ if iterator == wsi.latestIndex { break } @@ -166,6 +197,32 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio return result } +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ExtractBoundedSeries() WindowSeries[Data, Bucket] { + wsi.lock.Lock() + defer wsi.lock.Unlock() + + result := NewWindowSeries[Data, Bucket](WindowOnly, wsi.windowSize) + + var lowerTrimmingBound, upperTrimmingBound Bucket + hasBounds := false + if utilities.IsSome(wsi.lowerTrimmingBound) { + hasBounds = true + lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound) + upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound) + } + + for _, v := range wsi.data { + if hasBounds && (v.First < lowerTrimmingBound || v.First > upperTrimmingBound) { + continue + } + result.Reserve(v.First) + if utilities.IsSome(v.Second) { + result.Fill(v.First, utilities.GetSome(v.Second)) + } + } + return result +} + func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] { wsi.lock.Lock() defer wsi.lock.Unlock() @@ -198,11 +255,31 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string { return result } +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) SetTrimmingBucketBounds(lower Bucket, upper Bucket) { + wsi.lowerTrimmingBound = utilities.Some[Bucket](lower) + wsi.upperTrimmingBound = utilities.Some[Bucket](upper) +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ResetTrimmingBucketBounds() { + wsi.lowerTrimmingBound = utilities.None[Bucket]() + wsi.upperTrimmingBound = utilities.None[Bucket]() +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetBucketBounds() (Bucket, Bucket) { + wsi.lock.Lock() + defer wsi.lock.Unlock() + return wsi.data[wsi.nextIndex(wsi.latestIndex)].First, wsi.data[wsi.latestIndex].First +} + func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) { - panic("") + panic("Append is unimplemented on a window-only Window Series") +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) BoundedAppend(appended *WindowSeries[Data, Bucket]) { + panic("BoundedAppend is unimplemented on a window-only Window Series") } -func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered]( +func newWindowSeriesWindowOnlyImpl[Data any, Bucket utilities.Number]( windowSize int, ) *windowSeriesWindowOnlyImpl[Data, Bucket] { result := windowSeriesWindowOnlyImpl[Data, Bucket]{windowSize: windowSize, latestIndex: 0, empty: true} @@ -216,17 +293,19 @@ func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered]( * End of WindowSeries interface methods. */ -type windowSeriesForeverImpl[Data any, Bucket constraints.Ordered] struct { - data []utilities.Pair[Bucket, utilities.Optional[Data]] - empty bool - lock sync.RWMutex +type windowSeriesForeverImpl[Data any, Bucket utilities.Number] struct { + data []utilities.Pair[Bucket, utilities.Optional[Data]] + empty bool + lock sync.RWMutex + lowerTrimmingBound utilities.Optional[Bucket] + upperTrimmingBound utilities.Optional[Bucket] } func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error { wsi.lock.Lock() defer wsi.lock.Unlock() if !wsi.empty && b <= wsi.data[len(wsi.data)-1].First { - fmt.Printf("reserving must be monotonically increasing") + fmt.Printf("reserving must be monotonically increasing: %v vs %v", b, wsi.data[len(wsi.data)-1].First) return fmt.Errorf("reserving must be monotonically increasing") } @@ -247,13 +326,53 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error { return fmt.Errorf("attempting to fill a bucket that does not exist") } +func (wsi *windowSeriesForeverImpl[Data, Bucket]) ExtractBoundedSeries() WindowSeries[Data, Bucket] { + wsi.lock.Lock() + defer wsi.lock.Unlock() + + var lowerTrimmingBound, upperTrimmingBound Bucket + hasBounds := false + if utilities.IsSome(wsi.lowerTrimmingBound) { + hasBounds = true + lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound) + upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound) + } + + result := NewWindowSeries[Data, Bucket](Forever, 0) + + for _, v := range wsi.data { + if hasBounds && (v.First < lowerTrimmingBound || v.First > upperTrimmingBound) { + continue + } + result.Reserve(v.First) + if utilities.IsSome(v.Second) { + result.Fill(v.First, utilities.GetSome(v.Second)) + } + } + return result +} + func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] { wsi.lock.Lock() defer wsi.lock.Unlock() result := make([]utilities.Optional[Data], len(wsi.data)) - for i, v := range utilities.Reverse(wsi.data) { - result[i] = v.Second + var lowerTrimmingBound, upperTrimmingBound Bucket + hasBounds := false + if utilities.IsSome(wsi.lowerTrimmingBound) { + hasBounds = true + lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound) + upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound) + result = make([]utilities.Optional[Data], + int(upperTrimmingBound-lowerTrimmingBound)+1) + } + + index := 0 + for _, v := range wsi.data { + if !hasBounds || (lowerTrimmingBound <= v.First && v.First <= upperTrimmingBound) { + result[index] = v.Second + index++ + } } return result @@ -289,8 +408,12 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetType() WindowSeriesDuration return Forever } -func newWindowSeriesForeverImpl[Data any, Bucket constraints.Ordered]() *windowSeriesForeverImpl[Data, Bucket] { - result := windowSeriesForeverImpl[Data, Bucket]{empty: true} +func newWindowSeriesForeverImpl[Data any, Bucket utilities.Number]() *windowSeriesForeverImpl[Data, Bucket] { + result := windowSeriesForeverImpl[Data, Bucket]{ + empty: true, + lowerTrimmingBound: utilities.None[Bucket](), + upperTrimmingBound: utilities.None[Bucket](), + } result.data = nil @@ -319,6 +442,32 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string { return result } +func (wsi *windowSeriesForeverImpl[Data, Bucket]) SetTrimmingBucketBounds(lower Bucket, upper Bucket) { + wsi.lowerTrimmingBound = utilities.Some[Bucket](lower) + wsi.upperTrimmingBound = utilities.Some[Bucket](upper) +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) ResetTrimmingBucketBounds() { + wsi.lowerTrimmingBound = utilities.None[Bucket]() + wsi.upperTrimmingBound = utilities.None[Bucket]() +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetBucketBounds() (Bucket, Bucket) { + wsi.lock.Lock() + defer wsi.lock.Unlock() + if wsi.empty { + return 0, 0 + } + return wsi.data[0].First, wsi.data[len(wsi.data)-1].First +} + +// Sort the data according to the bucket ids (ascending) +func (wsi *windowSeriesForeverImpl[Data, Bucket]) sort() { + slices.SortFunc(wsi.data, func(left, right utilities.Pair[Bucket, utilities.Optional[Data]]) int { + return cmp.Compare(left.First, right.First) + }) +} + func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) { result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket]) if !ok { @@ -330,19 +479,56 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[ defer result.lock.Unlock() wsi.data = append(wsi.data, result.data...) + // Because the series that we are appending in may have overlapping buckets, + // we will sort them to maintain the invariant that the data items are sorted + // by bucket ids (increasing). + wsi.sort() +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) BoundedAppend(appended *WindowSeries[Data, Bucket]) { + result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket]) + if !ok { + panic("Cannot merge a forever window series with a non-forever window series.") + } + wsi.lock.Lock() + defer wsi.lock.Unlock() + result.lock.Lock() + defer result.lock.Unlock() + + if utilities.IsNone(result.lowerTrimmingBound) || + utilities.IsNone(result.upperTrimmingBound) { + wsi.sort() + wsi.data = append(wsi.data, result.data...) + return + } else { + lowerTrimmingBound := utilities.GetSome(result.lowerTrimmingBound) + upperTrimmingBound := utilities.GetSome(result.upperTrimmingBound) + + toAppend := utilities.Filter(result.data, func( + element utilities.Pair[Bucket, utilities.Optional[Data]], + ) bool { + bucket := element.First + return lowerTrimmingBound <= bucket && bucket <= upperTrimmingBound + }) + wsi.data = append(wsi.data, toAppend...) + } + // Because the series that we are appending in may have overlapping buckets, + // we will sort them to maintain the invariant that the data items are sorted + // by bucket ids (increasing). + wsi.sort() } /* * End of WindowSeries interface methods. */ -func NewWindowSeries[Data any, Bucket constraints.Ordered](tipe WindowSeriesDuration, windowSize int) WindowSeries[Data, Bucket] { +func NewWindowSeries[Data any, Bucket utilities.Number](tipe WindowSeriesDuration, windowSize int) WindowSeries[Data, Bucket] { if tipe == WindowOnly { return newWindowSeriesWindowOnlyImpl[Data, Bucket](windowSize) } else if tipe == Forever { return newWindowSeriesForeverImpl[Data, Bucket]() } - panic("") + panic("Attempting to create a new window series with an invalid type.") } type NumericBucketGenerator[T utilities.Number] struct { diff --git a/series/series_test.go b/series/series_test.go index 54ddfcc..82778fc 100644 --- a/series/series_test.go +++ b/series/series_test.go @@ -109,7 +109,7 @@ func Test_ForeverValues(test *testing.T) { shouldMatch = append(shouldMatch, utilities.Some[float64](previous)) } - if !reflect.DeepEqual(utilities.Reverse(shouldMatch), series.GetValues()) { + if !reflect.DeepEqual(shouldMatch, series.GetValues()) { test.Fatalf("Values() on infinite mathematical series does not work.") } } @@ -941,3 +941,181 @@ func Test_ForeverLocking(test *testing.T) { test.Fatalf("Mutual exclusion checks did not properly lock out parallel ForEach operations.") } } + +func Test_ForeverGetBucketBounds(test *testing.T) { + series := newWindowSeriesForeverImpl[float64, int]() + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Fill(1, 8) + series.Fill(2, 9) + series.Fill(3, 10) + series.Fill(4, 11) + series.Fill(5, 12) + + lower, upper := series.GetBucketBounds() + if lower != 1 || upper != 5 { + test.Fatalf("expected a lower of 1 and upper of 5; got %v and %v, respectively!\n", lower, upper) + } +} + +func Test_WindowGetBucketBounds(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[float64, int](3) + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Fill(3, 10) + series.Fill(4, 11) + series.Fill(5, 12) + + lower, upper := series.GetBucketBounds() + if lower != 3 || upper != 5 { + test.Fatalf("expected a lower of 3 and upper of 5; got %v and %v, respectively!\n", lower, upper) + } +} + +func Test_ForeverBucketBoundsEmpty(test *testing.T) { + series := newWindowSeriesForeverImpl[float64, int]() + + lower, upper := series.GetBucketBounds() + if lower != 0 || upper != 0 { + test.Fatalf("expected a lower of 0 and upper of 0; got %v and %v, respectively!\n", lower, upper) + } +} + +func Test_ForeverTrimmingBucketBounds(test *testing.T) { + series := newWindowSeriesForeverImpl[float64, int]() + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Fill(1, 8) + series.Fill(2, 9) + series.Fill(3, 10) + series.Fill(4, 11) + series.Fill(5, 12) + + series.SetTrimmingBucketBounds(3, 5) + + trimmedValues := series.GetValues() + if len(trimmedValues) != 3 { + test.Fatalf("Expected that the list would have 3 elements but it only had %v!\n", len(trimmedValues)) + } + if utilities.GetSome(trimmedValues[0]) != 10 || utilities.GetSome(trimmedValues[1]) != 11 || + utilities.GetSome(trimmedValues[2]) != 12 { + test.Fatalf("Expected values are not the actual values.\n") + } +} + +func Test_WindowTrimmingBucketBounds(test *testing.T) { + series := newWindowSeriesWindowOnlyImpl[float64, int](5) + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Fill(1, 8) + series.Fill(2, 9) + series.Fill(3, 10) + series.Fill(4, 11) + series.Fill(5, 12) + + series.SetTrimmingBucketBounds(3, 5) + + trimmedValues := series.GetValues() + if len(trimmedValues) != 3 { + test.Fatalf("Expected that the list would have 3 elements but it only had %v!\n", len(trimmedValues)) + } + if utilities.GetSome(trimmedValues[0]) != 12 || utilities.GetSome(trimmedValues[1]) != 11 || + utilities.GetSome(trimmedValues[2]) != 10 { + test.Fatalf("Expected values are not the actual values.\n") + } +} + +func Test_ForeverBoundedAppend(test *testing.T) { + appending_series := NewWindowSeries[float64, int](Forever, 0) + baseSeries := NewWindowSeries[float64, int](Forever, 0) + + baseSeries.Reserve(1) + baseSeries.Fill(1, 1) + baseSeries.Reserve(2) + baseSeries.Fill(2, 2) + baseSeries.Reserve(3) + baseSeries.Fill(3, 3) + + appending_series.Reserve(4) + appending_series.Reserve(5) + appending_series.Reserve(6) + appending_series.Reserve(7) + appending_series.Reserve(8) + + appending_series.Fill(4, 8) + appending_series.Fill(5, 9) + appending_series.Fill(6, 10) + appending_series.Fill(7, 11) + appending_series.Fill(8, 12) + + appending_series.SetTrimmingBucketBounds(6, 8) + + baseSeries.BoundedAppend(&appending_series) + + if len(baseSeries.GetValues()) != 6 { + test.Fatalf("The base series should have 6 values, but it actually has %v (bounded test)", len(baseSeries.GetValues())) + } + + baseSeriesValues := baseSeries.GetValues() + if utilities.GetSome(baseSeriesValues[0]) != 1 || + utilities.GetSome(baseSeriesValues[1]) != 2 || + utilities.GetSome(baseSeriesValues[2]) != 3 || + utilities.GetSome(baseSeriesValues[3]) != 10 || + utilities.GetSome(baseSeriesValues[4]) != 11 || + utilities.GetSome(baseSeriesValues[5]) != 12 { + test.Fatalf("The values that should be in a series with bounded append are not there.") + } + baseSeries = NewWindowSeries[float64, int](Forever, 0) + baseSeries.Append(&appending_series) + if len(baseSeries.GetValues()) != 5 { + test.Fatalf("The base series should have 5 values, but it actually has %v (unbounded test)", len(baseSeries.GetValues())) + } +} + +func Test_ForeverExtractBounded(test *testing.T) { + series := NewWindowSeries[float64, int](Forever, 0) + + series.Reserve(1) + series.Reserve(2) + series.Reserve(3) + series.Reserve(4) + series.Reserve(5) + + series.Fill(1, 8) + series.Fill(2, 9) + series.Fill(3, 10) + series.Fill(4, 11) + series.Fill(5, 12) + + extracted := series.ExtractBoundedSeries() + + if len(extracted.GetValues()) != 5 { + test.Fatalf("Expected the extracted list to have 5 values but it really has %v", len(extracted.GetValues())) + } + + series.SetTrimmingBucketBounds(3, 5) + extracted = series.ExtractBoundedSeries() + if len(extracted.GetValues()) != 3 { + test.Fatalf("Expected the extracted list to have 3 values but it really has %v", len(extracted.GetValues())) + } +} diff --git a/series/statistics.go b/series/statistics.go index 5d11164..5ff4559 100644 --- a/series/statistics.go +++ b/series/statistics.go @@ -16,10 +16,9 @@ package series import ( "github.com/network-quality/goresponsiveness/utilities" - "golang.org/x/exp/constraints" ) -func SeriesStandardDeviation[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket]) (bool, float64) { +func SeriesStandardDeviation[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket]) (bool, float64) { complete := s.Complete() inputValues := s.GetValues() @@ -32,7 +31,7 @@ func SeriesStandardDeviation[Data utilities.Number, Bucket constraints.Ordered]( return complete, utilities.CalculateStandardDeviation[Data](values) } -func Percentile[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], p uint) (bool, Data) { +func Percentile[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket], p uint) (bool, Data) { complete := s.Complete() inputValues := s.GetValues() @@ -45,7 +44,7 @@ func Percentile[Data utilities.Number, Bucket constraints.Ordered](s WindowSerie return complete, utilities.CalculatePercentile(values, p) } -func AllSequentialIncreasesLessThan[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], limit float64, +func AllSequentialIncreasesLessThan[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket], limit float64, ) (bool, bool, float64) { complete := s.Complete() @@ -60,7 +59,7 @@ func AllSequentialIncreasesLessThan[Data utilities.Number, Bucket constraints.Or return complete, result, actualLimit } -func CalculateAverage[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket]) (bool, float64) { +func CalculateAverage[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket]) (bool, float64) { complete := s.Complete() inputValues := s.GetValues() @@ -73,7 +72,7 @@ func CalculateAverage[Data utilities.Number, Bucket constraints.Ordered](s Windo return complete, utilities.CalculateAverage(values) } -func TrimmedMean[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], trim int) (bool, float64, []Data) { +func TrimmedMean[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket], trim int) (bool, float64, []Data) { complete := s.Complete() inputValues := s.GetValues() diff --git a/stabilizer/algorithm.go b/stabilizer/algorithm.go index 86b4bb6..42dc0c6 100644 --- a/stabilizer/algorithm.go +++ b/stabilizer/algorithm.go @@ -24,7 +24,7 @@ import ( "golang.org/x/exp/constraints" ) -type MeasurementStablizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered] struct { +type MeasurementStablizer[Data constraints.Float | constraints.Integer, Bucket utilities.Number] struct { // The number of instantaneous measurements in the current interval could be infinite (Forever). instantaneousses series.WindowSeries[Data, Bucket] // There are a fixed, finite number of aggregates (WindowOnly). @@ -58,7 +58,7 @@ type MeasurementStablizer[Data constraints.Float | constraints.Integer, Bucket c // If the calculated standard deviation of *those* values is less than SDT, we declare // stability. -func NewStabilizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered]( +func NewStabilizer[Data constraints.Float | constraints.Integer, Bucket utilities.Number]( mad int, sdt float64, trimmingLevel uint, @@ -199,6 +199,7 @@ func (r3 *MeasurementStablizer[Data, Bucket]) IsStable() bool { r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) { if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) { md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md) + _, average := series.CalculateAverage(md) averages = append(averages, average) } @@ -214,3 +215,30 @@ func (r3 *MeasurementStablizer[Data, Bucket]) IsStable() bool { return isStable } + +func (r3 *MeasurementStablizer[Data, Bucket]) GetBounds() (Bucket, Bucket) { + r3.m.Lock() + defer r3.m.Unlock() + + haveMinimum := false + + lowerBound := Bucket(0) + upperBound := Bucket(0) + + r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) { + if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) { + md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md) + currentAggregateLowerBound, currentAggregateUpperBound := md.GetBucketBounds() + + if !haveMinimum { + lowerBound = currentAggregateLowerBound + haveMinimum = true + } else { + lowerBound = min(lowerBound, currentAggregateLowerBound) + } + upperBound = max(upperBound, currentAggregateUpperBound) + } + }) + + return lowerBound, upperBound +} -- cgit v1.2.3