diff options
Diffstat (limited to 'series/series.go')
| -rw-r--r-- | series/series.go | 232 |
1 files changed, 209 insertions, 23 deletions
diff --git a/series/series.go b/series/series.go index ef133d2..de0d031 100644 --- a/series/series.go +++ b/series/series.go @@ -14,11 +14,12 @@ package series import ( + "cmp" "fmt" + "slices" "sync" "github.com/network-quality/goresponsiveness/utilities" - "golang.org/x/exp/constraints" ) type WindowSeriesDuration int @@ -28,7 +29,7 @@ const ( WindowOnly WindowSeriesDuration = iota ) -type WindowSeries[Data any, Bucket constraints.Ordered] interface { +type WindowSeries[Data any, Bucket utilities.Number] interface { fmt.Stringer Reserve(b Bucket) error @@ -42,15 +43,25 @@ type WindowSeries[Data any, Bucket constraints.Ordered] interface { Complete() bool GetType() WindowSeriesDuration + ExtractBoundedSeries() WindowSeries[Data, Bucket] + Append(appended *WindowSeries[Data, Bucket]) + BoundedAppend(appended *WindowSeries[Data, Bucket]) + + GetBucketBounds() (Bucket, Bucket) + + SetTrimmingBucketBounds(Bucket, Bucket) + ResetTrimmingBucketBounds() } -type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct { - windowSize int - data []utilities.Pair[Bucket, utilities.Optional[Data]] - latestIndex int - empty bool - lock sync.RWMutex +type windowSeriesWindowOnlyImpl[Data any, Bucket utilities.Number] struct { + windowSize int + data []utilities.Pair[Bucket, utilities.Optional[Data]] + latestIndex int // invariant: newest data is there. + empty bool + lock sync.RWMutex + lowerTrimmingBound utilities.Optional[Bucket] + upperTrimmingBound utilities.Optional[Bucket] } /* @@ -80,6 +91,13 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error { return nil } +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) BucketBounds() (Bucket, Bucket) { + newestBucket := wsi.data[wsi.latestIndex].First + oldestBucket := wsi.data[wsi.nextIndex(wsi.latestIndex)].First + + return oldestBucket, newestBucket +} + func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) error { wsi.lock.Lock() defer wsi.lock.Unlock() @@ -150,15 +168,28 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio } result := make([]utilities.Optional[Data], wsi.windowSize) + var lowerTrimmingBound, upperTrimmingBound Bucket + hasBounds := false + if utilities.IsSome(wsi.lowerTrimmingBound) { + hasBounds = true + lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound) + upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound) + result = make([]utilities.Optional[Data], + int(upperTrimmingBound-lowerTrimmingBound)+1) + } + if wsi.empty { return result } iterator := wsi.latestIndex parallelIterator := 0 for { - result[parallelIterator] = wsi.data[iterator].Second + if !hasBounds || (lowerTrimmingBound <= wsi.data[iterator].First && + wsi.data[iterator].First <= upperTrimmingBound) { + result[parallelIterator] = wsi.data[iterator].Second + parallelIterator++ + } iterator = wsi.previousIndex(iterator) - parallelIterator++ if iterator == wsi.latestIndex { break } @@ -166,6 +197,32 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio return result } +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ExtractBoundedSeries() WindowSeries[Data, Bucket] { + wsi.lock.Lock() + defer wsi.lock.Unlock() + + result := NewWindowSeries[Data, Bucket](WindowOnly, wsi.windowSize) + + var lowerTrimmingBound, upperTrimmingBound Bucket + hasBounds := false + if utilities.IsSome(wsi.lowerTrimmingBound) { + hasBounds = true + lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound) + upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound) + } + + for _, v := range wsi.data { + if hasBounds && (v.First < lowerTrimmingBound || v.First > upperTrimmingBound) { + continue + } + result.Reserve(v.First) + if utilities.IsSome(v.Second) { + result.Fill(v.First, utilities.GetSome(v.Second)) + } + } + return result +} + func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] { wsi.lock.Lock() defer wsi.lock.Unlock() @@ -198,11 +255,31 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string { return result } +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) SetTrimmingBucketBounds(lower Bucket, upper Bucket) { + wsi.lowerTrimmingBound = utilities.Some[Bucket](lower) + wsi.upperTrimmingBound = utilities.Some[Bucket](upper) +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ResetTrimmingBucketBounds() { + wsi.lowerTrimmingBound = utilities.None[Bucket]() + wsi.upperTrimmingBound = utilities.None[Bucket]() +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetBucketBounds() (Bucket, Bucket) { + wsi.lock.Lock() + defer wsi.lock.Unlock() + return wsi.data[wsi.nextIndex(wsi.latestIndex)].First, wsi.data[wsi.latestIndex].First +} + func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) { - panic("") + panic("Append is unimplemented on a window-only Window Series") +} + +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) BoundedAppend(appended *WindowSeries[Data, Bucket]) { + panic("BoundedAppend is unimplemented on a window-only Window Series") } -func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered]( +func newWindowSeriesWindowOnlyImpl[Data any, Bucket utilities.Number]( windowSize int, ) *windowSeriesWindowOnlyImpl[Data, Bucket] { result := windowSeriesWindowOnlyImpl[Data, Bucket]{windowSize: windowSize, latestIndex: 0, empty: true} @@ -216,17 +293,19 @@ func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered]( * End of WindowSeries interface methods. */ -type windowSeriesForeverImpl[Data any, Bucket constraints.Ordered] struct { - data []utilities.Pair[Bucket, utilities.Optional[Data]] - empty bool - lock sync.RWMutex +type windowSeriesForeverImpl[Data any, Bucket utilities.Number] struct { + data []utilities.Pair[Bucket, utilities.Optional[Data]] + empty bool + lock sync.RWMutex + lowerTrimmingBound utilities.Optional[Bucket] + upperTrimmingBound utilities.Optional[Bucket] } func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error { wsi.lock.Lock() defer wsi.lock.Unlock() if !wsi.empty && b <= wsi.data[len(wsi.data)-1].First { - fmt.Printf("reserving must be monotonically increasing") + fmt.Printf("reserving must be monotonically increasing: %v vs %v", b, wsi.data[len(wsi.data)-1].First) return fmt.Errorf("reserving must be monotonically increasing") } @@ -247,13 +326,53 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error { return fmt.Errorf("attempting to fill a bucket that does not exist") } +func (wsi *windowSeriesForeverImpl[Data, Bucket]) ExtractBoundedSeries() WindowSeries[Data, Bucket] { + wsi.lock.Lock() + defer wsi.lock.Unlock() + + var lowerTrimmingBound, upperTrimmingBound Bucket + hasBounds := false + if utilities.IsSome(wsi.lowerTrimmingBound) { + hasBounds = true + lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound) + upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound) + } + + result := NewWindowSeries[Data, Bucket](Forever, 0) + + for _, v := range wsi.data { + if hasBounds && (v.First < lowerTrimmingBound || v.First > upperTrimmingBound) { + continue + } + result.Reserve(v.First) + if utilities.IsSome(v.Second) { + result.Fill(v.First, utilities.GetSome(v.Second)) + } + } + return result +} + func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] { wsi.lock.Lock() defer wsi.lock.Unlock() result := make([]utilities.Optional[Data], len(wsi.data)) - for i, v := range utilities.Reverse(wsi.data) { - result[i] = v.Second + var lowerTrimmingBound, upperTrimmingBound Bucket + hasBounds := false + if utilities.IsSome(wsi.lowerTrimmingBound) { + hasBounds = true + lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound) + upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound) + result = make([]utilities.Optional[Data], + int(upperTrimmingBound-lowerTrimmingBound)+1) + } + + index := 0 + for _, v := range wsi.data { + if !hasBounds || (lowerTrimmingBound <= v.First && v.First <= upperTrimmingBound) { + result[index] = v.Second + index++ + } } return result @@ -289,8 +408,12 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetType() WindowSeriesDuration return Forever } -func newWindowSeriesForeverImpl[Data any, Bucket constraints.Ordered]() *windowSeriesForeverImpl[Data, Bucket] { - result := windowSeriesForeverImpl[Data, Bucket]{empty: true} +func newWindowSeriesForeverImpl[Data any, Bucket utilities.Number]() *windowSeriesForeverImpl[Data, Bucket] { + result := windowSeriesForeverImpl[Data, Bucket]{ + empty: true, + lowerTrimmingBound: utilities.None[Bucket](), + upperTrimmingBound: utilities.None[Bucket](), + } result.data = nil @@ -319,6 +442,32 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string { return result } +func (wsi *windowSeriesForeverImpl[Data, Bucket]) SetTrimmingBucketBounds(lower Bucket, upper Bucket) { + wsi.lowerTrimmingBound = utilities.Some[Bucket](lower) + wsi.upperTrimmingBound = utilities.Some[Bucket](upper) +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) ResetTrimmingBucketBounds() { + wsi.lowerTrimmingBound = utilities.None[Bucket]() + wsi.upperTrimmingBound = utilities.None[Bucket]() +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetBucketBounds() (Bucket, Bucket) { + wsi.lock.Lock() + defer wsi.lock.Unlock() + if wsi.empty { + return 0, 0 + } + return wsi.data[0].First, wsi.data[len(wsi.data)-1].First +} + +// Sort the data according to the bucket ids (ascending) +func (wsi *windowSeriesForeverImpl[Data, Bucket]) sort() { + slices.SortFunc(wsi.data, func(left, right utilities.Pair[Bucket, utilities.Optional[Data]]) int { + return cmp.Compare(left.First, right.First) + }) +} + func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) { result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket]) if !ok { @@ -330,19 +479,56 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[ defer result.lock.Unlock() wsi.data = append(wsi.data, result.data...) + // Because the series that we are appending in may have overlapping buckets, + // we will sort them to maintain the invariant that the data items are sorted + // by bucket ids (increasing). + wsi.sort() +} + +func (wsi *windowSeriesForeverImpl[Data, Bucket]) BoundedAppend(appended *WindowSeries[Data, Bucket]) { + result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket]) + if !ok { + panic("Cannot merge a forever window series with a non-forever window series.") + } + wsi.lock.Lock() + defer wsi.lock.Unlock() + result.lock.Lock() + defer result.lock.Unlock() + + if utilities.IsNone(result.lowerTrimmingBound) || + utilities.IsNone(result.upperTrimmingBound) { + wsi.sort() + wsi.data = append(wsi.data, result.data...) + return + } else { + lowerTrimmingBound := utilities.GetSome(result.lowerTrimmingBound) + upperTrimmingBound := utilities.GetSome(result.upperTrimmingBound) + + toAppend := utilities.Filter(result.data, func( + element utilities.Pair[Bucket, utilities.Optional[Data]], + ) bool { + bucket := element.First + return lowerTrimmingBound <= bucket && bucket <= upperTrimmingBound + }) + wsi.data = append(wsi.data, toAppend...) + } + // Because the series that we are appending in may have overlapping buckets, + // we will sort them to maintain the invariant that the data items are sorted + // by bucket ids (increasing). + wsi.sort() } /* * End of WindowSeries interface methods. */ -func NewWindowSeries[Data any, Bucket constraints.Ordered](tipe WindowSeriesDuration, windowSize int) WindowSeries[Data, Bucket] { +func NewWindowSeries[Data any, Bucket utilities.Number](tipe WindowSeriesDuration, windowSize int) WindowSeries[Data, Bucket] { if tipe == WindowOnly { return newWindowSeriesWindowOnlyImpl[Data, Bucket](windowSize) } else if tipe == Forever { return newWindowSeriesForeverImpl[Data, Bucket]() } - panic("") + panic("Attempting to create a new window series with an invalid type.") } type NumericBucketGenerator[T utilities.Number] struct { |
