diff options
| author | Will Hawkins <[email protected]> | 2024-01-26 20:46:05 -0500 | 
|---|---|---|
| committer | Will Hawkins <[email protected]> | 2024-01-26 20:46:05 -0500 | 
| commit | e60066f572464da3a6f55999a48b1d38cf6423d6 (patch) | |
| tree | b6e5c38f0ece8e22aac7414b43c24c4632fe4690 | |
| parent | 65b039e33717ee43620363704cb3daa304a5e724 (diff) | |
[Bugfix] Only probes collected during stable MAD count
For calculating the final RPM, only those probes that are sent/received
during the stable MAD should count.
Signed-off-by: Will Hawkins <[email protected]>
| -rw-r--r-- | direction/direction.go | 2 | ||||
| -rw-r--r-- | networkQuality.go | 91 | ||||
| -rw-r--r-- | rpm/calculations.go | 11 | ||||
| -rw-r--r-- | series/series.go | 232 | ||||
| -rw-r--r-- | series/series_test.go | 180 | ||||
| -rw-r--r-- | series/statistics.go | 11 | ||||
| -rw-r--r-- | stabilizer/algorithm.go | 32 | 
7 files changed, 503 insertions, 56 deletions
diff --git a/direction/direction.go b/direction/direction.go index 9b41b26..2cc9fa9 100644 --- a/direction/direction.go +++ b/direction/direction.go @@ -45,4 +45,6 @@ type Direction struct {  	ThroughputActivityCtx             *context.Context  	ThroughputActivityCtxCancel       *context.CancelFunc  	FormattedResults                  string +	LowerBucketBound                  uint64 +	UpperBucketBound                  uint64  } diff --git a/networkQuality.go b/networkQuality.go index f22cc32..e88a000 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -568,6 +568,7 @@ func main() {  			baselineProbeDebugging,  		) +		lowerBucketBound, upperBucketBound := uint64(0), uint64(0)  	baseline_responsiveness_timeout:  		for !baselineStableResponsiveness {  			select { @@ -642,13 +643,23 @@ func main() {  		baselineNetworkActivityCtxCancel()  		baselineProberOperatorCtxCancel() -		baselineRpm = rpm.CalculateRpm(baselineFauxSelfDownloadRtts, -			baselineForeignDownloadRtts, specParameters.TrimmedMeanPct, specParameters.Percentile) +		lowerBucketBound, upperBucketBound = baselineResponsivenessStabilizer.GetBounds() -		fmt.Printf("Baseline RPM: %5.0f (P%d)\n", baselineRpm.PNRpm, specParameters.Percentile) -		fmt.Printf("Baseline RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", -			baselineRpm.MeanRpm, specParameters.TrimmedMeanPct) +		if *debugCliFlag { +			fmt.Printf("Baseline responsiveness stablizer bucket bounds: (%v, %v)\n", lowerBucketBound, upperBucketBound) +		} + +		for _, label := range []string{"Unbounded ", ""} { +			baselineRpm = rpm.CalculateRpm(baselineFauxSelfDownloadRtts, +				baselineForeignDownloadRtts, specParameters.TrimmedMeanPct, specParameters.Percentile) +			fmt.Printf("%vBaseline RPM: %5.0f (P%d)\n", label, baselineRpm.PNRpm, specParameters.Percentile) +			fmt.Printf("%vBaseline RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", +				label, baselineRpm.MeanRpm, specParameters.TrimmedMeanPct) + +			baselineFauxSelfDownloadRtts.SetTrimmingBucketBounds(lowerBucketBound, upperBucketBound) +			baselineForeignDownloadRtts.SetTrimmingBucketBounds(lowerBucketBound, upperBucketBound) +		}  	}  	var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil @@ -911,6 +922,9 @@ func main() {  					}  				case <-timeoutChannel:  					{ +						if *debugCliFlag { +							fmt.Printf("%v responsiveness seeking interval has expired.\n", direction.DirectionLabel) +						}  						break responsiveness_timeout  					}  				case <-stabilityCheckTimeChannel: @@ -933,7 +947,7 @@ func main() {  						if *debugCliFlag {  							fmt.Printf( -								"Responsiveness is instantaneously %s.\n", +								"%v responsiveness is instantaneously %s.\n", direction.DirectionLabel,  								utilities.Conditional(direction.StableResponsiveness, "stable", "unstable"))  						} @@ -1019,6 +1033,8 @@ func main() {  				(*direction.ThroughputActivityCtxCancel)()  			} +			direction.LowerBucketBound, direction.UpperBucketBound = responsivenessStabilizer.GetBounds() +  			// Add a header to the results  			direction.FormattedResults += fmt.Sprintf("%v:\n", direction.DirectionLabel) @@ -1037,13 +1053,29 @@ func main() {  				direction.FormattedResults += utilities.IndentOutput(  					fmt.Sprintf("%v", extendedStats.Repr()), 1, "\t")  			} -			directionResult := rpm.CalculateRpm(direction.SelfRtts, direction.ForeignRtts, -				specParameters.TrimmedMeanPct, specParameters.Percentile) + +			var directionResult *rpm.Rpm[float64] = nil +			for _, label := range []string{"Unbounded ", ""} { +				directionResult = rpm.CalculateRpm(direction.SelfRtts, direction.ForeignRtts, +					specParameters.TrimmedMeanPct, specParameters.Percentile) +				if *debugCliFlag { +					direction.FormattedResults += utilities.IndentOutput( +						fmt.Sprintf("%vRPM Calculation Statistics:\n", label), 1, "\t") +					direction.FormattedResults += utilities.IndentOutput(directionResult.ToString(), 2, "\t") +				} + +				direction.SelfRtts.SetTrimmingBucketBounds( +					direction.LowerBucketBound, direction.UpperBucketBound) +				direction.ForeignRtts.SetTrimmingBucketBounds( +					direction.LowerBucketBound, direction.UpperBucketBound) +			} +  			if *debugCliFlag {  				direction.FormattedResults += utilities.IndentOutput( -					"RPM Calculation Statistics:\n", 1, "\t") -				direction.FormattedResults += utilities.IndentOutput(directionResult.ToString(), 2, "\t") +					fmt.Sprintf("Bucket bounds: (%v, %v)\n", +						direction.LowerBucketBound, direction.UpperBucketBound), 1, "\t")  			} +  			if *printQualityAttenuation {  				direction.FormattedResults += utilities.IndentOutput(  					"Quality Attenuation Statistics:\n", 1, "\t") @@ -1121,7 +1153,7 @@ func main() {  			direction.GranularThroughputDataLogger.Close()  			if *debugCliFlag { -				fmt.Printf("In debugging mode, we will cool down between tests.\n") +				fmt.Printf("In debugging mode, we will cool down after tests.\n")  				time.Sleep(constants.CooldownPeriod)  				fmt.Printf("Done cooling down.\n")  			} @@ -1168,18 +1200,39 @@ func main() {  		fmt.Printf("========\n")  	} -	allSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) -	allForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) +	if *debugCliFlag { +		unboundedAllSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) +		unboundedAllForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) + +		unboundedAllSelfRtts.Append(&downloadDirection.SelfRtts) +		unboundedAllSelfRtts.Append(&uploadDirection.SelfRtts) +		unboundedAllForeignRtts.Append(&downloadDirection.ForeignRtts) +		unboundedAllForeignRtts.Append(&uploadDirection.ForeignRtts) + +		result := rpm.CalculateRpm(unboundedAllSelfRtts, unboundedAllForeignRtts, +			specParameters.TrimmedMeanPct, specParameters.Percentile) + +		fmt.Printf("(Unbounded Final RPM Calculation stats):\n%v\n", result.ToString()) + +		fmt.Printf("Unbounded Final RPM: %.0f (P%d)\n", result.PNRpm, specParameters.Percentile) +		fmt.Printf("Unbounded Final RPM: %.0f (Single-Sided %v%% Trimmed Mean)\n", +			result.MeanRpm, specParameters.TrimmedMeanPct) +		fmt.Printf("\n") +	} + +	boundedAllSelfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) +	boundedAllForeignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) -	allSelfRtts.Append(&downloadDirection.SelfRtts) -	allSelfRtts.Append(&uploadDirection.SelfRtts) -	allForeignRtts.Append(&downloadDirection.ForeignRtts) -	allForeignRtts.Append(&uploadDirection.ForeignRtts) +	boundedAllSelfRtts.BoundedAppend(&downloadDirection.SelfRtts) +	boundedAllSelfRtts.BoundedAppend(&uploadDirection.SelfRtts) +	boundedAllForeignRtts.BoundedAppend(&downloadDirection.ForeignRtts) +	boundedAllForeignRtts.BoundedAppend(&uploadDirection.ForeignRtts) -	result := rpm.CalculateRpm(allSelfRtts, allForeignRtts, specParameters.TrimmedMeanPct, specParameters.Percentile) +	result := rpm.CalculateRpm(boundedAllSelfRtts, boundedAllForeignRtts, +		specParameters.TrimmedMeanPct, specParameters.Percentile)  	if *debugCliFlag { -		fmt.Printf("(Final RPM Calculation stats): %v\n", result.ToString()) +		fmt.Printf("(Final RPM Calculation stats):\n%v\n", result.ToString())  	}  	fmt.Printf("Final RPM: %.0f (P%d)\n", result.PNRpm, specParameters.Percentile) diff --git a/rpm/calculations.go b/rpm/calculations.go index f3619dc..851e168 100644 --- a/rpm/calculations.go +++ b/rpm/calculations.go @@ -19,7 +19,6 @@ import (  	"github.com/network-quality/goresponsiveness/series"  	"github.com/network-quality/goresponsiveness/utilities" -	"golang.org/x/exp/constraints"  )  type Rpm[Data utilities.Number] struct { @@ -35,7 +34,7 @@ type Rpm[Data utilities.Number] struct {  	MeanRpm             float64  } -func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered]( +func CalculateRpm[Data utilities.Number, Bucket utilities.Number](  	selfRtts series.WindowSeries[Data, Bucket], aggregatedForeignRtts series.WindowSeries[Data, Bucket], trimming uint, percentile uint,  ) *Rpm[Data] {  	// There may be more than one round trip accumulated together. If that is the case, @@ -56,12 +55,14 @@ func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered](  		}  	} +	boundedSelfRtts := selfRtts.ExtractBoundedSeries() +  	// First, let's do a double-sided trim of the top/bottom 10% of our measurements. -	selfRttsTotalCount, _ := selfRtts.Count() +	selfRttsTotalCount, _ := boundedSelfRtts.Count()  	foreignRttsTotalCount, _ := foreignRtts.Count()  	_, selfProbeRoundTripTimeMean, selfRttsTrimmed := -		series.TrimmedMean(selfRtts, int(trimming)) +		series.TrimmedMean(boundedSelfRtts, int(trimming))  	_, foreignProbeRoundTripTimeMean, foreignRttsTrimmed :=  		series.TrimmedMean(foreignRtts, int(trimming)) @@ -69,7 +70,7 @@ func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered](  	foreignRttsTrimmedCount := len(foreignRttsTrimmed)  	// Second, let's do the P90 calculations. -	_, selfProbeRoundTripTimePN := series.Percentile(selfRtts, percentile) +	_, selfProbeRoundTripTimePN := series.Percentile(boundedSelfRtts, percentile)  	_, foreignProbeRoundTripTimePN := series.Percentile(foreignRtts, percentile)  	// Note: The specification indicates that we want to calculate the foreign probes as such: diff --git a/series/series.go b/series/series.go index ef133d2..de0d031 100644 --- a/series/series.go +++ b/series/series.go @@ -14,11 +14,12 @@  package series  import ( +	"cmp"  	"fmt" +	"slices"  	"sync"  	"github.com/network-quality/goresponsiveness/utilities" -	"golang.org/x/exp/constraints"  )  type WindowSeriesDuration int @@ -28,7 +29,7 @@ const (  	WindowOnly WindowSeriesDuration = iota  ) -type WindowSeries[Data any, Bucket constraints.Ordered] interface { +type WindowSeries[Data any, Bucket utilities.Number] interface {  	fmt.Stringer  	Reserve(b Bucket) error @@ -42,15 +43,25 @@ type WindowSeries[Data any, Bucket constraints.Ordered] interface {  	Complete() bool  	GetType() WindowSeriesDuration +	ExtractBoundedSeries() WindowSeries[Data, Bucket] +  	Append(appended *WindowSeries[Data, Bucket]) +	BoundedAppend(appended *WindowSeries[Data, Bucket]) + +	GetBucketBounds() (Bucket, Bucket) + +	SetTrimmingBucketBounds(Bucket, Bucket) +	ResetTrimmingBucketBounds()  } -type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct { -	windowSize  int -	data        []utilities.Pair[Bucket, utilities.Optional[Data]] -	latestIndex int -	empty       bool -	lock        sync.RWMutex +type windowSeriesWindowOnlyImpl[Data any, Bucket utilities.Number] struct { +	windowSize         int +	data               []utilities.Pair[Bucket, utilities.Optional[Data]] +	latestIndex        int // invariant: newest data is there. +	empty              bool +	lock               sync.RWMutex +	lowerTrimmingBound utilities.Optional[Bucket] +	upperTrimmingBound utilities.Optional[Bucket]  }  /* @@ -80,6 +91,13 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error {  	return nil  } +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) BucketBounds() (Bucket, Bucket) { +	newestBucket := wsi.data[wsi.latestIndex].First +	oldestBucket := wsi.data[wsi.nextIndex(wsi.latestIndex)].First + +	return oldestBucket, newestBucket +} +  func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) error {  	wsi.lock.Lock()  	defer wsi.lock.Unlock() @@ -150,15 +168,28 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio  	}  	result := make([]utilities.Optional[Data], wsi.windowSize) +	var lowerTrimmingBound, upperTrimmingBound Bucket +	hasBounds := false +	if utilities.IsSome(wsi.lowerTrimmingBound) { +		hasBounds = true +		lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound) +		upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound) +		result = make([]utilities.Optional[Data], +			int(upperTrimmingBound-lowerTrimmingBound)+1) +	} +  	if wsi.empty {  		return result  	}  	iterator := wsi.latestIndex  	parallelIterator := 0  	for { -		result[parallelIterator] = wsi.data[iterator].Second +		if !hasBounds || (lowerTrimmingBound <= wsi.data[iterator].First && +			wsi.data[iterator].First <= upperTrimmingBound) { +			result[parallelIterator] = wsi.data[iterator].Second +			parallelIterator++ +		}  		iterator = wsi.previousIndex(iterator) -		parallelIterator++  		if iterator == wsi.latestIndex {  			break  		} @@ -166,6 +197,32 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio  	return result  } +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ExtractBoundedSeries() WindowSeries[Data, Bucket] { +	wsi.lock.Lock() +	defer wsi.lock.Unlock() + +	result := NewWindowSeries[Data, Bucket](WindowOnly, wsi.windowSize) + +	var lowerTrimmingBound, upperTrimmingBound Bucket +	hasBounds := false +	if utilities.IsSome(wsi.lowerTrimmingBound) { +		hasBounds = true +		lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound) +		upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound) +	} + +	for _, v := range wsi.data { +		if hasBounds && (v.First < lowerTrimmingBound || v.First > upperTrimmingBound) { +			continue +		} +		result.Reserve(v.First) +		if utilities.IsSome(v.Second) { +			result.Fill(v.First, utilities.GetSome(v.Second)) +		} +	} +	return result +} +  func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] {  	wsi.lock.Lock()  	defer wsi.lock.Unlock() @@ -198,11 +255,31 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string {  	return result  } +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) SetTrimmingBucketBounds(lower Bucket, upper Bucket) { +	wsi.lowerTrimmingBound = utilities.Some[Bucket](lower) +	wsi.upperTrimmingBound = utilities.Some[Bucket](upper) +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ResetTrimmingBucketBounds() { +	wsi.lowerTrimmingBound = utilities.None[Bucket]() +	wsi.upperTrimmingBound = utilities.None[Bucket]() +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetBucketBounds() (Bucket, Bucket) { +	wsi.lock.Lock() +	defer wsi.lock.Unlock() +	return wsi.data[wsi.nextIndex(wsi.latestIndex)].First, wsi.data[wsi.latestIndex].First +} +  func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) { -	panic("") +	panic("Append is unimplemented on a window-only Window Series") +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) BoundedAppend(appended *WindowSeries[Data, Bucket]) { +	panic("BoundedAppend is unimplemented on a window-only Window Series")  } -func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered]( +func newWindowSeriesWindowOnlyImpl[Data any, Bucket utilities.Number](  	windowSize int,  ) *windowSeriesWindowOnlyImpl[Data, Bucket] {  	result := windowSeriesWindowOnlyImpl[Data, Bucket]{windowSize: windowSize, latestIndex: 0, empty: true} @@ -216,17 +293,19 @@ func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered](   * End of WindowSeries interface methods.   */ -type windowSeriesForeverImpl[Data any, Bucket constraints.Ordered] struct { -	data  []utilities.Pair[Bucket, utilities.Optional[Data]] -	empty bool -	lock  sync.RWMutex +type windowSeriesForeverImpl[Data any, Bucket utilities.Number] struct { +	data               []utilities.Pair[Bucket, utilities.Optional[Data]] +	empty              bool +	lock               sync.RWMutex +	lowerTrimmingBound utilities.Optional[Bucket] +	upperTrimmingBound utilities.Optional[Bucket]  }  func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error {  	wsi.lock.Lock()  	defer wsi.lock.Unlock()  	if !wsi.empty && b <= wsi.data[len(wsi.data)-1].First { -		fmt.Printf("reserving must be monotonically increasing") +		fmt.Printf("reserving must be monotonically increasing: %v vs %v", b, wsi.data[len(wsi.data)-1].First)  		return fmt.Errorf("reserving must be monotonically increasing")  	} @@ -247,13 +326,53 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error {  	return fmt.Errorf("attempting to fill a bucket that does not exist")  } +func (wsi *windowSeriesForeverImpl[Data, Bucket]) ExtractBoundedSeries() WindowSeries[Data, Bucket] { +	wsi.lock.Lock() +	defer wsi.lock.Unlock() + +	var lowerTrimmingBound, upperTrimmingBound Bucket +	hasBounds := false +	if utilities.IsSome(wsi.lowerTrimmingBound) { +		hasBounds = true +		lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound) +		upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound) +	} + +	result := NewWindowSeries[Data, Bucket](Forever, 0) + +	for _, v := range wsi.data { +		if hasBounds && (v.First < lowerTrimmingBound || v.First > upperTrimmingBound) { +			continue +		} +		result.Reserve(v.First) +		if utilities.IsSome(v.Second) { +			result.Fill(v.First, utilities.GetSome(v.Second)) +		} +	} +	return result +} +  func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] {  	wsi.lock.Lock()  	defer wsi.lock.Unlock()  	result := make([]utilities.Optional[Data], len(wsi.data)) -	for i, v := range utilities.Reverse(wsi.data) { -		result[i] = v.Second +	var lowerTrimmingBound, upperTrimmingBound Bucket +	hasBounds := false +	if utilities.IsSome(wsi.lowerTrimmingBound) { +		hasBounds = true +		lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound) +		upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound) +		result = make([]utilities.Optional[Data], +			int(upperTrimmingBound-lowerTrimmingBound)+1) +	} + +	index := 0 +	for _, v := range wsi.data { +		if !hasBounds || (lowerTrimmingBound <= v.First && v.First <= upperTrimmingBound) { +			result[index] = v.Second +			index++ +		}  	}  	return result @@ -289,8 +408,12 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetType() WindowSeriesDuration  	return Forever  } -func newWindowSeriesForeverImpl[Data any, Bucket constraints.Ordered]() *windowSeriesForeverImpl[Data, Bucket] { -	result := windowSeriesForeverImpl[Data, Bucket]{empty: true} +func newWindowSeriesForeverImpl[Data any, Bucket utilities.Number]() *windowSeriesForeverImpl[Data, Bucket] { +	result := windowSeriesForeverImpl[Data, Bucket]{ +		empty:              true, +		lowerTrimmingBound: utilities.None[Bucket](), +		upperTrimmingBound: utilities.None[Bucket](), +	}  	result.data = nil @@ -319,6 +442,32 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string {  	return result  } +func (wsi *windowSeriesForeverImpl[Data, Bucket]) SetTrimmingBucketBounds(lower Bucket, upper Bucket) { +	wsi.lowerTrimmingBound = utilities.Some[Bucket](lower) +	wsi.upperTrimmingBound = utilities.Some[Bucket](upper) +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) ResetTrimmingBucketBounds() { +	wsi.lowerTrimmingBound = utilities.None[Bucket]() +	wsi.upperTrimmingBound = utilities.None[Bucket]() +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetBucketBounds() (Bucket, Bucket) { +	wsi.lock.Lock() +	defer wsi.lock.Unlock() +	if wsi.empty { +		return 0, 0 +	} +	return wsi.data[0].First, wsi.data[len(wsi.data)-1].First +} + +// Sort the data according to the bucket ids (ascending) +func (wsi *windowSeriesForeverImpl[Data, Bucket]) sort() { +	slices.SortFunc(wsi.data, func(left, right utilities.Pair[Bucket, utilities.Optional[Data]]) int { +		return cmp.Compare(left.First, right.First) +	}) +} +  func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) {  	result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket])  	if !ok { @@ -330,19 +479,56 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[  	defer result.lock.Unlock()  	wsi.data = append(wsi.data, result.data...) +	// Because the series that we are appending in may have overlapping buckets, +	// we will sort them to maintain the invariant that the data items are sorted +	// by bucket ids (increasing). +	wsi.sort() +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) BoundedAppend(appended *WindowSeries[Data, Bucket]) { +	result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket]) +	if !ok { +		panic("Cannot merge a forever window series with a non-forever window series.") +	} +	wsi.lock.Lock() +	defer wsi.lock.Unlock() +	result.lock.Lock() +	defer result.lock.Unlock() + +	if utilities.IsNone(result.lowerTrimmingBound) || +		utilities.IsNone(result.upperTrimmingBound) { +		wsi.sort() +		wsi.data = append(wsi.data, result.data...) +		return +	} else { +		lowerTrimmingBound := utilities.GetSome(result.lowerTrimmingBound) +		upperTrimmingBound := utilities.GetSome(result.upperTrimmingBound) + +		toAppend := utilities.Filter(result.data, func( +			element utilities.Pair[Bucket, utilities.Optional[Data]], +		) bool { +			bucket := element.First +			return lowerTrimmingBound <= bucket && bucket <= upperTrimmingBound +		}) +		wsi.data = append(wsi.data, toAppend...) +	} +	// Because the series that we are appending in may have overlapping buckets, +	// we will sort them to maintain the invariant that the data items are sorted +	// by bucket ids (increasing). +	wsi.sort()  }  /*   * End of WindowSeries interface methods.   */ -func NewWindowSeries[Data any, Bucket constraints.Ordered](tipe WindowSeriesDuration, windowSize int) WindowSeries[Data, Bucket] { +func NewWindowSeries[Data any, Bucket utilities.Number](tipe WindowSeriesDuration, windowSize int) WindowSeries[Data, Bucket] {  	if tipe == WindowOnly {  		return newWindowSeriesWindowOnlyImpl[Data, Bucket](windowSize)  	} else if tipe == Forever {  		return newWindowSeriesForeverImpl[Data, Bucket]()  	} -	panic("") +	panic("Attempting to create a new window series with an invalid type.")  }  type NumericBucketGenerator[T utilities.Number] struct { diff --git a/series/series_test.go b/series/series_test.go index 54ddfcc..82778fc 100644 --- a/series/series_test.go +++ b/series/series_test.go @@ -109,7 +109,7 @@ func Test_ForeverValues(test *testing.T) {  		shouldMatch = append(shouldMatch, utilities.Some[float64](previous))  	} -	if !reflect.DeepEqual(utilities.Reverse(shouldMatch), series.GetValues()) { +	if !reflect.DeepEqual(shouldMatch, series.GetValues()) {  		test.Fatalf("Values() on infinite mathematical series does not work.")  	}  } @@ -941,3 +941,181 @@ func Test_ForeverLocking(test *testing.T) {  		test.Fatalf("Mutual exclusion checks did not properly lock out parallel ForEach operations.")  	}  } + +func Test_ForeverGetBucketBounds(test *testing.T) { +	series := newWindowSeriesForeverImpl[float64, int]() + +	series.Reserve(1) +	series.Reserve(2) +	series.Reserve(3) +	series.Reserve(4) +	series.Reserve(5) + +	series.Fill(1, 8) +	series.Fill(2, 9) +	series.Fill(3, 10) +	series.Fill(4, 11) +	series.Fill(5, 12) + +	lower, upper := series.GetBucketBounds() +	if lower != 1 || upper != 5 { +		test.Fatalf("expected a lower of 1 and upper of 5; got %v and %v, respectively!\n", lower, upper) +	} +} + +func Test_WindowGetBucketBounds(test *testing.T) { +	series := newWindowSeriesWindowOnlyImpl[float64, int](3) + +	series.Reserve(1) +	series.Reserve(2) +	series.Reserve(3) +	series.Reserve(4) +	series.Reserve(5) + +	series.Fill(3, 10) +	series.Fill(4, 11) +	series.Fill(5, 12) + +	lower, upper := series.GetBucketBounds() +	if lower != 3 || upper != 5 { +		test.Fatalf("expected a lower of 3 and upper of 5; got %v and %v, respectively!\n", lower, upper) +	} +} + +func Test_ForeverBucketBoundsEmpty(test *testing.T) { +	series := newWindowSeriesForeverImpl[float64, int]() + +	lower, upper := series.GetBucketBounds() +	if lower != 0 || upper != 0 { +		test.Fatalf("expected a lower of 0 and upper of 0; got %v and %v, respectively!\n", lower, upper) +	} +} + +func Test_ForeverTrimmingBucketBounds(test *testing.T) { +	series := newWindowSeriesForeverImpl[float64, int]() + +	series.Reserve(1) +	series.Reserve(2) +	series.Reserve(3) +	series.Reserve(4) +	series.Reserve(5) + +	series.Fill(1, 8) +	series.Fill(2, 9) +	series.Fill(3, 10) +	series.Fill(4, 11) +	series.Fill(5, 12) + +	series.SetTrimmingBucketBounds(3, 5) + +	trimmedValues := series.GetValues() +	if len(trimmedValues) != 3 { +		test.Fatalf("Expected that the list would have 3 elements but it only had %v!\n", len(trimmedValues)) +	} +	if utilities.GetSome(trimmedValues[0]) != 10 || utilities.GetSome(trimmedValues[1]) != 11 || +		utilities.GetSome(trimmedValues[2]) != 12 { +		test.Fatalf("Expected values are not the actual values.\n") +	} +} + +func Test_WindowTrimmingBucketBounds(test *testing.T) { +	series := newWindowSeriesWindowOnlyImpl[float64, int](5) + +	series.Reserve(1) +	series.Reserve(2) +	series.Reserve(3) +	series.Reserve(4) +	series.Reserve(5) + +	series.Fill(1, 8) +	series.Fill(2, 9) +	series.Fill(3, 10) +	series.Fill(4, 11) +	series.Fill(5, 12) + +	series.SetTrimmingBucketBounds(3, 5) + +	trimmedValues := series.GetValues() +	if len(trimmedValues) != 3 { +		test.Fatalf("Expected that the list would have 3 elements but it only had %v!\n", len(trimmedValues)) +	} +	if utilities.GetSome(trimmedValues[0]) != 12 || utilities.GetSome(trimmedValues[1]) != 11 || +		utilities.GetSome(trimmedValues[2]) != 10 { +		test.Fatalf("Expected values are not the actual values.\n") +	} +} + +func Test_ForeverBoundedAppend(test *testing.T) { +	appending_series := NewWindowSeries[float64, int](Forever, 0) +	baseSeries := NewWindowSeries[float64, int](Forever, 0) + +	baseSeries.Reserve(1) +	baseSeries.Fill(1, 1) +	baseSeries.Reserve(2) +	baseSeries.Fill(2, 2) +	baseSeries.Reserve(3) +	baseSeries.Fill(3, 3) + +	appending_series.Reserve(4) +	appending_series.Reserve(5) +	appending_series.Reserve(6) +	appending_series.Reserve(7) +	appending_series.Reserve(8) + +	appending_series.Fill(4, 8) +	appending_series.Fill(5, 9) +	appending_series.Fill(6, 10) +	appending_series.Fill(7, 11) +	appending_series.Fill(8, 12) + +	appending_series.SetTrimmingBucketBounds(6, 8) + +	baseSeries.BoundedAppend(&appending_series) + +	if len(baseSeries.GetValues()) != 6 { +		test.Fatalf("The base series should have 6 values, but it actually has %v (bounded test)", len(baseSeries.GetValues())) +	} + +	baseSeriesValues := baseSeries.GetValues() +	if utilities.GetSome(baseSeriesValues[0]) != 1 || +		utilities.GetSome(baseSeriesValues[1]) != 2 || +		utilities.GetSome(baseSeriesValues[2]) != 3 || +		utilities.GetSome(baseSeriesValues[3]) != 10 || +		utilities.GetSome(baseSeriesValues[4]) != 11 || +		utilities.GetSome(baseSeriesValues[5]) != 12 { +		test.Fatalf("The values that should be in a series with bounded append are not there.") +	} +	baseSeries = NewWindowSeries[float64, int](Forever, 0) +	baseSeries.Append(&appending_series) +	if len(baseSeries.GetValues()) != 5 { +		test.Fatalf("The base series should have 5 values, but it actually has %v (unbounded test)", len(baseSeries.GetValues())) +	} +} + +func Test_ForeverExtractBounded(test *testing.T) { +	series := NewWindowSeries[float64, int](Forever, 0) + +	series.Reserve(1) +	series.Reserve(2) +	series.Reserve(3) +	series.Reserve(4) +	series.Reserve(5) + +	series.Fill(1, 8) +	series.Fill(2, 9) +	series.Fill(3, 10) +	series.Fill(4, 11) +	series.Fill(5, 12) + +	extracted := series.ExtractBoundedSeries() + +	if len(extracted.GetValues()) != 5 { +		test.Fatalf("Expected the extracted list to have 5 values but it really has %v", len(extracted.GetValues())) +	} + +	series.SetTrimmingBucketBounds(3, 5) +	extracted = series.ExtractBoundedSeries() +	if len(extracted.GetValues()) != 3 { +		test.Fatalf("Expected the extracted list to have 3 values but it really has %v", len(extracted.GetValues())) +	} +} diff --git a/series/statistics.go b/series/statistics.go index 5d11164..5ff4559 100644 --- a/series/statistics.go +++ b/series/statistics.go @@ -16,10 +16,9 @@ package series  import (  	"github.com/network-quality/goresponsiveness/utilities" -	"golang.org/x/exp/constraints"  ) -func SeriesStandardDeviation[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket]) (bool, float64) { +func SeriesStandardDeviation[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket]) (bool, float64) {  	complete := s.Complete()  	inputValues := s.GetValues() @@ -32,7 +31,7 @@ func SeriesStandardDeviation[Data utilities.Number, Bucket constraints.Ordered](  	return complete, utilities.CalculateStandardDeviation[Data](values)  } -func Percentile[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], p uint) (bool, Data) { +func Percentile[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket], p uint) (bool, Data) {  	complete := s.Complete()  	inputValues := s.GetValues() @@ -45,7 +44,7 @@ func Percentile[Data utilities.Number, Bucket constraints.Ordered](s WindowSerie  	return complete, utilities.CalculatePercentile(values, p)  } -func AllSequentialIncreasesLessThan[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], limit float64, +func AllSequentialIncreasesLessThan[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket], limit float64,  ) (bool, bool, float64) {  	complete := s.Complete() @@ -60,7 +59,7 @@ func AllSequentialIncreasesLessThan[Data utilities.Number, Bucket constraints.Or  	return complete, result, actualLimit  } -func CalculateAverage[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket]) (bool, float64) { +func CalculateAverage[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket]) (bool, float64) {  	complete := s.Complete()  	inputValues := s.GetValues() @@ -73,7 +72,7 @@ func CalculateAverage[Data utilities.Number, Bucket constraints.Ordered](s Windo  	return complete, utilities.CalculateAverage(values)  } -func TrimmedMean[Data utilities.Number, Bucket constraints.Ordered](s WindowSeries[Data, Bucket], trim int) (bool, float64, []Data) { +func TrimmedMean[Data utilities.Number, Bucket utilities.Number](s WindowSeries[Data, Bucket], trim int) (bool, float64, []Data) {  	complete := s.Complete()  	inputValues := s.GetValues() diff --git a/stabilizer/algorithm.go b/stabilizer/algorithm.go index 86b4bb6..42dc0c6 100644 --- a/stabilizer/algorithm.go +++ b/stabilizer/algorithm.go @@ -24,7 +24,7 @@ import (  	"golang.org/x/exp/constraints"  ) -type MeasurementStablizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered] struct { +type MeasurementStablizer[Data constraints.Float | constraints.Integer, Bucket utilities.Number] struct {  	// The number of instantaneous measurements in the current interval could be infinite (Forever).  	instantaneousses series.WindowSeries[Data, Bucket]  	// There are a fixed, finite number of aggregates (WindowOnly). @@ -58,7 +58,7 @@ type MeasurementStablizer[Data constraints.Float | constraints.Integer, Bucket c  // If the calculated standard deviation of *those* values is less than SDT, we declare  // stability. -func NewStabilizer[Data constraints.Float | constraints.Integer, Bucket constraints.Ordered]( +func NewStabilizer[Data constraints.Float | constraints.Integer, Bucket utilities.Number](  	mad int,  	sdt float64,  	trimmingLevel uint, @@ -199,6 +199,7 @@ func (r3 *MeasurementStablizer[Data, Bucket]) IsStable() bool {  	r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) {  		if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) {  			md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md) +  			_, average := series.CalculateAverage(md)  			averages = append(averages, average)  		} @@ -214,3 +215,30 @@ func (r3 *MeasurementStablizer[Data, Bucket]) IsStable() bool {  	return isStable  } + +func (r3 *MeasurementStablizer[Data, Bucket]) GetBounds() (Bucket, Bucket) { +	r3.m.Lock() +	defer r3.m.Unlock() + +	haveMinimum := false + +	lowerBound := Bucket(0) +	upperBound := Bucket(0) + +	r3.aggregates.ForEach(func(b int, md *utilities.Optional[series.WindowSeries[Data, Bucket]]) { +		if utilities.IsSome[series.WindowSeries[Data, Bucket]](*md) { +			md := utilities.GetSome[series.WindowSeries[Data, Bucket]](*md) +			currentAggregateLowerBound, currentAggregateUpperBound := md.GetBucketBounds() + +			if !haveMinimum { +				lowerBound = currentAggregateLowerBound +				haveMinimum = true +			} else { +				lowerBound = min(lowerBound, currentAggregateLowerBound) +			} +			upperBound = max(upperBound, currentAggregateUpperBound) +		} +	}) + +	return lowerBound, upperBound +}  | 
