diff options
Diffstat (limited to 'series')
| -rw-r--r-- | series/series.go | 62 | ||||
| -rw-r--r-- | series/series_test.go | 64 |
2 files changed, 126 insertions, 0 deletions
diff --git a/series/series.go b/series/series.go index 6b9af1f..ef133d2 100644 --- a/series/series.go +++ b/series/series.go @@ -41,6 +41,8 @@ type WindowSeries[Data any, Bucket constraints.Ordered] interface { GetValues() []utilities.Optional[Data] Complete() bool GetType() WindowSeriesDuration + + Append(appended *WindowSeries[Data, Bucket]) } type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct { @@ -48,6 +50,7 @@ type windowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered] struct { data []utilities.Pair[Bucket, utilities.Optional[Data]] latestIndex int empty bool + lock sync.RWMutex } /* @@ -58,6 +61,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error { if !wsi.empty && b <= wsi.data[wsi.latestIndex].First { return fmt.Errorf("reserving must be monotonically increasing") } + wsi.lock.Lock() + defer wsi.lock.Unlock() if wsi.empty { /* Special case if we are empty: The latestIndex is where we want this value to go! */ @@ -76,6 +81,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Reserve(b Bucket) error { } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) error { + wsi.lock.Lock() + defer wsi.lock.Unlock() iterator := wsi.latestIndex for { if wsi.data[iterator].First == b { @@ -91,6 +98,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Fill(b Bucket, d Data) erro } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Count() (some int, none int) { + wsi.lock.Lock() + defer wsi.lock.Unlock() some = 0 none = 0 for _, v := range wsi.data { @@ -104,6 +113,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Count() (some int, none int } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Complete() bool { + wsi.lock.Lock() + defer wsi.lock.Unlock() for _, v := range wsi.data { if utilities.IsNone(v.Second) { return false @@ -113,10 +124,18 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Complete() bool { } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) nextIndex(currentIndex int) int { + // Internal functions should be called with the lock held! + if wsi.lock.TryLock() { + panic("windowSeriesWindowOnlyImpl nextIndex called without lock held.") + } return (currentIndex + 1) % wsi.windowSize } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) previousIndex(currentIndex int) int { + // Internal functions should be called with the lock held! + if wsi.lock.TryLock() { + panic("windowSeriesWindowOnlyImpl nextIndex called without lock held.") + } nextIndex := currentIndex - 1 if nextIndex < 0 { nextIndex += wsi.windowSize @@ -125,6 +144,10 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) previousIndex(currentIndex } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optional[Data] { + // Internal functions should be called with the lock held! + if wsi.lock.TryLock() { + panic("windowSeriesWindowOnlyImpl nextIndex called without lock held.") + } result := make([]utilities.Optional[Data], wsi.windowSize) if wsi.empty { @@ -144,6 +167,8 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) toArray() []utilities.Optio } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] { + wsi.lock.Lock() + defer wsi.lock.Unlock() return wsi.toArray() } @@ -152,12 +177,16 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) GetType() WindowSeriesDurat } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) { + wsi.lock.Lock() + defer wsi.lock.Unlock() for _, v := range wsi.data { eacher(v.First, &v.Second) } } func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string { + wsi.lock.Lock() + defer wsi.lock.Unlock() result := fmt.Sprintf("Window series (window (%d) only, latest index: %v): ", wsi.windowSize, wsi.latestIndex) for _, v := range wsi.data { valueString := "None" @@ -169,6 +198,10 @@ func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) String() string { return result } +func (wsi *windowSeriesWindowOnlyImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) { + panic("") +} + func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered]( windowSize int, ) *windowSeriesWindowOnlyImpl[Data, Bucket] { @@ -186,10 +219,14 @@ func newWindowSeriesWindowOnlyImpl[Data any, Bucket constraints.Ordered]( type windowSeriesForeverImpl[Data any, Bucket constraints.Ordered] struct { data []utilities.Pair[Bucket, utilities.Optional[Data]] empty bool + lock sync.RWMutex } func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error { + wsi.lock.Lock() + defer wsi.lock.Unlock() if !wsi.empty && b <= wsi.data[len(wsi.data)-1].First { + fmt.Printf("reserving must be monotonically increasing") return fmt.Errorf("reserving must be monotonically increasing") } @@ -199,6 +236,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Reserve(b Bucket) error { } func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error { + wsi.lock.Lock() + defer wsi.lock.Unlock() for i := range wsi.data { if wsi.data[i].First == b { wsi.data[i].Second = utilities.Some[Data](d) @@ -209,6 +248,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Fill(b Bucket, d Data) error { } func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Optional[Data] { + wsi.lock.Lock() + defer wsi.lock.Unlock() result := make([]utilities.Optional[Data], len(wsi.data)) for i, v := range utilities.Reverse(wsi.data) { @@ -219,6 +260,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) GetValues() []utilities.Option } func (wsi *windowSeriesForeverImpl[Data, Bucket]) Count() (some int, none int) { + wsi.lock.Lock() + defer wsi.lock.Unlock() some = 0 none = 0 for _, v := range wsi.data { @@ -232,6 +275,8 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) Count() (some int, none int) { } func (wsi *windowSeriesForeverImpl[Data, Bucket]) Complete() bool { + wsi.lock.Lock() + defer wsi.lock.Unlock() for _, v := range wsi.data { if utilities.IsNone(v.Second) { return false @@ -253,12 +298,16 @@ func newWindowSeriesForeverImpl[Data any, Bucket constraints.Ordered]() *windowS } func (wsi *windowSeriesForeverImpl[Data, Bucket]) ForEach(eacher func(b Bucket, d *utilities.Optional[Data])) { + wsi.lock.Lock() + defer wsi.lock.Unlock() for _, v := range wsi.data { eacher(v.First, &v.Second) } } func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string { + wsi.lock.Lock() + defer wsi.lock.Unlock() result := "Window series (forever): " for _, v := range wsi.data { valueString := "None" @@ -270,6 +319,19 @@ func (wsi *windowSeriesForeverImpl[Data, Bucket]) String() string { return result } +func (wsi *windowSeriesForeverImpl[Data, Bucket]) Append(appended *WindowSeries[Data, Bucket]) { + result, ok := (*appended).(*windowSeriesForeverImpl[Data, Bucket]) + if !ok { + panic("Cannot merge a forever window series with a non-forever window series.") + } + wsi.lock.Lock() + defer wsi.lock.Unlock() + result.lock.Lock() + defer result.lock.Unlock() + + wsi.data = append(wsi.data, result.data...) +} + /* * End of WindowSeries interface methods. */ diff --git a/series/series_test.go b/series/series_test.go index 3ed752d..54ddfcc 100644 --- a/series/series_test.go +++ b/series/series_test.go @@ -15,14 +15,21 @@ package series import ( "reflect" + "sync" "testing" + "time" + RPMTesting "github.com/network-quality/goresponsiveness/testing" "github.com/network-quality/goresponsiveness/utilities" ) func TestNextIndex(t *testing.T) { wsi := newWindowSeriesWindowOnlyImpl[int, int](4) + // Calling internal functions must be done with the lock held! + wsi.lock.Lock() + defer wsi.lock.Unlock() + idx := wsi.nextIndex(wsi.latestIndex) if idx != 1 { t.Fatalf("nextIndex is wrong (1)") @@ -54,6 +61,18 @@ func TestNextIndex(t *testing.T) { wsi.latestIndex = idx } +func TestNextIndexUnlocked(t *testing.T) { + wsi := newWindowSeriesWindowOnlyImpl[int, int](4) + + panicingTest := func() { + wsi.nextIndex(wsi.latestIndex) + } + + if !RPMTesting.DidPanic(panicingTest) { + t.Fatalf("Expected a call to nextIndex (without the lock held) to panic but it did not") + } +} + func TestSimpleWindowComplete(t *testing.T) { wsi := newWindowSeriesWindowOnlyImpl[int, int](4) if wsi.Complete() { @@ -877,3 +896,48 @@ func Test_ForeverStandardDeviationCalculation2(test *testing.T) { test.Fatalf("Standard deviation(series) max calculation(series) failed: Expected: %v; Actual: %v.", expected, sd) } } + +func Test_ForeverLocking(test *testing.T) { + series := newWindowSeriesForeverImpl[float64, int]() + testFail := false + + series.Reserve(1) + series.Reserve(2) + + series.Fill(1, 8) + series.Fill(2, 9) + + wg := sync.WaitGroup{} + + counter := 0 + + wg.Add(2) + go func() { + series.ForEach(func(b int, d *utilities.Optional[float64]) { + // All of these ++s should be done under a single lock of the lock and, therefore, + // the ForEach below should not start until both buckets are ForEach'd over! + counter++ + // Make this a long wait so we know that there is no chance for a race and that + // we are really testing what we mean to test! + time.Sleep(time.Second * 5) + }) + wg.Done() + }() + + time.Sleep(1 * time.Second) + + go func() { + series.ForEach(func(b int, d *utilities.Optional[float64]) { + if counter != 2 { + testFail = true + } + }) + wg.Done() + }() + + wg.Wait() + + if testFail { + test.Fatalf("Mutual exclusion checks did not properly lock out parallel ForEach operations.") + } +} |
