summaryrefslogtreecommitdiff
path: root/series
diff options
context:
space:
mode:
Diffstat (limited to 'series')
-rw-r--r--series/series.go62
-rw-r--r--series/series_test.go64
2 files changed, 126 insertions, 0 deletions
diff --git a/series/series.go b/series/series.go
index 6b9af1f..ef133d2 100644
--- a/series/series.go
+++ b/series/series.go
@@ -41,6 +41,8 @@ type WindowSeries[Data any, Bucket constraints.Ordered] interface {
GetValues() []utilities.Optional[Data]
Complete() bool
GetType() WindowSeriesDuration
+
+ Append(appended *WindowSeries[Data, Bucket])
}
type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct {
@@ -48,6 +50,7 @@ type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct {
data []utilities.Pair[Bucket, utilities.Optional[Data]]
latestIndex int
empty bool
+ lock sync.RWMutex
}
/*
@@ -58,6 +61,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error {
if !wsi.empty && b <= wsi.data[wsi.latestIndex].First {
return fmt.Errorf("reserving must be monotonically increasing")
}
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
if wsi.empty {
/* Special case if we are empty: The latestIndex is where we want this value to go! */
@@ -76,6 +81,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error {
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) error {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
iterator := wsi.latestIndex
for {
if wsi.data[iterator].First == b {
@@ -91,6 +98,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) erro
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Count() (some int, none int) {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
some = 0
none = 0
for _, v := range wsi.data {
@@ -104,6 +113,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Count() (some int, none int
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Complete() bool {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
for _, v := range wsi.data {
if utilities.IsNone(v.Second) {
return false
@@ -113,10 +124,18 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Complete() bool {
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) nextIndex(currentIndex int) int {
+ // Internal functions should be called with the lock held!
+ if wsi.lock.TryLock() {
+ panic("windowSeriesWindowOnlyImpl nextIndex called without lock held.")
+ }
return (currentIndex + 1) % wsi.windowSize
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) previousIndex(currentIndex int) int {
+ // Internal functions should be called with the lock held!
+ if wsi.lock.TryLock() {
+ panic("windowSeriesWindowOnlyImpl nextIndex called without lock held.")
+ }
nextIndex := currentIndex - 1
if nextIndex < 0 {
nextIndex += wsi.windowSize
@@ -125,6 +144,10 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) previousIndex(currentIndex
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optional[Data] {
+ // Internal functions should be called with the lock held!
+ if wsi.lock.TryLock() {
+ panic("windowSeriesWindowOnlyImpl nextIndex called without lock held.")
+ }
result := make([]utilities.Optional[Data], wsi.windowSize)
if wsi.empty {
@@ -144,6 +167,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
return wsi.toArray()
}
@@ -152,12 +177,16 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetType() WindowSeriesDurat
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
for _, v := range wsi.data {
eacher(v.First, &v.Second)
}
}
func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
result := fmt.Sprintf("Window series (window (%d) only, latest index: %v): ", wsi.windowSize, wsi.latestIndex)
for _, v := range wsi.data {
valueString := "None"
@@ -169,6 +198,10 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string {
return result
}
+func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) {
+ panic("")
+}
+
func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered](
windowSize int,
) *windowSeriesWindowOnlyImpl[Data, Bucket] {
@@ -186,10 +219,14 @@ func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered](
type windowSeriesForeverImpl[Data any, Bucket constraints.Ordered] struct {
data []utilities.Pair[Bucket, utilities.Optional[Data]]
empty bool
+ lock sync.RWMutex
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
if !wsi.empty && b <= wsi.data[len(wsi.data)-1].First {
+ fmt.Printf("reserving must be monotonically increasing")
return fmt.Errorf("reserving must be monotonically increasing")
}
@@ -199,6 +236,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error {
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
for i := range wsi.data {
if wsi.data[i].First == b {
wsi.data[i].Second = utilities.Some[Data](d)
@@ -209,6 +248,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error {
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
result := make([]utilities.Optional[Data], len(wsi.data))
for i, v := range utilities.Reverse(wsi.data) {
@@ -219,6 +260,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Option
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) Count() (some int, none int) {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
some = 0
none = 0
for _, v := range wsi.data {
@@ -232,6 +275,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Count() (some int, none int) {
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) Complete() bool {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
for _, v := range wsi.data {
if utilities.IsNone(v.Second) {
return false
@@ -253,12 +298,16 @@ func newWindowSeriesForeverImpl[Data any, Bucket constraints.Ordered]() *windowS
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
for _, v := range wsi.data {
eacher(v.First, &v.Second)
}
}
func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string {
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
result := "Window series (forever): "
for _, v := range wsi.data {
valueString := "None"
@@ -270,6 +319,19 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string {
return result
}
+func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) {
+ result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket])
+ if !ok {
+ panic("Cannot merge a forever window series with a non-forever window series.")
+ }
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
+ result.lock.Lock()
+ defer result.lock.Unlock()
+
+ wsi.data = append(wsi.data, result.data...)
+}
+
/*
* End of WindowSeries interface methods.
*/
diff --git a/series/series_test.go b/series/series_test.go
index 3ed752d..54ddfcc 100644
--- a/series/series_test.go
+++ b/series/series_test.go
@@ -15,14 +15,21 @@ package series
import (
"reflect"
+ "sync"
"testing"
+ "time"
+ RPMTesting "github.com/network-quality/goresponsiveness/testing"
"github.com/network-quality/goresponsiveness/utilities"
)
func TestNextIndex(t *testing.T) {
wsi := newWindowSeriesWindowOnlyImpl[int, int](4)
+ // Calling internal functions must be done with the lock held!
+ wsi.lock.Lock()
+ defer wsi.lock.Unlock()
+
idx := wsi.nextIndex(wsi.latestIndex)
if idx != 1 {
t.Fatalf("nextIndex is wrong (1)")
@@ -54,6 +61,18 @@ func TestNextIndex(t *testing.T) {
wsi.latestIndex = idx
}
+func TestNextIndexUnlocked(t *testing.T) {
+ wsi := newWindowSeriesWindowOnlyImpl[int, int](4)
+
+ panicingTest := func() {
+ wsi.nextIndex(wsi.latestIndex)
+ }
+
+ if !RPMTesting.DidPanic(panicingTest) {
+ t.Fatalf("Expected a call to nextIndex (without the lock held) to panic but it did not")
+ }
+}
+
func TestSimpleWindowComplete(t *testing.T) {
wsi := newWindowSeriesWindowOnlyImpl[int, int](4)
if wsi.Complete() {
@@ -877,3 +896,48 @@ func Test_ForeverStandardDeviationCalculation2(test *testing.T) {
test.Fatalf("Standard deviation(series) max calculation(series) failed: Expected: %v; Actual: %v.", expected, sd)
}
}
+
+func Test_ForeverLocking(test *testing.T) {
+ series := newWindowSeriesForeverImpl[float64, int]()
+ testFail := false
+
+ series.Reserve(1)
+ series.Reserve(2)
+
+ series.Fill(1, 8)
+ series.Fill(2, 9)
+
+ wg := sync.WaitGroup{}
+
+ counter := 0
+
+ wg.Add(2)
+ go func() {
+ series.ForEach(func(b int, d *utilities.Optional[float64]) {
+ // All of these ++s should be done under a single lock of the lock and, therefore,
+ // the ForEach below should not start until both buckets are ForEach'd over!
+ counter++
+ // Make this a long wait so we know that there is no chance for a race and that
+ // we are really testing what we mean to test!
+ time.Sleep(time.Second * 5)
+ })
+ wg.Done()
+ }()
+
+ time.Sleep(1 * time.Second)
+
+ go func() {
+ series.ForEach(func(b int, d *utilities.Optional[float64]) {
+ if counter != 2 {
+ testFail = true
+ }
+ })
+ wg.Done()
+ }()
+
+ wg.Wait()
+
+ if testFail {
+ test.Fatalf("Mutual exclusion checks did not properly lock out parallel ForEach operations.")
+ }
+}