summaryrefslogtreecommitdiff
path: root/lgc/lgc.go
diff options
context:
space:
mode:
Diffstat (limited to 'lgc/lgc.go')
-rw-r--r--lgc/lgc.go77
1 files changed, 77 insertions, 0 deletions
diff --git a/lgc/lgc.go b/lgc/lgc.go
index 0b3f075..16f67d2 100644
--- a/lgc/lgc.go
+++ b/lgc/lgc.go
@@ -38,6 +38,7 @@ type LoadGeneratingConnection interface {
IsValid() bool
ClientId() uint64
Stats() *stats.TraceStats
+ WaitUntilStarted(context.Context) bool
}
type LoadGeneratingConnectionCollection struct {
@@ -49,6 +50,27 @@ func NewLoadGeneratingConnectionCollection() LoadGeneratingConnectionCollection
return LoadGeneratingConnectionCollection{LGCs: new([]LoadGeneratingConnection)}
}
+func (collection *LoadGeneratingConnectionCollection) Get(idx int) (*LoadGeneratingConnection, error) {
+ if collection.Lock.TryLock() {
+ collection.Lock.Unlock()
+ return nil, fmt.Errorf("collection is unlocked")
+ }
+
+ if idx > len(*collection.LGCs) {
+ return nil, fmt.Errorf("index too large")
+ }
+ return &(*collection.LGCs)[idx], nil
+}
+
+func (collection *LoadGeneratingConnectionCollection) Append(conn LoadGeneratingConnection) error {
+ if collection.Lock.TryLock() {
+ collection.Lock.Unlock()
+ return fmt.Errorf("collection is unlocked")
+ }
+ *collection.LGCs = append(*collection.LGCs, conn)
+ return nil
+}
+
// TODO: All 64-bit fields that are accessed atomically must
// appear at the top of this struct.
type LoadGeneratingConnectionDownload struct {
@@ -61,11 +83,31 @@ type LoadGeneratingConnectionDownload struct {
client *http.Client
debug debug.DebugLevel
valid bool
+ validLock *sync.Mutex
InsecureSkipVerify bool
KeyLogger io.Writer
clientId uint64
tracer *httptrace.ClientTrace
stats stats.TraceStats
+ validWaiter *sync.Cond
+}
+
+func NewLoadGeneratingConnectionDownload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionDownload {
+ lgd := LoadGeneratingConnectionDownload{
+ URL: url,
+ KeyLogger: keyLogger,
+ ConnectToAddr: connectToAddr,
+ InsecureSkipVerify: insecureSkipVerify,
+ validLock: &sync.Mutex{},
+ }
+ lgd.validWaiter = sync.NewCond(lgd.validLock)
+ return lgd
+}
+
+func (lgd *LoadGeneratingConnectionDownload) WaitUntilStarted(ctxt context.Context) bool {
+ conditional := func() bool { return lgd.valid }
+ go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgd.validWaiter)
+ return utilities.WaitWithContext(ctxt, &conditional, lgd.validLock, lgd.validWaiter)
}
func (lgd *LoadGeneratingConnectionDownload) SetDnsStartTimeInfo(
@@ -286,7 +328,9 @@ func (lgd *LoadGeneratingConnectionDownload) Start(
utilities.OverrideHostTransport(transport, lgd.ConnectToAddr)
lgd.client = &http.Client{Transport: transport}
+ lgd.validLock.Lock()
lgd.valid = true
+ lgd.validLock.Unlock()
lgd.tracer = traceable.GenerateHttpTimingTracer(lgd, lgd.debug)
if debug.IsDebug(lgd.debug) {
@@ -319,7 +363,9 @@ func (lgd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) {
lgd.URL,
nil,
); err != nil {
+ lgd.validLock.Lock()
lgd.valid = false
+ lgd.validLock.Unlock()
return
}
@@ -331,13 +377,17 @@ func (lgd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) {
lgd.lastIntervalEnd = 0
if get, err = lgd.client.Do(request); err != nil {
+ lgd.validLock.Lock()
lgd.valid = false
+ lgd.validLock.Unlock()
return
}
// Header.Get returns "" when not set
if get.Header.Get("Content-Encoding") != "" {
+ lgd.validLock.Lock()
lgd.valid = false
+ lgd.validLock.Unlock()
fmt.Printf("Content-Encoding header was set (compression not allowed)")
return
}
@@ -361,9 +411,29 @@ type LoadGeneratingConnectionUpload struct {
client *http.Client
debug debug.DebugLevel
valid bool
+ validLock *sync.Mutex
InsecureSkipVerify bool
KeyLogger io.Writer
clientId uint64
+ validWaiter *sync.Cond
+}
+
+func NewLoadGeneratingConnectionUpload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionUpload {
+ lgu := LoadGeneratingConnectionUpload{
+ URL: url,
+ KeyLogger: keyLogger,
+ ConnectToAddr: connectToAddr,
+ InsecureSkipVerify: insecureSkipVerify,
+ validLock: &sync.Mutex{},
+ }
+ lgu.validWaiter = sync.NewCond(lgu.validLock)
+ return lgu
+}
+
+func (lgu *LoadGeneratingConnectionUpload) WaitUntilStarted(ctxt context.Context) bool {
+ conditional := func() bool { return lgu.valid }
+ go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgu.validWaiter)
+ return utilities.WaitWithContext(ctxt, &conditional, lgu.validLock, lgu.validWaiter)
}
func (lgu *LoadGeneratingConnectionUpload) ClientId() uint64 {
@@ -417,7 +487,9 @@ func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) bool {
lgu.URL,
s,
); err != nil {
+ lgu.validLock.Lock()
lgu.valid = false
+ lgu.validLock.Unlock()
return false
}
@@ -429,7 +501,9 @@ func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) bool {
lgu.lastIntervalEnd = 0
if resp, err = lgu.client.Do(request); err != nil {
+ lgu.validLock.Lock()
lgu.valid = false
+ lgu.validLock.Unlock()
return false
}
@@ -467,7 +541,10 @@ func (lgu *LoadGeneratingConnectionUpload) Start(
utilities.OverrideHostTransport(transport, lgu.ConnectToAddr)
lgu.client = &http.Client{Transport: transport}
+
+ lgu.validLock.Lock()
lgu.valid = true
+ lgu.validLock.Unlock()
if debug.IsDebug(lgu.debug) {
fmt.Printf("Started a load-generating upload (id: %v).\n", lgu.clientId)