summaryrefslogtreecommitdiff
path: root/series/series.go
diff options
context:
space:
mode:
Diffstat (limited to 'series/series.go')
-rw-r--r--series/series.go232
1 files changed, 209 insertions, 23 deletions
diff --git a/series/series.go b/series/series.go
index ef133d2..de0d031 100644
--- a/series/series.go
+++ b/series/series.go
@@ -14,11 +14,12 @@
package series
import (
+ "cmp"
"fmt"
+ "slices"
"sync"
"github.com/network-quality/goresponsiveness/utilities"
- "golang.org/x/exp/constraints"
)
type WindowSeriesDuration int
@@ -28,7 +29,7 @@ const (
WindowOnly WindowSeriesDuration = iota
)
-type WindowSeries[Data any, Bucket constraints.Ordered] interface {
+type WindowSeries[Data any, Bucket utilities.Number] interface {
fmt.Stringer
Reserve(b Bucket) error
@@ -42,15 +43,25 @@ type WindowSeries[Data any, Bucket constraints.Ordered] interface {
Complete() bool
GetType() WindowSeriesDuration
+ ExtractBoundedSeries() WindowSeries[Data, Bucket]
+
Append(appended *WindowSeries[Data, Bucket])
+ BoundedAppend(appended *WindowSeries[Data, Bucket])
+
+ GetBucketBounds() (Bucket, Bucket)
+
+ SetTrimmingBucketBounds(Bucket, Bucket)
+ ResetTrimmingBucketBounds()
}
-type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct {
- windowSize int
- data []utilities.Pair[Bucket, utilities.Optional[Data]]
- latestIndex int
- empty bool
- lock sync.RWMutex
+type windowSeriesWindowOnlyImpl[Data any, Bucket utilities.Number] struct {
+ windowSize int
+ data []utilities.Pair[Bucket, utilities.Optional[Data]]
+ latestIndex int // invariant: newest data is there.
+ empty bool
+ lock sync.RWMutex
+ lowerTrimmingBound utilities.Optional[Bucket]
+ upperTrimmingBound utilities.Optional[Bucket]
}
/*
@@ -80,6 +91,13 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error {
return nil
}
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) BucketBounds() (Bucket, Bucket) {
+ newestBucket := wsi.data[wsi.latestIndex].First
+ oldestBucket := wsi.data[wsi.nextIndex(wsi.latestIndex)].First
+
+ return oldestBucket, newestBucket
+}
+
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) error {
wsi.lock.Lock()
defer wsi.lock.Unlock()
@@ -150,15 +168,28 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio
}
result := make([]utilities.Optional[Data], wsi.windowSize)
+ var lowerTrimmingBound, upperTrimmingBound Bucket
+ hasBounds := false
+ if utilities.IsSome(wsi.lowerTrimmingBound) {
+ hasBounds = true
+ lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound)
+ upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound)
+ result = make([]utilities.Optional[Data],
+ int(upperTrimmingBound-lowerTrimmingBound)+1)
+ }
+
if wsi.empty {
return result
}
iterator := wsi.latestIndex
parallelIterator := 0
for {
- result[parallelIterator] = wsi.data[iterator].Second
+ if !hasBounds || (lowerTrimmingBound <= wsi.data[iterator].First &&
+ wsi.data[iterator].First <= upperTrimmingBound) {
+ result[parallelIterator] = wsi.data[iterator].Second
+ parallelIterator++
+ }
iterator = wsi.previousIndex(iterator)
- parallelIterator++
if iterator == wsi.latestIndex {
break
}
@@ -166,6 +197,32 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio
return result
}
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ExtractBoundedSeries() WindowSeries[Data, Bucket] {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
+
+ result := NewWindowSeries[Data, Bucket](WindowOnly, wsi.windowSize)
+
+ var lowerTrimmingBound, upperTrimmingBound Bucket
+ hasBounds := false
+ if utilities.IsSome(wsi.lowerTrimmingBound) {
+ hasBounds = true
+ lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound)
+ upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound)
+ }
+
+ for _, v := range wsi.data {
+ if hasBounds && (v.First < lowerTrimmingBound || v.First > upperTrimmingBound) {
+ continue
+ }
+ result.Reserve(v.First)
+ if utilities.IsSome(v.Second) {
+ result.Fill(v.First, utilities.GetSome(v.Second))
+ }
+ }
+ return result
+}
+
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] {
wsi.lock.Lock()
defer wsi.lock.Unlock()
@@ -198,11 +255,31 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string {
return result
}
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) SetTrimmingBucketBounds(lower Bucket, upper Bucket) {
+ wsi.lowerTrimmingBound = utilities.Some[Bucket](lower)
+ wsi.upperTrimmingBound = utilities.Some[Bucket](upper)
+}
+
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ResetTrimmingBucketBounds() {
+ wsi.lowerTrimmingBound = utilities.None[Bucket]()
+ wsi.upperTrimmingBound = utilities.None[Bucket]()
+}
+
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetBucketBounds() (Bucket, Bucket) {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
+ return wsi.data[wsi.nextIndex(wsi.latestIndex)].First, wsi.data[wsi.latestIndex].First
+}
+
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) {
- panic("")
+ panic("Append is unimplemented on a window-only Window Series")
+}
+
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) BoundedAppend(appended *WindowSeries[Data, Bucket]) {
+ panic("BoundedAppend is unimplemented on a window-only Window Series")
}
-func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered](
+func newWindowSeriesWindowOnlyImpl[Data any, Bucket utilities.Number](
windowSize int,
) *windowSeriesWindowOnlyImpl[Data, Bucket] {
result := windowSeriesWindowOnlyImpl[Data, Bucket]{windowSize: windowSize, latestIndex: 0, empty: true}
@@ -216,17 +293,19 @@ func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered](
* End of WindowSeries interface methods.
*/
-type windowSeriesForeverImpl[Data any, Bucket constraints.Ordered] struct {
- data []utilities.Pair[Bucket, utilities.Optional[Data]]
- empty bool
- lock sync.RWMutex
+type windowSeriesForeverImpl[Data any, Bucket utilities.Number] struct {
+ data []utilities.Pair[Bucket, utilities.Optional[Data]]
+ empty bool
+ lock sync.RWMutex
+ lowerTrimmingBound utilities.Optional[Bucket]
+ upperTrimmingBound utilities.Optional[Bucket]
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error {
wsi.lock.Lock()
defer wsi.lock.Unlock()
if !wsi.empty && b <= wsi.data[len(wsi.data)-1].First {
- fmt.Printf("reserving must be monotonically increasing")
+ fmt.Printf("reserving must be monotonically increasing: %v vs %v", b, wsi.data[len(wsi.data)-1].First)
return fmt.Errorf("reserving must be monotonically increasing")
}
@@ -247,13 +326,53 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error {
return fmt.Errorf("attempting to fill a bucket that does not exist")
}
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) ExtractBoundedSeries() WindowSeries[Data, Bucket] {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
+
+ var lowerTrimmingBound, upperTrimmingBound Bucket
+ hasBounds := false
+ if utilities.IsSome(wsi.lowerTrimmingBound) {
+ hasBounds = true
+ lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound)
+ upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound)
+ }
+
+ result := NewWindowSeries[Data, Bucket](Forever, 0)
+
+ for _, v := range wsi.data {
+ if hasBounds && (v.First < lowerTrimmingBound || v.First > upperTrimmingBound) {
+ continue
+ }
+ result.Reserve(v.First)
+ if utilities.IsSome(v.Second) {
+ result.Fill(v.First, utilities.GetSome(v.Second))
+ }
+ }
+ return result
+}
+
func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] {
wsi.lock.Lock()
defer wsi.lock.Unlock()
result := make([]utilities.Optional[Data], len(wsi.data))
- for i, v := range utilities.Reverse(wsi.data) {
- result[i] = v.Second
+ var lowerTrimmingBound, upperTrimmingBound Bucket
+ hasBounds := false
+ if utilities.IsSome(wsi.lowerTrimmingBound) {
+ hasBounds = true
+ lowerTrimmingBound = utilities.GetSome(wsi.lowerTrimmingBound)
+ upperTrimmingBound = utilities.GetSome(wsi.upperTrimmingBound)
+ result = make([]utilities.Optional[Data],
+ int(upperTrimmingBound-lowerTrimmingBound)+1)
+ }
+
+ index := 0
+ for _, v := range wsi.data {
+ if !hasBounds || (lowerTrimmingBound <= v.First && v.First <= upperTrimmingBound) {
+ result[index] = v.Second
+ index++
+ }
}
return result
@@ -289,8 +408,12 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetType() WindowSeriesDuration
return Forever
}
-func newWindowSeriesForeverImpl[Data any, Bucket constraints.Ordered]() *windowSeriesForeverImpl[Data, Bucket] {
- result := windowSeriesForeverImpl[Data, Bucket]{empty: true}
+func newWindowSeriesForeverImpl[Data any, Bucket utilities.Number]() *windowSeriesForeverImpl[Data, Bucket] {
+ result := windowSeriesForeverImpl[Data, Bucket]{
+ empty: true,
+ lowerTrimmingBound: utilities.None[Bucket](),
+ upperTrimmingBound: utilities.None[Bucket](),
+ }
result.data = nil
@@ -319,6 +442,32 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string {
return result
}
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) SetTrimmingBucketBounds(lower Bucket, upper Bucket) {
+ wsi.lowerTrimmingBound = utilities.Some[Bucket](lower)
+ wsi.upperTrimmingBound = utilities.Some[Bucket](upper)
+}
+
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) ResetTrimmingBucketBounds() {
+ wsi.lowerTrimmingBound = utilities.None[Bucket]()
+ wsi.upperTrimmingBound = utilities.None[Bucket]()
+}
+
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetBucketBounds() (Bucket, Bucket) {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
+ if wsi.empty {
+ return 0, 0
+ }
+ return wsi.data[0].First, wsi.data[len(wsi.data)-1].First
+}
+
+// Sort the data according to the bucket ids (ascending)
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) sort() {
+ slices.SortFunc(wsi.data, func(left, right utilities.Pair[Bucket, utilities.Optional[Data]]) int {
+ return cmp.Compare(left.First, right.First)
+ })
+}
+
func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) {
result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket])
if !ok {
@@ -330,19 +479,56 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[
defer result.lock.Unlock()
wsi.data = append(wsi.data, result.data...)
+ // Because the series that we are appending in may have overlapping buckets,
+ // we will sort them to maintain the invariant that the data items are sorted
+ // by bucket ids (increasing).
+ wsi.sort()
+}
+
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) BoundedAppend(appended *WindowSeries[Data, Bucket]) {
+ result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket])
+ if !ok {
+ panic("Cannot merge a forever window series with a non-forever window series.")
+ }
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
+ result.lock.Lock()
+ defer result.lock.Unlock()
+
+ if utilities.IsNone(result.lowerTrimmingBound) ||
+ utilities.IsNone(result.upperTrimmingBound) {
+ wsi.sort()
+ wsi.data = append(wsi.data, result.data...)
+ return
+ } else {
+ lowerTrimmingBound := utilities.GetSome(result.lowerTrimmingBound)
+ upperTrimmingBound := utilities.GetSome(result.upperTrimmingBound)
+
+ toAppend := utilities.Filter(result.data, func(
+ element utilities.Pair[Bucket, utilities.Optional[Data]],
+ ) bool {
+ bucket := element.First
+ return lowerTrimmingBound <= bucket && bucket <= upperTrimmingBound
+ })
+ wsi.data = append(wsi.data, toAppend...)
+ }
+ // Because the series that we are appending in may have overlapping buckets,
+ // we will sort them to maintain the invariant that the data items are sorted
+ // by bucket ids (increasing).
+ wsi.sort()
}
/*
* End of WindowSeries interface methods.
*/
-func NewWindowSeries[Data any, Bucket constraints.Ordered](tipe WindowSeriesDuration, windowSize int) WindowSeries[Data, Bucket] {
+func NewWindowSeries[Data any, Bucket utilities.Number](tipe WindowSeriesDuration, windowSize int) WindowSeries[Data, Bucket] {
if tipe == WindowOnly {
return newWindowSeriesWindowOnlyImpl[Data, Bucket](windowSize)
} else if tipe == Forever {
return newWindowSeriesForeverImpl[Data, Bucket]()
}
- panic("")
+ panic("Attempting to create a new window series with an invalid type.")
}
type NumericBucketGenerator[T utilities.Number] struct {