summaryrefslogtreecommitdiff
path: root/series/series.go
diff options
context:
space:
mode:
authorWill Hawkins <[email protected]>2023-12-13 19:56:03 -0500
committerWill Hawkins <[email protected]>2024-01-04 19:10:37 -0500
commitf3990f950277c2f61e0e1811b4b8a81fc0219da4 (patch)
tree6969e4ac2c4e94e75fe2e0c5581da5c07785dce8 /series/series.go
parent552f01ad73248474553ce471695745db58c862ea (diff)
[Feature] Support for testing upload/download in parallel
Use the `--rpm.parallel` to test in parallel mode. The default testing mode is serial. Signed-off-by: Will Hawkins <[email protected]>
Diffstat (limited to 'series/series.go')
-rw-r--r--series/series.go62
1 files changed, 62 insertions, 0 deletions
diff --git a/series/series.go b/series/series.go
index 6b9af1f..ef133d2 100644
--- a/series/series.go
+++ b/series/series.go
@@ -41,6 +41,8 @@ type WindowSeries[Data any, Bucket constraints.Ordered] interface {
GetValues() []utilities.Optional[Data]
Complete() bool
GetType() WindowSeriesDuration
+
+ Append(appended *WindowSeries[Data, Bucket])
}
type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct {
@@ -48,6 +50,7 @@ type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct {
data []utilities.Pair[Bucket, utilities.Optional[Data]]
latestIndex int
empty bool
+ lock sync.RWMutex
}
/*
@@ -58,6 +61,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error {
if !wsi.empty && b <= wsi.data[wsi.latestIndex].First {
return fmt.Errorf("reserving must be monotonically increasing")
}
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
if wsi.empty {
/* Special case if we are empty: The latestIndex is where we want this value to go! */
@@ -76,6 +81,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error {
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) error {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
iterator := wsi.latestIndex
for {
if wsi.data[iterator].First == b {
@@ -91,6 +98,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) erro
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Count() (some int, none int) {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
some = 0
none = 0
for _, v := range wsi.data {
@@ -104,6 +113,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Count() (some int, none int
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Complete() bool {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
for _, v := range wsi.data {
if utilities.IsNone(v.Second) {
return false
@@ -113,10 +124,18 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Complete() bool {
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) nextIndex(currentIndex int) int {
+ // Internal functions should be called with the lock held!
+ if wsi.lock.TryLock() {
+ panic("windowSeriesWindowOnlyImpl nextIndex called without lock held.")
+ }
return (currentIndex + 1) % wsi.windowSize
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) previousIndex(currentIndex int) int {
+ // Internal functions should be called with the lock held!
+ if wsi.lock.TryLock() {
+ panic("windowSeriesWindowOnlyImpl nextIndex called without lock held.")
+ }
nextIndex := currentIndex - 1
if nextIndex < 0 {
nextIndex += wsi.windowSize
@@ -125,6 +144,10 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) previousIndex(currentIndex
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optional[Data] {
+ // Internal functions should be called with the lock held!
+ if wsi.lock.TryLock() {
+ panic("windowSeriesWindowOnlyImpl nextIndex called without lock held.")
+ }
result := make([]utilities.Optional[Data], wsi.windowSize)
if wsi.empty {
@@ -144,6 +167,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
return wsi.toArray()
}
@@ -152,12 +177,16 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetType() WindowSeriesDurat
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
for _, v := range wsi.data {
eacher(v.First, &v.Second)
}
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
result := fmt.Sprintf("Window series (window (%d) only, latest index: %v): ", wsi.windowSize, wsi.latestIndex)
for _, v := range wsi.data {
valueString := "None"
@@ -169,6 +198,10 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string {
return result
}
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) {
+ panic("")
+}
+
func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered](
windowSize int,
) *windowSeriesWindowOnlyImpl[Data, Bucket] {
@@ -186,10 +219,14 @@ func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered](
type windowSeriesForeverImpl[Data any, Bucket constraints.Ordered] struct {
data []utilities.Pair[Bucket, utilities.Optional[Data]]
empty bool
+ lock sync.RWMutex
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
if !wsi.empty && b <= wsi.data[len(wsi.data)-1].First {
+ fmt.Printf("reserving must be monotonically increasing")
return fmt.Errorf("reserving must be monotonically increasing")
}
@@ -199,6 +236,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error {
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
for i := range wsi.data {
if wsi.data[i].First == b {
wsi.data[i].Second = utilities.Some[Data](d)
@@ -209,6 +248,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error {
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
result := make([]utilities.Optional[Data], len(wsi.data))
for i, v := range utilities.Reverse(wsi.data) {
@@ -219,6 +260,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Option
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) Count() (some int, none int) {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
some = 0
none = 0
for _, v := range wsi.data {
@@ -232,6 +275,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Count() (some int, none int) {
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) Complete() bool {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
for _, v := range wsi.data {
if utilities.IsNone(v.Second) {
return false
@@ -253,12 +298,16 @@ func newWindowSeriesForeverImpl[Data any, Bucket constraints.Ordered]() *windowS
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
for _, v := range wsi.data {
eacher(v.First, &v.Second)
}
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
result := "Window series (forever): "
for _, v := range wsi.data {
valueString := "None"
@@ -270,6 +319,19 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string {
return result
}
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) {
+ result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket])
+ if !ok {
+ panic("Cannot merge a forever window series with a non-forever window series.")
+ }
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
+ result.lock.Lock()
+ defer result.lock.Unlock()
+
+ wsi.data = append(wsi.data, result.data...)
+}
+
/*
* End of WindowSeries interface methods.
*/