author    Will Hawkins <[email protected]>  2024-01-26 20:46:05 -0500
committer Will Hawkins <[email protected]>  2024-01-26 20:46:05 -0500
commit    e60066f572464da3a6f55999a48b1d38cf6423d6 (patch)
tree      b6e5c38f0ece8e22aac7414b43c24c4632fe4690
parent    65b039e33717ee43620363704cb3daa304a5e724 (diff)
[Bugfix] Only probes collected during stable MAD count
When calculating the final RPM, only those probes that are sent/received during the stable MAD should count.

Signed-off-by: Will Hawkins <[email protected]>
-rw-r--r--  direction/direction.go     2
-rw-r--r--  networkQuality.go         91
-rw-r--r--  rpm/calculations.go       11
-rw-r--r--  series/series.go         232
-rw-r--r--  series/series_test.go    180
-rw-r--r--  series/statistics.go      11
-rw-r--r--  stabilizer/algorithm.go   32
7 files changed, 503 insertions, 56 deletions
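
In outline, the patch has the stabilizer report the bucket range it observed while responsiveness was stable (GetBounds), applies that range to the RTT series as trimming bounds (SetTrimmingBucketBounds), and lets rpm.CalculateRpm consider only the probes inside those bounds, while the unbounded numbers are still printed for comparison in debug mode. Below is a minimal sketch of that flow against the public API touched by this patch; the bucket ids, RTT values, and the 6..10 bounds are illustrative stand-ins, and in networkQuality.go the bounds come from responsivenessStabilizer.GetBounds() and the trimming/percentile parameters from specParameters.

package main

import (
	"fmt"

	"github.com/network-quality/goresponsiveness/rpm"
	"github.com/network-quality/goresponsiveness/series"
)

func main() {
	// Forever-duration series keyed by monotonically increasing bucket ids,
	// as used for the per-direction self/foreign RTT probes.
	selfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
	foreignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)

	for bucket := uint64(1); bucket <= 10; bucket++ {
		selfRtts.Reserve(bucket)
		selfRtts.Fill(bucket, 0.050) // illustrative RTTs, in seconds
		foreignRtts.Reserve(bucket)
		foreignRtts.Fill(bucket, 0.080)
	}

	// Unbounded calculation: every collected probe counts.
	unbounded := rpm.CalculateRpm(selfRtts, foreignRtts, 10, 90)
	fmt.Printf("Unbounded RPM: %.0f (P90)\n", unbounded.PNRpm)

	// Bounds the stabilizer would report for the stable MAD, e.g. buckets 6..10.
	lower, upper := uint64(6), uint64(10)
	selfRtts.SetTrimmingBucketBounds(lower, upper)
	foreignRtts.SetTrimmingBucketBounds(lower, upper)

	// Bounded calculation: only probes sent/received during the stable MAD count.
	bounded := rpm.CalculateRpm(selfRtts, foreignRtts, 10, 90)
	fmt.Printf("Final RPM: %.0f (P90)\n", bounded.PNRpm)
}

Because the bounds are stored on the series rather than passed to CalculateRpm, the same series can be evaluated first unbounded and then bounded, which is what the debug loops in networkQuality.go below do.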
diff --git a/direction/direction.go b/direction/direction.go
index 9b41b26..2cc9fa9 100644
--- a/direction/direction.go
+++ b/direction/direction.go
@@ -45,4 +45,6 @@ type Direction struct {
ThroughputActivityCtx *context.Context
ThroughputActivityCtxCancel *context.CancelFunc
FormattedResults string
+ LowerBucketBound uint64
+ UpperBucketBound uint64
}
diff --git a/networkQuality.go b/networkQuality.go
index f22cc32..e88a000 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -568,6 +568,7 @@ func main() {
baselineProbeDebugging,
)
+ lowerBucketBound, upperBucketBound := uint64(0), uint64(0)
baseline_responsiveness_timeout:
for !baselineStableResponsiveness {
select {
@@ -642,13 +643,23 @@ func main() {
baselineNetworkActivityCtxCancel()
baselineProberOperatorCtxCancel()
- baselineRpm = rpm.CalculateRpm(baselineFauxSelfDownloadRtts,
- baselineForeignDownloadRtts, specParameters.TrimmedMeanPct, specParameters.Percentile)
+ lowerBucketBound, upperBucketBound = baselineResponsivenessStabilizer.GetBounds()
- fmt.Printf("Baseline RPM: %5.0f (P%d)\n", baselineRpm.PNRpm, specParameters.Percentile)
- fmt.Printf("Baseline RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n",
- baselineRpm.MeanRpm, specParameters.TrimmedMeanPct)
+ if *debugCliFlag {
+ fmt.Printf("Baseline responsiveness stablizer bucket bounds: (%v, %v)\n", lowerBucketBound, upperBucketBound)
+ }
+
+ for _, label := range []string{"Unbounded ", ""} {
+ baselineRpm = rpm.CalculateRpm(baselineFauxSelfDownloadRtts,
+ baselineForeignDownloadRtts, specParameters.TrimmedMeanPct, specParameters.Percentile)
+ fmt.Printf("%vBaseline RPM: %5.0f (P%d)\n", label, baselineRpm.PNRpm, specParameters.Percentile)
+ fmt.Printf("%vBaseline RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n",
+ label, baselineRpm.MeanRpm, specParameters.TrimmedMeanPct)
+
+ baselineFauxSelfDownloadRtts.SetTrimmingBucketBounds(lowerBucketBound, upperBucketBound)
+ baselineForeignDownloadRtts.SetTrimmingBucketBounds(lowerBucketBound, upperBucketBound)
+ }
}
var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil
@@ -911,6 +922,9 @@ func main() {
}
case <-timeoutChannel:
{
+ if *debugCliFlag {
+ fmt.Printf("%v responsiveness seeking interval has expired.\n", direction.DirectionLabel)
+ }
break responsiveness_timeout
}
case <-stabilityCheckTimeChannel:
@@ -933,7 +947,7 @@ func main() {
if *debugCliFlag {
fmt.Printf(
- "Responsiveness is instantaneously %s.\n",
+ "%v responsiveness is instantaneously %s.\n", direction.DirectionLabel,
utilities.Conditional(direction.StableResponsiveness, "stable", "unstable"))
}
@@ -1019,6 +1033,8 @@ func main() {
(*direction.ThroughputActivityCtxCancel)()
}
+ direction.LowerBucketBound, direction.UpperBucketBound = responsivenessStabilizer.GetBounds()
+
// Add a header to the results
direction.FormattedResults += fmt.Sprintf("%v:\n", direction.DirectionLabel)
@@ -1037,13 +1053,29 @@ func main() {
direction.FormattedResults += utilities.IndentOutput(
fmt.Sprintf("%v", extendedStats.Repr()), 1, "\t")
}
- directionResult := rpm.CalculateRpm(direction.SelfRtts, direction.ForeignRtts,
- specParameters.TrimmedMeanPct, specParameters.Percentile)
+
+ var directionResult *rpm.Rpm[float64] = nil
+ for _, label := range []string{"Unbounded ", ""} {
+ directionResult = rpm.CalculateRpm(direction.SelfRtts, direction.ForeignRtts,
+ specParameters.TrimmedMeanPct, specParameters.Percentile)
+ if *debugCliFlag {
+ direction.FormattedResults += utilities.IndentOutput(
+ fmt.Sprintf("%vRPM Calculation Statistics:\n", label), 1, "\t")
+ direction.FormattedResults += utilities.IndentOutput(directionResult.ToString(), 2, "\t")
+ }
+
+ direction.SelfRtts.SetTrimmingBucketBounds(
+ direction.LowerBucketBound, direction.UpperBucketBound)
+ direction.ForeignRtts.SetTrimmingBucketBounds(
+ direction.LowerBucketBound, direction.UpperBucketBound)
+ }
+
if *debugCliFlag {
direction.FormattedResults += utilities.IndentOutput(
- "RPM Calculation Statistics:\n", 1, "\t")
- direction.FormattedResults += utilities.IndentOutput(directionResult.ToString(), 2, "\t")
+ fmt.Sprintf("Bucket bounds: (%v, %v)\n",
+ direction.LowerBucketBound, direction.UpperBucketBound), 1, "\t")
}
+
if *printQualityAttenuation {
direction.FormattedResults += utilities.IndentOutput(
"Quality Attenuation Statistics:\n", 1, "\t")
@@ -1121,7 +1153,7 @@ func main() {
direction.GranularThroughputDataLogger.Close()
if *debugCliFlag {
- fmt.Printf("In debugging mode, we will cool down between tests.\n")
+ fmt.Printf("In debugging mode, we will cool down after tests.\n")
time.Sleep(constants.CooldownPeriod)
fmt.Printf("Done cooling down.\n")
}
@@ -1168,18 +1200,39 @@ func main() {
fmt.Printf("========\n")
}
- allSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
- allForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+ if *debugCliFlag {
+ unboundedAllSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+ unboundedAllForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+
+ unboundedAllSelfRtts.Append(&downloadDirection.SelfRtts)
+ unboundedAllSelfRtts.Append(&uploadDirection.SelfRtts)
+ unboundedAllForeignRtts.Append(&downloadDirection.ForeignRtts)
+ unboundedAllForeignRtts.Append(&uploadDirection.ForeignRtts)
+
+ result := rpm.CalculateRpm(unboundedAllSelfRtts, unboundedAllForeignRtts,
+ specParameters.TrimmedMeanPct, specParameters.Percentile)
+
+ fmt.Printf("(Unbounded Final RPM Calculation stats):\n%v\n", result.ToString())
+
+ fmt.Printf("Unbounded Final RPM: %.0f (P%d)\n", result.PNRpm, specParameters.Percentile)
+ fmt.Printf("Unbounded Final RPM: %.0f (Single-Sided %v%% Trimmed Mean)\n",
+ result.MeanRpm, specParameters.TrimmedMeanPct)
+ fmt.Printf("\n")
+ }
+
+ boundedAllSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+ boundedAllForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
- allSelfRtts.Append(&downloadDirection.SelfRtts)
- allSelfRtts.Append(&uploadDirection.SelfRtts)
- allForeignRtts.Append(&downloadDirection.ForeignRtts)
- allForeignRtts.Append(&uploadDirection.ForeignRtts)
+ boundedAllSelfRtts.BoundedAppend(&downloadDirection.SelfRtts)
+ boundedAllSelfRtts.BoundedAppend(&uploadDirection.SelfRtts)
+ boundedAllForeignRtts.BoundedAppend(&downloadDirection.ForeignRtts)
+ boundedAllForeignRtts.BoundedAppend(&uploadDirection.ForeignRtts)
- result := rpm.CalculateRpm(allSelfRtts, allForeignRtts, specParameters.TrimmedMeanPct, specParameters.Percentile)
+ result := rpm.CalculateRpm(boundedAllSelfRtts, boundedAllForeignRtts,
+ specParameters.TrimmedMeanPct, specParameters.Percentile)
if *debugCliFlag {
- fmt.Printf("(Final RPM Calculation stats): %v\n", result.ToString())
+ fmt.Printf("(Final RPM Calculation stats):\n%v\n", result.ToString())
}
fmt.Printf("Final RPM: %.0f (P%d)\n", result.PNRpm, specParameters.Percentile)
diff --git a/rpm/calculations.go b/rpm/calculations.go
index f3619dc..851e168 100644
--- a/rpm/calculations.go
+++ b/rpm/calculations.go
@@ -19,7 +19,6 @@ import (
"github.com/network-quality/goresponsiveness/series"
"github.com/network-quality/goresponsiveness/utilities"
- "golang.org/x/exp/constraints"
)
type Rpm[Data utilities.Number] struct {
@@ -35,7 +34,7 @@ type Rpm[Data utilities.Number] struct {
MeanRpm float64
}
-func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered](
+func CalculateRpm[Data utilities.Number, Bucket utilities.Number](
selfRtts series.WindowSeries[Data, Bucket], aggregatedForeignRtts series.WindowSeries[Data, Bucket], trimming uint, percentile uint,
) *Rpm[Data] {
// There may be more than one round trip accumulated together. If that is the case,
@@ -56,12 +55,14 @@ func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered](
}
}
+ boundedSelfRtts := selfRtts.ExtractBoundedSeries()
+
// First, let's do a double-sided trim of the top/bottom 10% of our measurements.
- selfRttsTotalCount, _ := selfRtts.Count()
+ selfRttsTotalCount, _ := boundedSelfRtts.Count()
foreignRttsTotalCount, _ := foreignRtts.Count()
_, selfProbeRoundTripTimeMean, selfRttsTrimmed :=
- series.TrimmedMean(selfRtts, int(trimming))
+ series.TrimmedMean(boundedSelfRtts, int(trimming))
_, foreignProbeRoundTripTimeMean, foreignRttsTrimmed :=
series.TrimmedMean(foreignRtts, int(trimming))
@@ -69,7 +70,7 @@ func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered](
foreignRttsTrimmedCount := len(foreignRttsTrimmed)
// Second, let's do the P90 calculations.
- _, selfProbeRoundTripTimePN := series.Percentile(selfRtts, percentile)
+ _, selfProbeRoundTripTimePN := series.Percentile(boundedSelfRtts, percentile)
_, foreignProbeRoundTripTimePN := series.Percentile(foreignRtts, percentile)
// Note: The specification indicates that we want to calculate the foreign probes as such:
diff --git a/series/series.go b/series/series.go
index ef133d2..de0d031 100644
--- a/series/series.go
+++ b/series/series.go
@@ -14,11 +14,12 @@
package series
import (
+ "cmp"
"fmt"
+ "slices"
"sync"
"github.com/network-quality/goresponsiveness/utilities"
- "golang.org/x/exp/constraints"
)
type WindowSeriesDuration int
@@ -28,7 +29,7 @@ const (
WindowOnly WindowSeriesDuration = iota
)
-type WindowSeries[Data any, Bucket constraints.Ordered] interface {
+type WindowSeries[Data any, Bucket utilities.Number] interface {
fmt.Stringer
Reserve(b Bucket) error
@@ -42,15 +43,25 @@ type WindowSeries[Data any, Bucket constraints.Ordered] interface {
Complete() bool
GetType() WindowSeriesDuration
+ ExtractBoundedSeries() WindowSeries[Data, Bucket]
+
Append(appended *WindowSeries[Data, Bucket])
+ BoundedAppend(appended *WindowSeries[Data, Bucket])
+
+ GetBucketBounds() (Bucket, Bucket)
+
+ SetTrimmingBucketBounds(Bucket, Bucket)
+ ResetTrimmingBucketBounds()
}
-type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct {
- windowSize int
- data []utilities.Pair[Bucket, utilities.Optional[Data]]
- latestIndex int
- empty bool
- lock sync.RWMutex
+type windowSeriesWindowOnlyImpl[Data any, Bucket utilities.Number] struct {
+ windowSize int
+ data []utilities.Pair[Bucket, utilities.Optional[Data]]
+ latestIndex int // invariant: the newest data is at this index.
+ empty bool
+ lock sync.RWMutex
+ lowerTrimmingBound utilities.Optional[Bucket]
+ upperTrimmingBound utilities.Optional[Bucket]
}
/*
@@ -80,6 +91,13 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error {
return nil
}
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) BucketBounds() (Bucket, Bucket) {
+ newestBucket := wsi.data[wsi.latestIndex].First
+ oldestBucket := wsi.data[wsi.nextIndex(wsi.latestIndex)].First
+
+ return oldestBucket, newestBucket
+}
+
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) error {
wsi.lock.Lock()
defer wsi.lock.Unlock()
@@ -150,15 +168,28 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio
}
result := make([]utilities.Optional[Data], wsi.windowSize)
+ var lowerTrimmingBound, upperTrimmingBound Bucket
+ hasBounds := false
+ if utilities.IsSome(wsi.lowerTrimmingBound) {
+ hasBounds = true
+ lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound)
+ upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound)
+ result = make([]utilities.Optional[Data],
+ int(upperTrimmingBound-lowerTrimmingBound)+1)
+ }
+
if wsi.empty {
return result
}
iterator := wsi.latestIndex
parallelIterator := 0
for {
- result[parallelIterator] = wsi.data[iterator].Second
+ if !hasBounds || (lowerTrimmingBound <= wsi.data[iterator].First &&
+ wsi.data[iterator].First <= upperTrimmingBound) {
+ result[parallelIterator] = wsi.data[iterator].Second
+ parallelIterator++
+ }
iterator = wsi.previousIndex(iterator)
- parallelIterator++
if iterator == wsi.latestIndex {
break
}
@@ -166,6 +197,32 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio
return result
}
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ExtractBoundedSeries() WindowSeries[Data, Bucket] {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
+
+ result := NewWindowSeries[Data, Bucket](WindowOnly, wsi.windowSize)
+
+ var lowerTrimmingBound, upperTrimmingBound Bucket
+ hasBounds := false
+ if utilities.IsSome(wsi.lowerTrimmingBound) {
+ hasBounds = true
+ lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound)
+ upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound)
+ }
+
+ for _, v := range wsi.data {
+ if hasBounds && (v.First < lowerTrimmingBound || v.First > upperTrimmingBound) {
+ continue
+ }
+ result.Reserve(v.First)
+ if utilities.IsSome(v.Second) {
+ result.Fill(v.First, utilities.GetSome(v.Second))
+ }
+ }
+ return result
+}
+
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] {
wsi.lock.Lock()
defer wsi.lock.Unlock()
@@ -198,11 +255,31 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string {
return result
}
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) SetTrimmingBucketBounds(lower Bucket, upper Bucket) {
+ wsi.lowerTrimmingBound = utilities.Some[Bucket](lower)
+ wsi.upperTrimmingBound = utilities.Some[Bucket](upper)
+}
+
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ResetTrimmingBucketBounds() {
+ wsi.lowerTrimmingBound = utilities.None[Bucket]()
+ wsi.upperTrimmingBound = utilities.None[Bucket]()
+}
+
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetBucketBounds() (Bucket, Bucket) {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
+ return wsi.data[wsi.nextIndex(wsi.latestIndex)].First, wsi.data[wsi.latestIndex].First
+}
+
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) {
- panic("")
+ panic("Append is unimplemented on a window-only Window Series")
+}
+
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) BoundedAppend(appended *WindowSeries[Data, Bucket]) {
+ panic("BoundedAppend is unimplemented on a window-only Window Series")
}
-func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered](
+func newWindowSeriesWindowOnlyImpl[Data any, Bucket utilities.Number](
windowSize int,
) *windowSeriesWindowOnlyImpl[Data, Bucket] {
result := windowSeriesWindowOnlyImpl[Data, Bucket]{windowSize: windowSize, latestIndex: 0, empty: true}
@@ -216,17 +293,19 @@ func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered](
* End of WindowSeries interface methods.
*/
-type windowSeriesForeverImpl[Data any, Bucket constraints.Ordered] struct {
- data []utilities.Pair[Bucket, utilities.Optional[Data]]
- empty bool
- lock sync.RWMutex
+type windowSeriesForeverImpl[Data any, Bucket utilities.Number] struct {
+ data []utilities.Pair[Bucket, utilities.Optional[Data]]
+ empty bool
+ lock sync.RWMutex
+ lowerTrimmingBound utilities.Optional[Bucket]
+ upperTrimmingBound utilities.Optional[Bucket]
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error {
wsi.lock.Lock()
defer wsi.lock.Unlock()
if !wsi.empty && b <= wsi.data[len(wsi.data)-1].First {
- fmt.Printf("reserving must be monotonically increasing")
+ fmt.Printf("reserving must be monotonically increasing: %v vs %v", b, wsi.data[len(wsi.data)-1].First)
return fmt.Errorf("reserving must be monotonically increasing")
}
@@ -247,13 +326,53 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error {
return fmt.Errorf("attempting to fill a bucket that does not exist")
}
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) ExtractBoundedSeries() WindowSeries[Data, Bucket] {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
+
+ var lowerTrimmingBound, upperTrimmingBound Bucket
+ hasBounds := false
+ if utilities.IsSome(wsi.lowerTrimmingBound) {
+ hasBounds = true
+ lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound)
+ upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound)
+ }
+
+ result := NewWindowSeries[Data, Bucket](Forever, 0)
+
+ for _, v := range wsi.data {
+ if hasBounds && (v.First < lowerTrimmingBound || v.First > upperTrimmingBound) {
+ continue
+ }
+ result.Reserve(v.First)
+ if utilities.IsSome(v.Second) {
+ result.Fill(v.First, utilities.GetSome(v.Second))
+ }
+ }
+ return result
+}
+
func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] {
wsi.lock.Lock()
defer wsi.lock.Unlock()
result := make([]utilities.Optional[Data], len(wsi.data))
- for i, v := range utilities.Reverse(wsi.data) {
- result[i] = v.Second
+ var lowerTrimmingBound, upperTrimmingBound Bucket
+ hasBounds := false
+ if utilities.IsSome(wsi.lowerTrimmingBound) {
+ hasBounds = true
+ lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound)
+ upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound)
+ result = make([]utilities.Optional[Data],
+ int(upperTrimmingBound-lowerTrimmingBound)+1)
+ }
+
+ index := 0
+ for _, v := range wsi.data {
+ if !hasBounds || (lowerTrimmingBound <= v.First && v.First <= upperTrimmingBound) {
+ result[index] = v.Second
+ index++
+ }
}
return result
@@ -289,8 +408,12 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetType() WindowSeriesDuration
return Forever
}
-func newWindowSeriesForeverImpl[Data any, Bucket constraints.Ordered]() *windowSeriesForeverImpl[Data, Bucket] {
- result := windowSeriesForeverImpl[Data, Bucket]{empty: true}
+func newWindowSeriesForeverImpl[Data any, Bucket utilities.Number]() *windowSeriesForeverImpl[Data, Bucket] {
+ result := windowSeriesForeverImpl[Data, Bucket]{
+ empty: true,
+ lowerTrimmingBound: utilities.None[Bucket](),
+ upperTrimmingBound: utilities.None[Bucket](),
+ }
result.data = nil
@@ -319,6 +442,32 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string {
return result
}
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) SetTrimmingBucketBounds(lower Bucket, upper Bucket) {
+ wsi.lowerTrimmingBound = utilities.Some[Bucket](lower)
+ wsi.upperTrimmingBound = utilities.Some[Bucket](upper)
+}
+
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) ResetTrimmingBucketBounds() {
+ wsi.lowerTrimmingBound = utilities.None[Bucket]()
+ wsi.upperTrimmingBound = utilities.None[Bucket]()
+}
+
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetBucketBounds() (Bucket, Bucket) {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
+ if wsi.empty {
+ return 0, 0
+ }
+ return wsi.data[0].First, wsi.data[len(wsi.data)-1].First
+}
+
+// Sort the data according to the bucket ids (ascending)
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) sort() {
+ slices.SortFunc(wsi.data, func(left, right utilities.Pair[Bucket, utilities.Optional[Data]]) int {
+ return cmp.Compare(left.First, right.First)
+ })
+}
+
func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) {
result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket])
if !ok {
@@ -330,19 +479,56 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[
defer result.lock.Unlock()
wsi.data = append(wsi.data, result.data...)
+ // Because the series that we are appending in may have overlapping buckets,
+ // we will sort them to maintain the invariant that the data items are sorted
+ // by bucket ids (increasing).
+ wsi.sort()
+}
+
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) BoundedAppend(appended *WindowSeries[Data, Bucket]) {
+ result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket])
+ if !ok {
+ panic("Cannot merge a forever window series with a non-forever window series.")
+ }
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
+ result.lock.Lock()
+ defer result.lock.Unlock()
+
+ if utilities.IsNone(result.lowerTrimmingBound) ||
+ utilities.IsNone(result.upperTrimmingBound) {
+ wsi.sort()
+ wsi.data = append(wsi.data, result.data...)
+ return
+ } else {
+ lowerTrimmingBound := utilities.GetSome(result.lowerTrimmingBound)
+ upperTrimmingBound := utilities.GetSome(result.upperTrimmingBound)
+
+ toAppend := utilities.Filter(result.data, func(
+ element utilities.Pair[Bucket, utilities.Optional[Data]],
+ ) bool {
+ bucket := element.First
+ return lowerTrimmingBound <= bucket && bucket <= upperTrimmingBound
+ })
+ wsi.data = append(wsi.data, toAppend...)
+ }
+ // Because the series that we are appending in may have overlapping buckets,
+ // we will sort them to maintain the invariant that the data items are sorted
+ // by bucket ids (increasing).
+ wsi.sort()
}
/*
* End of WindowSeries interface methods.
*/
-func NewWindowSeries[Data any, Bucket constraints.Ordered](tipe WindowSeriesDuration, windowSize int) WindowSeries[Data, Bucket] {
+func NewWindowSeries[Data any, Bucket utilities.Number](tipe WindowSeriesDuration, windowSize int) WindowSeries[Data, Bucket] {
if tipe == WindowOnly {
return newWindowSeriesWindowOnlyImpl[Data, Bucket](windowSize)
} else if tipe == Forever {
return newWindowSeriesForeverImpl[Data, Bucket]()
}
- panic("")
+ panic("Attempting to create a new window series with an invalid type.")
}
type NumericBucketGenerator[T utilities.Number] struct {
diff --git a/series/series_test.go b/series/series_test.go
index 54ddfcc..82778fc 100644
--- a/series/series_test.go
+++ b/series/series_test.go
@@ -109,7 +109,7 @@ func Test_ForeverValues(test *testing.T) {
shouldMatch = append(shouldMatch, utilities.Some[float64](previous))
}
- if !reflect.DeepEqual(utilities.Reverse(shouldMatch), series.GetValues()) {
+ if !reflect.DeepEqual(shouldMatch, series.GetValues()) {
test.Fatalf("Values() on infinite mathematical series does not work.")
}
}
@@ -941,3 +941,181 @@ func Test_ForeverLocking(test *testing.T) {
test.Fatalf("Mutual exclusion checks did not properly lock out parallel ForEach operations.")
}
}
+
+func Test_ForeverGetBucketBounds(test *testing.T) {
+ series := newWindowSeriesForeverImpl[float64, int]()
+
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+
+ series.Fill(1, 8)
+ series.Fill(2, 9)
+ series.Fill(3, 10)
+ series.Fill(4, 11)
+ series.Fill(5, 12)
+
+ lower, upper := series.GetBucketBounds()
+ if lower != 1 || upper != 5 {
+ test.Fatalf("expected a lower of 1 and upper of 5; got %v and %v, respectively!\n", lower, upper)
+ }
+}
+
+func Test_WindowGetBucketBounds(test *testing.T) {
+ series := newWindowSeriesWindowOnlyImpl[float64, int](3)
+
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+
+ series.Fill(3, 10)
+ series.Fill(4, 11)
+ series.Fill(5, 12)
+
+ lower, upper := series.GetBucketBounds()
+ if lower != 3 || upper != 5 {
+ test.Fatalf("expected a lower of 3 and upper of 5; got %v and %v, respectively!\n", lower, upper)
+ }
+}
+
+func Test_ForeverBucketBoundsEmpty(test *testing.T) {
+ series := newWindowSeriesForeverImpl[float64, int]()
+
+ lower, upper := series.GetBucketBounds()
+ if lower != 0 || upper != 0 {
+ test.Fatalf("expected a lower of 0 and upper of 0; got %v and %v, respectively!\n", lower, upper)
+ }
+}
+
+func Test_ForeverTrimmingBucketBounds(test *testing.T) {
+ series := newWindowSeriesForeverImpl[float64, int]()
+
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+
+ series.Fill(1, 8)
+ series.Fill(2, 9)
+ series.Fill(3, 10)
+ series.Fill(4, 11)
+ series.Fill(5, 12)
+
+ series.SetTrimmingBucketBounds(3, 5)
+
+ trimmedValues := series.GetValues()
+ if len(trimmedValues) != 3 {
+ test.Fatalf("Expected that the list would have 3 elements but it only had %v!\n", len(trimmedValues))
+ }
+ if utilities.GetSome(trimmedValues[0]) != 10 || utilities.GetSome(trimmedValues[1]) != 11 ||
+ utilities.GetSome(trimmedValues[2]) != 12 {
+ test.Fatalf("Expected values are not the actual values.\n")
+ }
+}
+
+func Test_WindowTrimmingBucketBounds(test *testing.T) {
+ series := newWindowSeriesWindowOnlyImpl[float64, int](5)
+
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+
+ series.Fill(1, 8)
+ series.Fill(2, 9)
+ series.Fill(3, 10)
+ series.Fill(4, 11)
+ series.Fill(5, 12)
+
+ series.SetTrimmingBucketBounds(3, 5)
+
+ trimmedValues := series.GetValues()
+ if len(trimmedValues) != 3 {
+ test.Fatalf("Expected that the list would have 3 elements but it only had %v!\n", len(trimmedValues))
+ }
+ if utilities.GetSome(trimmedValues[0]) != 12 || utilities.GetSome(trimmedValues[1]) != 11 ||
+ utilities.GetSome(trimmedValues[2]) != 10 {
+ test.Fatalf("Expected values are not the actual values.\n")
+ }
+}
+
+func Test_ForeverBoundedAppend(test *testing.T) {
+ appending_series := NewWindowSeries[float64, int](Forever, 0)
+ baseSeries := NewWindowSeries[float64, int](Forever, 0)
+
+ baseSeries.Reserve(1)
+ baseSeries.Fill(1, 1)
+ baseSeries.Reserve(2)
+ baseSeries.Fill(2, 2)
+ baseSeries.Reserve(3)
+ baseSeries.Fill(3, 3)
+
+ appending_series.Reserve(4)
+ appending_series.Reserve(5)
+ appending_series.Reserve(6)
+ appending_series.Reserve(7)
+ appending_series.Reserve(8)
+
+ appending_series.Fill(4, 8)
+ appending_series.Fill(5, 9)
+ appending_series.Fill(6, 10)
+ appending_series.Fill(7, 11)
+ appending_series.Fill(8, 12)
+
+ appending_series.SetTrimmingBucketBounds(6, 8)
+
+ baseSeries.BoundedAppend(&appending_series)
+
+ if len(baseSeries.GetValues()) != 6 {
+ test.Fatalf("The base series should have 6 values, but it actually has %v (bounded test)", len(baseSeries.GetValues()))
+ }
+
+ baseSeriesValues := baseSeries.GetValues()
+ if utilities.GetSome(baseSeriesValues[0]) != 1 ||
+ utilities.GetSome(baseSeriesValues[1]) != 2 ||
+ utilities.GetSome(baseSeriesValues[2]) != 3 ||
+ utilities.GetSome(baseSeriesValues[3]) != 10 ||
+ utilities.GetSome(baseSeriesValues[4]) != 11 ||
+ utilities.GetSome(baseSeriesValues[5]) != 12 {
+ test.Fatalf("The values that should be in a series with bounded append are not there.")
+ }
+ baseSeries = NewWindowSeries[float64, int](Forever, 0)
+ baseSeries.Append(&appending_series)
+ if len(baseSeries.GetValues()) != 5 {
+ test.Fatalf("The base series should have 5 values, but it actually has %v (unbounded test)", len(baseSeries.GetValues()))
+ }
+}
+
+func Test_ForeverExtractBounded(test *testing.T) {
+ series := NewWindowSeries[float64, int](Forever, 0)
+
+ series.Reserve(1)
+ series.Reserve(2)
+ series.Reserve(3)
+ series.Reserve(4)
+ series.Reserve(5)
+
+ series.Fill(1, 8)
+ series.Fill(2, 9)
+ series.Fill(3, 10)
+ series.Fill(4, 11)
+ series.Fill(5, 12)
+
+ extracted := series.ExtractBoundedSeries()
+
+ if len(extracted.GetValues()) != 5 {
+ test.Fatalf("Expected the extracted list to have 5 values but it really has %v", len(extracted.GetValues()))
+ }
+
+ series.SetTrimmingBucketBounds(3, 5)
+ extracted = series.ExtractBoundedSeries()
+ if len(extracted.GetValues()) != 3 {
+ test.Fatalf("Expected the extracted list to have 3 values but it really has %v", len(extracted.GetValues()))
+ }
+}
diff --git a/series/statistics.go b/series/statistics.go
index 5d11164..5ff4559 100644
--- a/series/statistics.go
+++ b/series/statistics.go
@@ -16,10 +16,9 @@ package series
import (
"github.com/network-quality/goresponsiveness/utilities"
- "golang.org/x/exp/constraints"
)
-func SeriesStandardDeviation[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket]) (bool, float64) {
+func SeriesStandardDeviation[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket]) (bool, float64) {
complete := s.Complete()
inputValues := s.GetValues()
@@ -32,7 +31,7 @@ func SeriesStandardDeviation[Data utilities.Number, Bucket constraints.Ordered](
return complete, utilities.CalculateStandardDeviation[Data](values)
}
-func Percentile[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], p uint) (bool, Data) {
+func Percentile[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket], p uint) (bool, Data) {
complete := s.Complete()
inputValues := s.GetValues()
@@ -45,7 +44,7 @@ func Percentile[Data utilities.Number, Bucket constraints.Ordered](s WindowSerie
return complete, utilities.CalculatePercentile(values, p)
}
-func AllSequentialIncreasesLessThan[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], limit float64,
+func AllSequentialIncreasesLessThan[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket], limit float64,
) (bool, bool, float64) {
complete := s.Complete()
@@ -60,7 +59,7 @@ func AllSequentialIncreasesLessThan[Data utilities.Number, Bucket constraints.Or
return complete, result, actualLimit
}
-func CalculateAverage[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket]) (bool, float64) {
+func CalculateAverage[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket]) (bool, float64) {
complete := s.Complete()
inputValues := s.GetValues()
@@ -73,7 +72,7 @@ func CalculateAverage[Data utilities.Number, Bucket constraints.Ordered](s Windo
return complete, utilities.CalculateAverage(values)
}
-func TrimmedMean[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], trim int) (bool, float64, []Data) {
+func TrimmedMean[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket], trim int) (bool, float64, []Data) {
complete := s.Complete()
inputValues := s.GetValues()
diff --git a/stabilizer/algorithm.go b/stabilizer/algorithm.go
index 86b4bb6..42dc0c6 100644
--- a/stabilizer/algorithm.go
+++ b/stabilizer/algorithm.go
@@ -24,7 +24,7 @@ import (
"golang.org/x/exp/constraints"
)
-type MeasurementStablizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered] struct {
+type MeasurementStablizer[Data constraints.Float | constraints.Integer, Bucket utilities.Number] struct {
// The number of instantaneous measurements in the current interval could be infinite (Forever).
instantaneousses series.WindowSeries[Data, Bucket]
// There are a fixed, finite number of aggregates (WindowOnly).
@@ -58,7 +58,7 @@ type MeasurementStablizer[Data constraints.Float | constraints.Integer, Bucket c
// If the calculated standard deviation of *those* values is less than SDT, we declare
// stability.
-func NewStabilizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered](
+func NewStabilizer[Data constraints.Float | constraints.Integer, Bucket utilities.Number](
mad int,
sdt float64,
trimmingLevel uint,
@@ -199,6 +199,7 @@ func (r3 *MeasurementStablizer[Data, Bucket]) IsStable() bool {
r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) {
if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) {
md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md)
+
_, average := series.CalculateAverage(md)
averages = append(averages, average)
}
@@ -214,3 +215,30 @@ func (r3 *MeasurementStablizer[Data, Bucket]) IsStable() bool {
return isStable
}
+
+func (r3 *MeasurementStablizer[Data, Bucket]) GetBounds() (Bucket, Bucket) {
+ r3.m.Lock()
+ defer r3.m.Unlock()
+
+ haveMinimum := false
+
+ lowerBound := Bucket(0)
+ upperBound := Bucket(0)
+
+ r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) {
+ if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) {
+ md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md)
+ currentAggregateLowerBound, currentAggregateUpperBound := md.GetBucketBounds()
+
+ if !haveMinimum {
+ lowerBound = currentAggregateLowerBound
+ haveMinimum = true
+ } else {
+ lowerBound = min(lowerBound, currentAggregateLowerBound)
+ }
+ upperBound = max(upperBound, currentAggregateUpperBound)
+ }
+ })
+
+ return lowerBound, upperBound
+}