diff options
| -rw-r--r-- | lgc/lgc.go | 77 | ||||
| -rw-r--r-- | networkQuality.go | 17 | ||||
| -rw-r--r-- | rpm/rpm.go | 24 | ||||
| -rw-r--r-- | utilities/utilities.go | 20 | ||||
| -rw-r--r-- | utilities/utilities_test.go | 30 |
5 files changed, 150 insertions, 18 deletions
@@ -38,6 +38,7 @@ type LoadGeneratingConnection interface { IsValid() bool ClientId() uint64 Stats() *stats.TraceStats + WaitUntilStarted(context.Context) bool } type LoadGeneratingConnectionCollection struct { @@ -49,6 +50,27 @@ func NewLoadGeneratingConnectionCollection() LoadGeneratingConnectionCollection return LoadGeneratingConnectionCollection{LGCs: new([]LoadGeneratingConnection)} } +func (collection *LoadGeneratingConnectionCollection) Get(idx int) (*LoadGeneratingConnection, error) { + if collection.Lock.TryLock() { + collection.Lock.Unlock() + return nil, fmt.Errorf("collection is unlocked") + } + + if idx > len(*collection.LGCs) { + return nil, fmt.Errorf("index too large") + } + return &(*collection.LGCs)[idx], nil +} + +func (collection *LoadGeneratingConnectionCollection) Append(conn LoadGeneratingConnection) error { + if collection.Lock.TryLock() { + collection.Lock.Unlock() + return fmt.Errorf("collection is unlocked") + } + *collection.LGCs = append(*collection.LGCs, conn) + return nil +} + // TODO: All 64-bit fields that are accessed atomically must // appear at the top of this struct. type LoadGeneratingConnectionDownload struct { @@ -61,11 +83,31 @@ type LoadGeneratingConnectionDownload struct { client *http.Client debug debug.DebugLevel valid bool + validLock *sync.Mutex InsecureSkipVerify bool KeyLogger io.Writer clientId uint64 tracer *httptrace.ClientTrace stats stats.TraceStats + validWaiter *sync.Cond +} + +func NewLoadGeneratingConnectionDownload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionDownload { + lgd := LoadGeneratingConnectionDownload{ + URL: url, + KeyLogger: keyLogger, + ConnectToAddr: connectToAddr, + InsecureSkipVerify: insecureSkipVerify, + validLock: &sync.Mutex{}, + } + lgd.validWaiter = sync.NewCond(lgd.validLock) + return lgd +} + +func (lgd *LoadGeneratingConnectionDownload) WaitUntilStarted(ctxt context.Context) bool { + conditional := func() bool { return lgd.valid } + go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgd.validWaiter) + return utilities.WaitWithContext(ctxt, &conditional, lgd.validLock, lgd.validWaiter) } func (lgd *LoadGeneratingConnectionDownload) SetDnsStartTimeInfo( @@ -286,7 +328,9 @@ func (lgd *LoadGeneratingConnectionDownload) Start( utilities.OverrideHostTransport(transport, lgd.ConnectToAddr) lgd.client = &http.Client{Transport: transport} + lgd.validLock.Lock() lgd.valid = true + lgd.validLock.Unlock() lgd.tracer = traceable.GenerateHttpTimingTracer(lgd, lgd.debug) if debug.IsDebug(lgd.debug) { @@ -319,7 +363,9 @@ func (lgd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) { lgd.URL, nil, ); err != nil { + lgd.validLock.Lock() lgd.valid = false + lgd.validLock.Unlock() return } @@ -331,13 +377,17 @@ func (lgd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) { lgd.lastIntervalEnd = 0 if get, err = lgd.client.Do(request); err != nil { + lgd.validLock.Lock() lgd.valid = false + lgd.validLock.Unlock() return } // Header.Get returns "" when not set if get.Header.Get("Content-Encoding") != "" { + lgd.validLock.Lock() lgd.valid = false + lgd.validLock.Unlock() fmt.Printf("Content-Encoding header was set (compression not allowed)") return } @@ -361,9 +411,29 @@ type LoadGeneratingConnectionUpload struct { client *http.Client debug debug.DebugLevel valid bool + validLock *sync.Mutex InsecureSkipVerify bool KeyLogger io.Writer clientId uint64 + validWaiter *sync.Cond +} + +func NewLoadGeneratingConnectionUpload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionUpload { + lgu := LoadGeneratingConnectionUpload{ + URL: url, + KeyLogger: keyLogger, + ConnectToAddr: connectToAddr, + InsecureSkipVerify: insecureSkipVerify, + validLock: &sync.Mutex{}, + } + lgu.validWaiter = sync.NewCond(lgu.validLock) + return lgu +} + +func (lgu *LoadGeneratingConnectionUpload) WaitUntilStarted(ctxt context.Context) bool { + conditional := func() bool { return lgu.valid } + go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgu.validWaiter) + return utilities.WaitWithContext(ctxt, &conditional, lgu.validLock, lgu.validWaiter) } func (lgu *LoadGeneratingConnectionUpload) ClientId() uint64 { @@ -417,7 +487,9 @@ func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) bool { lgu.URL, s, ); err != nil { + lgu.validLock.Lock() lgu.valid = false + lgu.validLock.Unlock() return false } @@ -429,7 +501,9 @@ func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) bool { lgu.lastIntervalEnd = 0 if resp, err = lgu.client.Do(request); err != nil { + lgu.validLock.Lock() lgu.valid = false + lgu.validLock.Unlock() return false } @@ -467,7 +541,10 @@ func (lgu *LoadGeneratingConnectionUpload) Start( utilities.OverrideHostTransport(transport, lgu.ConnectToAddr) lgu.client = &http.Client{Transport: transport} + + lgu.validLock.Lock() lgu.valid = true + lgu.validLock.Unlock() if debug.IsDebug(lgu.debug) { fmt.Printf("Started a load-generating upload (id: %v).\n", lgu.clientId) diff --git a/networkQuality.go b/networkQuality.go index ef7543d..97f7cb4 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -353,20 +353,13 @@ func main() { * will create load-generating connections for upload/download */ generate_lgd := func() lgc.LoadGeneratingConnection { - return &lgc.LoadGeneratingConnectionDownload{ - URL: config.Urls.LargeUrl, - KeyLogger: sslKeyFileConcurrentWriter, - ConnectToAddr: config.ConnectToAddr, - InsecureSkipVerify: *insecureSkipVerify, - } + lgd := lgc.NewLoadGeneratingConnectionDownload(config.Urls.LargeUrl, sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify) + return &lgd } generate_lgu := func() lgc.LoadGeneratingConnection { - return &lgc.LoadGeneratingConnectionUpload{ - URL: config.Urls.UploadUrl, - KeyLogger: sslKeyFileConcurrentWriter, - ConnectToAddr: config.ConnectToAddr, - } + lgu := lgc.NewLoadGeneratingConnectionUpload(config.Urls.LargeUrl, sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify) + return &lgu } generateSelfProbeConfiguration := func() rpm.ProbeConfiguration { @@ -416,7 +409,7 @@ func main() { ) // Handles for the first connection that the load-generating go routines (both up and - // download) open are passed because on the self[Down|Up]ProbeConnectionCommunicationChannel + // download) open are passed back on the self[Down|Up]ProbeConnectionCommunicationChannel // so that we can then start probes on those handles. selfDownProbeConnection := <-selfDownProbeConnectionCommunicationChannel selfUpProbeConnection := <-selfUpProbeConnectionCommunicationChannel @@ -45,13 +45,13 @@ func addFlows( defer lgcc.Lock.Unlock() for i := uint64(0); i < toAdd; i++ { // First, generate the connection. - *lgcc.LGCs = append(*lgcc.LGCs, lgcGenerator()) + newGenerator := lgcGenerator() + lgcc.Append(newGenerator) // Second, try to start the connection. - if !(*lgcc.LGCs)[len(*lgcc.LGCs)-1].Start(ctx, debug) { + if !newGenerator.Start(ctx, debug) { // If there was an error, we'll make sure that the caller knows it. fmt.Printf( - "Error starting lgc with id %d!\n", - (*lgcc.LGCs)[len(*lgcc.LGCs)-1].ClientId(), + "Error starting lgc with id %d!\n", newGenerator.ClientId(), ) return i } @@ -407,7 +407,19 @@ func LoadGenerator( // We have at least a single load-generating channel. This channel will be the one that // the self probes use. Let's send it back to the caller so that they can pass it on if they need to. - probeConnectionCommunicationChannel <- (*loadGeneratingConnections.LGCs)[0] + go func() { + loadGeneratingConnections.Lock.Lock() + zerothConnection, err := loadGeneratingConnections.Get(0) + loadGeneratingConnections.Lock.Unlock() + if err != nil { + panic("Could not get the zeroth connection!\n") + } + if !(*zerothConnection).WaitUntilStarted(loadGeneratorCtx) { + fmt.Fprintf(os.Stderr, "Could not wait until the zeroth load-generating connection was started!\n") + return + } + probeConnectionCommunicationChannel <- *zerothConnection + }() nextSampleStartTime := time.Now().Add(rampupInterval) @@ -732,7 +744,7 @@ func (probe *ProbeTracer) SetGotConnTimeInfo( if (probe.probeType == SelfUp || probe.probeType == SelfDown) && !gotConnInfo.Reused { fmt.Fprintf( os.Stderr, - "A self probe sent used a new connection!\n", + "A self probe sent using a new connection!\n", ) } if gotConnInfo.Reused { diff --git a/utilities/utilities.go b/utilities/utilities.go index 377be56..e75d373 100644 --- a/utilities/utilities.go +++ b/utilities/utilities.go @@ -15,6 +15,7 @@ package utilities import ( + "context" "fmt" "math" "math/rand" @@ -22,6 +23,7 @@ import ( "reflect" "sort" "strings" + "sync" "sync/atomic" "time" @@ -209,3 +211,21 @@ func ApproximatelyEqual[T float32 | float64](truth T, maybe T, fudge T) bool { func UserAgent() string { return fmt.Sprintf("goresponsiveness/%s", GitVersion) } + +func WaitWithContext(ctxt context.Context, condition *func() bool, mu *sync.Mutex, c *sync.Cond) bool { + mu.Lock() + for !(*condition)() && ctxt.Err() == nil { + c.Wait() + } + return ctxt.Err() == nil +} + +func ContextSignaler(ctxt context.Context, st time.Duration, condition *func() bool, c *sync.Cond) { + for !(*condition)() && ctxt.Err() == nil { + time.Sleep(st) + } + if ctxt.Err() != nil { + c.Broadcast() + return + } +} diff --git a/utilities/utilities_test.go b/utilities/utilities_test.go index 3a84d76..aa66f6b 100644 --- a/utilities/utilities_test.go +++ b/utilities/utilities_test.go @@ -14,6 +14,7 @@ package utilities import ( + "context" "log" "sync" "testing" @@ -86,3 +87,32 @@ func TestFilenameAppend(t *testing.T) { t.Fatalf("%s != %s for FilenameAppend.", expected, result) } } + +func TestWaitWithContext(t *testing.T) { + ctxt, canceller := context.WithCancel(context.Background()) + never_true := func() bool { return false } + mu := sync.Mutex{} + cond := sync.NewCond(&mu) + + wg := sync.WaitGroup{} + + wg.Add(3) + + go func() { + ContextSignaler(ctxt, 500*time.Millisecond, &never_true, cond) + wg.Done() + }() + + go func() { + WaitWithContext(ctxt, &never_true, &mu, cond) + wg.Done() + }() + + go func() { + time.Sleep(2 * time.Second) + canceller() + wg.Done() + }() + + wg.Wait() +} |
