summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lgc/lgc.go77
-rw-r--r--networkQuality.go17
-rw-r--r--rpm/rpm.go24
-rw-r--r--utilities/utilities.go20
-rw-r--r--utilities/utilities_test.go30
5 files changed, 150 insertions, 18 deletions
diff --git a/lgc/lgc.go b/lgc/lgc.go
index 0b3f075..16f67d2 100644
--- a/lgc/lgc.go
+++ b/lgc/lgc.go
@@ -38,6 +38,7 @@ type LoadGeneratingConnection interface {
IsValid() bool
ClientId() uint64
Stats() *stats.TraceStats
+ WaitUntilStarted(context.Context) bool
}
type LoadGeneratingConnectionCollection struct {
@@ -49,6 +50,27 @@ func NewLoadGeneratingConnectionCollection() LoadGeneratingConnectionCollection
return LoadGeneratingConnectionCollection{LGCs: new([]LoadGeneratingConnection)}
}
+func (collection *LoadGeneratingConnectionCollection) Get(idx int) (*LoadGeneratingConnection, error) {
+ if collection.Lock.TryLock() {
+ collection.Lock.Unlock()
+ return nil, fmt.Errorf("collection is unlocked")
+ }
+
+ if idx > len(*collection.LGCs) {
+ return nil, fmt.Errorf("index too large")
+ }
+ return &(*collection.LGCs)[idx], nil
+}
+
+func (collection *LoadGeneratingConnectionCollection) Append(conn LoadGeneratingConnection) error {
+ if collection.Lock.TryLock() {
+ collection.Lock.Unlock()
+ return fmt.Errorf("collection is unlocked")
+ }
+ *collection.LGCs = append(*collection.LGCs, conn)
+ return nil
+}
+
// TODO: All 64-bit fields that are accessed atomically must
// appear at the top of this struct.
type LoadGeneratingConnectionDownload struct {
@@ -61,11 +83,31 @@ type LoadGeneratingConnectionDownload struct {
client *http.Client
debug debug.DebugLevel
valid bool
+ validLock *sync.Mutex
InsecureSkipVerify bool
KeyLogger io.Writer
clientId uint64
tracer *httptrace.ClientTrace
stats stats.TraceStats
+ validWaiter *sync.Cond
+}
+
+func NewLoadGeneratingConnectionDownload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionDownload {
+ lgd := LoadGeneratingConnectionDownload{
+ URL: url,
+ KeyLogger: keyLogger,
+ ConnectToAddr: connectToAddr,
+ InsecureSkipVerify: insecureSkipVerify,
+ validLock: &sync.Mutex{},
+ }
+ lgd.validWaiter = sync.NewCond(lgd.validLock)
+ return lgd
+}
+
+func (lgd *LoadGeneratingConnectionDownload) WaitUntilStarted(ctxt context.Context) bool {
+ conditional := func() bool { return lgd.valid }
+ go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgd.validWaiter)
+ return utilities.WaitWithContext(ctxt, &conditional, lgd.validLock, lgd.validWaiter)
}
func (lgd *LoadGeneratingConnectionDownload) SetDnsStartTimeInfo(
@@ -286,7 +328,9 @@ func (lgd *LoadGeneratingConnectionDownload) Start(
utilities.OverrideHostTransport(transport, lgd.ConnectToAddr)
lgd.client = &http.Client{Transport: transport}
+ lgd.validLock.Lock()
lgd.valid = true
+ lgd.validLock.Unlock()
lgd.tracer = traceable.GenerateHttpTimingTracer(lgd, lgd.debug)
if debug.IsDebug(lgd.debug) {
@@ -319,7 +363,9 @@ func (lgd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) {
lgd.URL,
nil,
); err != nil {
+ lgd.validLock.Lock()
lgd.valid = false
+ lgd.validLock.Unlock()
return
}
@@ -331,13 +377,17 @@ func (lgd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) {
lgd.lastIntervalEnd = 0
if get, err = lgd.client.Do(request); err != nil {
+ lgd.validLock.Lock()
lgd.valid = false
+ lgd.validLock.Unlock()
return
}
// Header.Get returns "" when not set
if get.Header.Get("Content-Encoding") != "" {
+ lgd.validLock.Lock()
lgd.valid = false
+ lgd.validLock.Unlock()
fmt.Printf("Content-Encoding header was set (compression not allowed)")
return
}
@@ -361,9 +411,29 @@ type LoadGeneratingConnectionUpload struct {
client *http.Client
debug debug.DebugLevel
valid bool
+ validLock *sync.Mutex
InsecureSkipVerify bool
KeyLogger io.Writer
clientId uint64
+ validWaiter *sync.Cond
+}
+
+func NewLoadGeneratingConnectionUpload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionUpload {
+ lgu := LoadGeneratingConnectionUpload{
+ URL: url,
+ KeyLogger: keyLogger,
+ ConnectToAddr: connectToAddr,
+ InsecureSkipVerify: insecureSkipVerify,
+ validLock: &sync.Mutex{},
+ }
+ lgu.validWaiter = sync.NewCond(lgu.validLock)
+ return lgu
+}
+
+func (lgu *LoadGeneratingConnectionUpload) WaitUntilStarted(ctxt context.Context) bool {
+ conditional := func() bool { return lgu.valid }
+ go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgu.validWaiter)
+ return utilities.WaitWithContext(ctxt, &conditional, lgu.validLock, lgu.validWaiter)
}
func (lgu *LoadGeneratingConnectionUpload) ClientId() uint64 {
@@ -417,7 +487,9 @@ func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) bool {
lgu.URL,
s,
); err != nil {
+ lgu.validLock.Lock()
lgu.valid = false
+ lgu.validLock.Unlock()
return false
}
@@ -429,7 +501,9 @@ func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) bool {
lgu.lastIntervalEnd = 0
if resp, err = lgu.client.Do(request); err != nil {
+ lgu.validLock.Lock()
lgu.valid = false
+ lgu.validLock.Unlock()
return false
}
@@ -467,7 +541,10 @@ func (lgu *LoadGeneratingConnectionUpload) Start(
utilities.OverrideHostTransport(transport, lgu.ConnectToAddr)
lgu.client = &http.Client{Transport: transport}
+
+ lgu.validLock.Lock()
lgu.valid = true
+ lgu.validLock.Unlock()
if debug.IsDebug(lgu.debug) {
fmt.Printf("Started a load-generating upload (id: %v).\n", lgu.clientId)
diff --git a/networkQuality.go b/networkQuality.go
index ef7543d..97f7cb4 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -353,20 +353,13 @@ func main() {
* will create load-generating connections for upload/download
*/
generate_lgd := func() lgc.LoadGeneratingConnection {
- return &lgc.LoadGeneratingConnectionDownload{
- URL: config.Urls.LargeUrl,
- KeyLogger: sslKeyFileConcurrentWriter,
- ConnectToAddr: config.ConnectToAddr,
- InsecureSkipVerify: *insecureSkipVerify,
- }
+ lgd := lgc.NewLoadGeneratingConnectionDownload(config.Urls.LargeUrl, sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify)
+ return &lgd
}
generate_lgu := func() lgc.LoadGeneratingConnection {
- return &lgc.LoadGeneratingConnectionUpload{
- URL: config.Urls.UploadUrl,
- KeyLogger: sslKeyFileConcurrentWriter,
- ConnectToAddr: config.ConnectToAddr,
- }
+ lgu := lgc.NewLoadGeneratingConnectionUpload(config.Urls.LargeUrl, sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify)
+ return &lgu
}
generateSelfProbeConfiguration := func() rpm.ProbeConfiguration {
@@ -416,7 +409,7 @@ func main() {
)
// Handles for the first connection that the load-generating go routines (both up and
- // download) open are passed because on the self[Down|Up]ProbeConnectionCommunicationChannel
+ // download) open are passed back on the self[Down|Up]ProbeConnectionCommunicationChannel
// so that we can then start probes on those handles.
selfDownProbeConnection := <-selfDownProbeConnectionCommunicationChannel
selfUpProbeConnection := <-selfUpProbeConnectionCommunicationChannel
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 3774c8f..db6f11e 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -45,13 +45,13 @@ func addFlows(
defer lgcc.Lock.Unlock()
for i := uint64(0); i < toAdd; i++ {
// First, generate the connection.
- *lgcc.LGCs = append(*lgcc.LGCs, lgcGenerator())
+ newGenerator := lgcGenerator()
+ lgcc.Append(newGenerator)
// Second, try to start the connection.
- if !(*lgcc.LGCs)[len(*lgcc.LGCs)-1].Start(ctx, debug) {
+ if !newGenerator.Start(ctx, debug) {
// If there was an error, we'll make sure that the caller knows it.
fmt.Printf(
- "Error starting lgc with id %d!\n",
- (*lgcc.LGCs)[len(*lgcc.LGCs)-1].ClientId(),
+ "Error starting lgc with id %d!\n", newGenerator.ClientId(),
)
return i
}
@@ -407,7 +407,19 @@ func LoadGenerator(
// We have at least a single load-generating channel. This channel will be the one that
// the self probes use. Let's send it back to the caller so that they can pass it on if they need to.
- probeConnectionCommunicationChannel <- (*loadGeneratingConnections.LGCs)[0]
+ go func() {
+ loadGeneratingConnections.Lock.Lock()
+ zerothConnection, err := loadGeneratingConnections.Get(0)
+ loadGeneratingConnections.Lock.Unlock()
+ if err != nil {
+ panic("Could not get the zeroth connection!\n")
+ }
+ if !(*zerothConnection).WaitUntilStarted(loadGeneratorCtx) {
+ fmt.Fprintf(os.Stderr, "Could not wait until the zeroth load-generating connection was started!\n")
+ return
+ }
+ probeConnectionCommunicationChannel <- *zerothConnection
+ }()
nextSampleStartTime := time.Now().Add(rampupInterval)
@@ -732,7 +744,7 @@ func (probe *ProbeTracer) SetGotConnTimeInfo(
if (probe.probeType == SelfUp || probe.probeType == SelfDown) && !gotConnInfo.Reused {
fmt.Fprintf(
os.Stderr,
- "A self probe sent used a new connection!\n",
+ "A self probe sent using a new connection!\n",
)
}
if gotConnInfo.Reused {
diff --git a/utilities/utilities.go b/utilities/utilities.go
index 377be56..e75d373 100644
--- a/utilities/utilities.go
+++ b/utilities/utilities.go
@@ -15,6 +15,7 @@
package utilities
import (
+ "context"
"fmt"
"math"
"math/rand"
@@ -22,6 +23,7 @@ import (
"reflect"
"sort"
"strings"
+ "sync"
"sync/atomic"
"time"
@@ -209,3 +211,21 @@ func ApproximatelyEqual[T float32 | float64](truth T, maybe T, fudge T) bool {
func UserAgent() string {
return fmt.Sprintf("goresponsiveness/%s", GitVersion)
}
+
+func WaitWithContext(ctxt context.Context, condition *func() bool, mu *sync.Mutex, c *sync.Cond) bool {
+ mu.Lock()
+ for !(*condition)() && ctxt.Err() == nil {
+ c.Wait()
+ }
+ return ctxt.Err() == nil
+}
+
+func ContextSignaler(ctxt context.Context, st time.Duration, condition *func() bool, c *sync.Cond) {
+ for !(*condition)() && ctxt.Err() == nil {
+ time.Sleep(st)
+ }
+ if ctxt.Err() != nil {
+ c.Broadcast()
+ return
+ }
+}
diff --git a/utilities/utilities_test.go b/utilities/utilities_test.go
index 3a84d76..aa66f6b 100644
--- a/utilities/utilities_test.go
+++ b/utilities/utilities_test.go
@@ -14,6 +14,7 @@
package utilities
import (
+ "context"
"log"
"sync"
"testing"
@@ -86,3 +87,32 @@ func TestFilenameAppend(t *testing.T) {
t.Fatalf("%s != %s for FilenameAppend.", expected, result)
}
}
+
+func TestWaitWithContext(t *testing.T) {
+ ctxt, canceller := context.WithCancel(context.Background())
+ never_true := func() bool { return false }
+ mu := sync.Mutex{}
+ cond := sync.NewCond(&mu)
+
+ wg := sync.WaitGroup{}
+
+ wg.Add(3)
+
+ go func() {
+ ContextSignaler(ctxt, 500*time.Millisecond, &never_true, cond)
+ wg.Done()
+ }()
+
+ go func() {
+ WaitWithContext(ctxt, &never_true, &mu, cond)
+ wg.Done()
+ }()
+
+ go func() {
+ time.Sleep(2 * time.Second)
+ canceller()
+ wg.Done()
+ }()
+
+ wg.Wait()
+}