diff options
Diffstat (limited to 'lgc')
| -rw-r--r-- | lgc/collection.go | 54 | ||||
| -rw-r--r-- | lgc/download.go | 373 | ||||
| -rw-r--r-- | lgc/lgc.go | 533 | ||||
| -rw-r--r-- | lgc/upload.go | 206 |
4 files changed, 641 insertions, 525 deletions
diff --git a/lgc/collection.go b/lgc/collection.go new file mode 100644 index 0000000..7560186 --- /dev/null +++ b/lgc/collection.go @@ -0,0 +1,54 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package lgc + +import ( + "fmt" + "sync" +) + +type LoadGeneratingConnectionCollection struct { + Lock sync.Mutex + LGCs *[]LoadGeneratingConnection +} + +func NewLoadGeneratingConnectionCollection() LoadGeneratingConnectionCollection { + return LoadGeneratingConnectionCollection{LGCs: new([]LoadGeneratingConnection)} +} + +func (collection *LoadGeneratingConnectionCollection) Get(idx int) (*LoadGeneratingConnection, error) { + if collection.Lock.TryLock() { + collection.Lock.Unlock() + return nil, fmt.Errorf("collection is unlocked") + } + + if idx > len(*collection.LGCs) { + return nil, fmt.Errorf("index too large") + } + return &(*collection.LGCs)[idx], nil +} + +func (collection *LoadGeneratingConnectionCollection) Append(conn LoadGeneratingConnection) error { + if collection.Lock.TryLock() { + collection.Lock.Unlock() + return fmt.Errorf("collection is unlocked") + } + *collection.LGCs = append(*collection.LGCs, conn) + return nil +} + +func (collection *LoadGeneratingConnectionCollection) Len() int { + return len(*collection.LGCs) +} diff --git a/lgc/download.go b/lgc/download.go new file mode 100644 index 0000000..71ed647 --- /dev/null +++ b/lgc/download.go @@ -0,0 +1,373 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package lgc + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "net/http" + "net/http/httptrace" + "sync" + "sync/atomic" + "time" + + "github.com/network-quality/goresponsiveness/debug" + "github.com/network-quality/goresponsiveness/stats" + "github.com/network-quality/goresponsiveness/traceable" + "github.com/network-quality/goresponsiveness/utilities" +) + +// TODO: All 64-bit fields that are accessed atomically must +// appear at the top of this struct. +type LoadGeneratingConnectionDownload struct { + downloaded uint64 + lastIntervalEnd int64 + ConnectToAddr string + URL string + downloadStartTime time.Time + lastDownloaded uint64 + client *http.Client + debug debug.DebugLevel + InsecureSkipVerify bool + KeyLogger io.Writer + clientId uint64 + tracer *httptrace.ClientTrace + stats stats.TraceStats + status LgcStatus + statusLock *sync.Mutex + statusWaiter *sync.Cond +} + +func NewLoadGeneratingConnectionDownload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionDownload { + lgd := LoadGeneratingConnectionDownload{ + URL: url, + KeyLogger: keyLogger, + ConnectToAddr: connectToAddr, + InsecureSkipVerify: insecureSkipVerify, + statusLock: &sync.Mutex{}, + } + lgd.statusWaiter = sync.NewCond(lgd.statusLock) + return lgd +} + +func (lgd *LoadGeneratingConnectionDownload) WaitUntilStarted(ctxt context.Context) bool { + conditional := func() bool { return lgd.status != LGC_STATUS_NOT_STARTED } + go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgd.statusWaiter) + return utilities.WaitWithContext(ctxt, &conditional, lgd.statusLock, lgd.statusWaiter) +} + +func (lgd *LoadGeneratingConnectionDownload) SetDnsStartTimeInfo( + now time.Time, + dnsStartInfo httptrace.DNSStartInfo, +) { + lgd.stats.DnsStartTime = now + lgd.stats.DnsStart = dnsStartInfo + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "DNS Start for %v: %v\n", + lgd.ClientId(), + dnsStartInfo, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetDnsDoneTimeInfo( + now time.Time, + dnsDoneInfo httptrace.DNSDoneInfo, +) { + lgd.stats.DnsDoneTime = now + lgd.stats.DnsDone = dnsDoneInfo + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "DNS Done for %v: %v\n", + lgd.ClientId(), + lgd.stats.DnsDone, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetConnectStartTime( + now time.Time, +) { + lgd.stats.ConnectStartTime = now + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "TCP Start for %v at %v\n", + lgd.ClientId(), + lgd.stats.ConnectStartTime, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetConnectDoneTimeError( + now time.Time, + err error, +) { + lgd.stats.ConnectDoneTime = now + lgd.stats.ConnectDoneError = err + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "TCP Done for %v (with error %v) @ %v\n", + lgd.ClientId(), + lgd.stats.ConnectDoneError, + lgd.stats.ConnectDoneTime, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetGetConnTime(now time.Time) { + lgd.stats.GetConnectionStartTime = now + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Started getting connection for %v @ %v\n", + lgd.ClientId(), + lgd.stats.GetConnectionStartTime, + ) + } + lgd.statusLock.Lock() + lgd.status = LGC_STATUS_RUNNING + lgd.statusWaiter.Broadcast() + lgd.statusLock.Unlock() +} + +func (lgd *LoadGeneratingConnectionDownload) SetGotConnTimeInfo( + now time.Time, + gotConnInfo httptrace.GotConnInfo, +) { + if gotConnInfo.Reused { + fmt.Printf("Unexpectedly reusing a connection!\n") + panic(!gotConnInfo.Reused) + } + lgd.stats.GetConnectionDoneTime = now + lgd.stats.ConnInfo = gotConnInfo + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Got connection for %v at %v with info %v\n", + lgd.ClientId(), + lgd.stats.GetConnectionDoneTime, + lgd.stats.ConnInfo, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeStartTime( + now time.Time, +) { + lgd.stats.TLSStartTime = utilities.Some(now) + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Started TLS Handshake for %v @ %v\n", + lgd.ClientId(), + lgd.stats.TLSStartTime, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeDoneTimeState( + now time.Time, + connectionState tls.ConnectionState, +) { + lgd.stats.TLSDoneTime = utilities.Some(now) + lgd.stats.TLSConnInfo = connectionState + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Completed TLS handshake for %v at %v with info %v\n", + lgd.ClientId(), + lgd.stats.TLSDoneTime, + lgd.stats.TLSConnInfo, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetHttpWroteRequestTimeInfo( + now time.Time, + info httptrace.WroteRequestInfo, +) { + lgd.stats.HttpWroteRequestTime = now + lgd.stats.HttpInfo = info + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "(lgd) Http finished writing request for %v at %v with info %v\n", + lgd.ClientId(), + lgd.stats.HttpWroteRequestTime, + lgd.stats.HttpInfo, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetHttpResponseReadyTime( + now time.Time, +) { + lgd.stats.HttpResponseReadyTime = now + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Got the first byte of HTTP response headers for %v at %v\n", + lgd.ClientId(), + lgd.stats.HttpResponseReadyTime, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) ClientId() uint64 { + return lgd.clientId +} + +func (lgd *LoadGeneratingConnectionDownload) TransferredInInterval() (uint64, time.Duration) { + transferred := atomic.SwapUint64(&lgd.downloaded, 0) + newIntervalEnd := (time.Now().Sub(lgd.downloadStartTime)).Nanoseconds() + previousIntervalEnd := atomic.SwapInt64(&lgd.lastIntervalEnd, newIntervalEnd) + intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd) + if debug.IsDebug(lgd.debug) { + fmt.Printf("download: Transferred: %v bytes in %v.\n", transferred, intervalLength) + } + return transferred, intervalLength +} + +func (lgd *LoadGeneratingConnectionDownload) Client() *http.Client { + return lgd.client +} + +type countingReader struct { + n *uint64 + ctx context.Context + readable io.Reader +} + +func (cr *countingReader) Read(p []byte) (n int, err error) { + if cr.ctx.Err() != nil { + return 0, io.EOF + } + + n, err = cr.readable.Read(p) + atomic.AddUint64(cr.n, uint64(n)) + return +} + +func (lgd *LoadGeneratingConnectionDownload) Start( + parentCtx context.Context, + debugLevel debug.DebugLevel, +) bool { + lgd.downloaded = 0 + lgd.debug = debugLevel + lgd.clientId = utilities.GenerateUniqueId() + + transport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: lgd.InsecureSkipVerify, + }, + } + + if !utilities.IsInterfaceNil(lgd.KeyLogger) { + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Using an SSL Key Logger for this load-generating download.\n", + ) + } + + // The presence of a custom TLSClientConfig in a *generic* `transport` + // means that go will default to HTTP/1.1 and cowardly avoid HTTP/2: + // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278 + // Also, it would appear that the API's choice of HTTP vs HTTP2 can + // depend on whether the url contains + // https:// or http://: + // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74 + transport.TLSClientConfig.KeyLogWriter = lgd.KeyLogger + } + transport.TLSClientConfig.InsecureSkipVerify = lgd.InsecureSkipVerify + + utilities.OverrideHostTransport(transport, lgd.ConnectToAddr) + + lgd.client = &http.Client{Transport: transport} + lgd.tracer = traceable.GenerateHttpTimingTracer(lgd, lgd.debug) + + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Started a load-generating download (id: %v).\n", + lgd.clientId, + ) + } + + go lgd.doDownload(parentCtx) + return true +} + +func (lgd *LoadGeneratingConnectionDownload) Status() LgcStatus { + return lgd.status +} + +func (lgd *LoadGeneratingConnectionDownload) Stats() *stats.TraceStats { + return &lgd.stats +} + +func (lgd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) error { + var request *http.Request = nil + var get *http.Response = nil + var err error = nil + + if request, err = http.NewRequestWithContext( + httptrace.WithClientTrace(ctx, lgd.tracer), + "GET", + lgd.URL, + nil, + ); err != nil { + lgd.statusLock.Lock() + lgd.status = LGC_STATUS_ERROR + lgd.statusWaiter.Broadcast() + lgd.statusLock.Unlock() + return err + } + + // Used to disable compression + request.Header.Set("Accept-Encoding", "identity") + request.Header.Set("User-Agent", utilities.UserAgent()) + + lgd.downloadStartTime = time.Now() + lgd.lastIntervalEnd = 0 + + if get, err = lgd.client.Do(request); err != nil { + lgd.statusLock.Lock() + lgd.status = LGC_STATUS_ERROR + lgd.statusWaiter.Broadcast() + lgd.statusLock.Unlock() + return err + } + + // Header.Get returns "" when not set + if get.Header.Get("Content-Encoding") != "" { + lgd.statusLock.Lock() + lgd.status = LGC_STATUS_ERROR + lgd.statusWaiter.Broadcast() + lgd.statusLock.Unlock() + fmt.Printf("Content-Encoding header was set (compression not allowed)") + return fmt.Errorf("Content-Encoding header was set (compression not allowed)") + } + cr := &countingReader{n: &lgd.downloaded, ctx: ctx, readable: get.Body} + _, _ = io.Copy(io.Discard, cr) + + lgd.statusLock.Lock() + lgd.status = LGC_STATUS_DONE + lgd.statusWaiter.Broadcast() + lgd.statusLock.Unlock() + + get.Body.Close() + if debug.IsDebug(lgd.debug) { + fmt.Printf("Ending a load-generating download.\n") + } + + return nil +} @@ -16,545 +16,28 @@ package lgc import ( "context" - "crypto/tls" - "fmt" - "io" "net/http" - "net/http/httptrace" - "sync" - "sync/atomic" "time" "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/stats" - "github.com/network-quality/goresponsiveness/traceable" - "github.com/network-quality/goresponsiveness/utilities" ) type LoadGeneratingConnection interface { Start(context.Context, debug.DebugLevel) bool TransferredInInterval() (uint64, time.Duration) Client() *http.Client - IsValid() bool + Status() LgcStatus ClientId() uint64 Stats() *stats.TraceStats WaitUntilStarted(context.Context) bool } -type LoadGeneratingConnectionCollection struct { - Lock sync.Mutex - LGCs *[]LoadGeneratingConnection -} - -func NewLoadGeneratingConnectionCollection() LoadGeneratingConnectionCollection { - return LoadGeneratingConnectionCollection{LGCs: new([]LoadGeneratingConnection)} -} - -func (collection *LoadGeneratingConnectionCollection) Get(idx int) (*LoadGeneratingConnection, error) { - if collection.Lock.TryLock() { - collection.Lock.Unlock() - return nil, fmt.Errorf("collection is unlocked") - } - - if idx > len(*collection.LGCs) { - return nil, fmt.Errorf("index too large") - } - return &(*collection.LGCs)[idx], nil -} - -func (collection *LoadGeneratingConnectionCollection) Append(conn LoadGeneratingConnection) error { - if collection.Lock.TryLock() { - collection.Lock.Unlock() - return fmt.Errorf("collection is unlocked") - } - *collection.LGCs = append(*collection.LGCs, conn) - return nil -} - -// TODO: All 64-bit fields that are accessed atomically must -// appear at the top of this struct. -type LoadGeneratingConnectionDownload struct { - downloaded uint64 - lastIntervalEnd int64 - ConnectToAddr string - URL string - downloadStartTime time.Time - lastDownloaded uint64 - client *http.Client - debug debug.DebugLevel - valid bool - validLock *sync.Mutex - InsecureSkipVerify bool - KeyLogger io.Writer - clientId uint64 - tracer *httptrace.ClientTrace - stats stats.TraceStats - validWaiter *sync.Cond -} - -func NewLoadGeneratingConnectionDownload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionDownload { - lgd := LoadGeneratingConnectionDownload{ - URL: url, - KeyLogger: keyLogger, - ConnectToAddr: connectToAddr, - InsecureSkipVerify: insecureSkipVerify, - validLock: &sync.Mutex{}, - } - lgd.validWaiter = sync.NewCond(lgd.validLock) - return lgd -} - -func (lgd *LoadGeneratingConnectionDownload) WaitUntilStarted(ctxt context.Context) bool { - conditional := func() bool { return lgd.valid } - go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgd.validWaiter) - return utilities.WaitWithContext(ctxt, &conditional, lgd.validLock, lgd.validWaiter) -} - -func (lgd *LoadGeneratingConnectionDownload) SetDnsStartTimeInfo( - now time.Time, - dnsStartInfo httptrace.DNSStartInfo, -) { - lgd.stats.DnsStartTime = now - lgd.stats.DnsStart = dnsStartInfo - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "DNS Start for %v: %v\n", - lgd.ClientId(), - dnsStartInfo, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetDnsDoneTimeInfo( - now time.Time, - dnsDoneInfo httptrace.DNSDoneInfo, -) { - lgd.stats.DnsDoneTime = now - lgd.stats.DnsDone = dnsDoneInfo - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "DNS Done for %v: %v\n", - lgd.ClientId(), - lgd.stats.DnsDone, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetConnectStartTime( - now time.Time, -) { - lgd.stats.ConnectStartTime = now - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "TCP Start for %v at %v\n", - lgd.ClientId(), - lgd.stats.ConnectStartTime, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetConnectDoneTimeError( - now time.Time, - err error, -) { - lgd.stats.ConnectDoneTime = now - lgd.stats.ConnectDoneError = err - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "TCP Done for %v (with error %v) @ %v\n", - lgd.ClientId(), - lgd.stats.ConnectDoneError, - lgd.stats.ConnectDoneTime, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetGetConnTime(now time.Time) { - lgd.stats.GetConnectionStartTime = now - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "Started getting connection for %v @ %v\n", - lgd.ClientId(), - lgd.stats.GetConnectionStartTime, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetGotConnTimeInfo( - now time.Time, - gotConnInfo httptrace.GotConnInfo, -) { - if gotConnInfo.Reused { - fmt.Printf("Unexpectedly reusing a connection!\n") - panic(!gotConnInfo.Reused) - } - lgd.stats.GetConnectionDoneTime = now - lgd.stats.ConnInfo = gotConnInfo - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "Got connection for %v at %v with info %v\n", - lgd.ClientId(), - lgd.stats.GetConnectionDoneTime, - lgd.stats.ConnInfo, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeStartTime( - now time.Time, -) { - lgd.stats.TLSStartTime = utilities.Some(now) - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "Started TLS Handshake for %v @ %v\n", - lgd.ClientId(), - lgd.stats.TLSStartTime, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeDoneTimeState( - now time.Time, - connectionState tls.ConnectionState, -) { - lgd.stats.TLSDoneTime = utilities.Some(now) - lgd.stats.TLSConnInfo = connectionState - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "Completed TLS handshake for %v at %v with info %v\n", - lgd.ClientId(), - lgd.stats.TLSDoneTime, - lgd.stats.TLSConnInfo, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetHttpWroteRequestTimeInfo( - now time.Time, - info httptrace.WroteRequestInfo, -) { - lgd.stats.HttpWroteRequestTime = now - lgd.stats.HttpInfo = info - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "(lgd) Http finished writing request for %v at %v with info %v\n", - lgd.ClientId(), - lgd.stats.HttpWroteRequestTime, - lgd.stats.HttpInfo, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetHttpResponseReadyTime( - now time.Time, -) { - lgd.stats.HttpResponseReadyTime = now - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "Got the first byte of HTTP response headers for %v at %v\n", - lgd.ClientId(), - lgd.stats.HttpResponseReadyTime, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) ClientId() uint64 { - return lgd.clientId -} - -func (lgd *LoadGeneratingConnectionDownload) TransferredInInterval() (uint64, time.Duration) { - transferred := atomic.SwapUint64(&lgd.downloaded, 0) - newIntervalEnd := (time.Now().Sub(lgd.downloadStartTime)).Nanoseconds() - previousIntervalEnd := atomic.SwapInt64(&lgd.lastIntervalEnd, newIntervalEnd) - intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd) - if debug.IsDebug(lgd.debug) { - fmt.Printf("download: Transferred: %v bytes in %v.\n", transferred, intervalLength) - } - return transferred, intervalLength -} - -func (lgd *LoadGeneratingConnectionDownload) Client() *http.Client { - return lgd.client -} - -type countingReader struct { - n *uint64 - ctx context.Context - readable io.Reader -} - -func (cr *countingReader) Read(p []byte) (n int, err error) { - if cr.ctx.Err() != nil { - return 0, io.EOF - } - n, err = cr.readable.Read(p) - atomic.AddUint64(cr.n, uint64(n)) - return -} - -func (lgd *LoadGeneratingConnectionDownload) Start( - parentCtx context.Context, - debugLevel debug.DebugLevel, -) bool { - lgd.downloaded = 0 - lgd.debug = debugLevel - lgd.clientId = utilities.GenerateUniqueId() - - transport := &http.Transport{ - Proxy: http.ProxyFromEnvironment, - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: lgd.InsecureSkipVerify, - }, - } - - if !utilities.IsInterfaceNil(lgd.KeyLogger) { - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "Using an SSL Key Logger for this load-generating download.\n", - ) - } - - // The presence of a custom TLSClientConfig in a *generic* `transport` - // means that go will default to HTTP/1.1 and cowardly avoid HTTP/2: - // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278 - // Also, it would appear that the API's choice of HTTP vs HTTP2 can - // depend on whether the url contains - // https:// or http://: - // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74 - transport.TLSClientConfig.KeyLogWriter = lgd.KeyLogger - } - transport.TLSClientConfig.InsecureSkipVerify = lgd.InsecureSkipVerify - - utilities.OverrideHostTransport(transport, lgd.ConnectToAddr) - - lgd.client = &http.Client{Transport: transport} - lgd.validLock.Lock() - lgd.valid = true - lgd.validLock.Unlock() - lgd.tracer = traceable.GenerateHttpTimingTracer(lgd, lgd.debug) - - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "Started a load-generating download (id: %v).\n", - lgd.clientId, - ) - } +type LgcStatus int - go lgd.doDownload(parentCtx) - return true -} - -func (lgd *LoadGeneratingConnectionDownload) IsValid() bool { - return lgd.valid -} - -func (lgd *LoadGeneratingConnectionDownload) Stats() *stats.TraceStats { - return &lgd.stats -} - -func (lgd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) { - var request *http.Request = nil - var get *http.Response = nil - var err error = nil - - if request, err = http.NewRequestWithContext( - httptrace.WithClientTrace(ctx, lgd.tracer), - "GET", - lgd.URL, - nil, - ); err != nil { - lgd.validLock.Lock() - lgd.valid = false - lgd.validLock.Unlock() - return - } - - // Used to disable compression - request.Header.Set("Accept-Encoding", "identity") - request.Header.Set("User-Agent", utilities.UserAgent()) - - lgd.downloadStartTime = time.Now() - lgd.lastIntervalEnd = 0 - - if get, err = lgd.client.Do(request); err != nil { - lgd.validLock.Lock() - lgd.valid = false - lgd.validLock.Unlock() - return - } - - // Header.Get returns "" when not set - if get.Header.Get("Content-Encoding") != "" { - lgd.validLock.Lock() - lgd.valid = false - lgd.validLock.Unlock() - fmt.Printf("Content-Encoding header was set (compression not allowed)") - return - } - cr := &countingReader{n: &lgd.downloaded, ctx: ctx, readable: get.Body} - _, _ = io.Copy(io.Discard, cr) - get.Body.Close() - if debug.IsDebug(lgd.debug) { - fmt.Printf("Ending a load-generating download.\n") - } -} - -// TODO: All 64-bit fields that are accessed atomically must -// appear at the top of this struct. -type LoadGeneratingConnectionUpload struct { - uploaded uint64 - lastIntervalEnd int64 - URL string - ConnectToAddr string - uploadStartTime time.Time - lastUploaded uint64 - client *http.Client - debug debug.DebugLevel - valid bool - validLock *sync.Mutex - InsecureSkipVerify bool - KeyLogger io.Writer - clientId uint64 - validWaiter *sync.Cond -} - -func NewLoadGeneratingConnectionUpload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionUpload { - lgu := LoadGeneratingConnectionUpload{ - URL: url, - KeyLogger: keyLogger, - ConnectToAddr: connectToAddr, - InsecureSkipVerify: insecureSkipVerify, - validLock: &sync.Mutex{}, - } - lgu.validWaiter = sync.NewCond(lgu.validLock) - return lgu -} - -func (lgu *LoadGeneratingConnectionUpload) WaitUntilStarted(ctxt context.Context) bool { - conditional := func() bool { return lgu.valid } - go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgu.validWaiter) - return utilities.WaitWithContext(ctxt, &conditional, lgu.validLock, lgu.validWaiter) -} - -func (lgu *LoadGeneratingConnectionUpload) ClientId() uint64 { - return lgu.clientId -} - -func (lgu *LoadGeneratingConnectionUpload) TransferredInInterval() (uint64, time.Duration) { - transferred := atomic.SwapUint64(&lgu.uploaded, 0) - newIntervalEnd := (time.Now().Sub(lgu.uploadStartTime)).Nanoseconds() - previousIntervalEnd := atomic.SwapInt64(&lgu.lastIntervalEnd, newIntervalEnd) - intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd) - if debug.IsDebug(lgu.debug) { - fmt.Printf("upload: Transferred: %v bytes in %v.\n", transferred, intervalLength) - } - return transferred, intervalLength -} - -func (lgu *LoadGeneratingConnectionUpload) Client() *http.Client { - return lgu.client -} - -func (lgu *LoadGeneratingConnectionUpload) IsValid() bool { - return lgu.valid -} - -type syntheticCountingReader struct { - n *uint64 - ctx context.Context -} - -func (s *syntheticCountingReader) Read(p []byte) (n int, err error) { - if s.ctx.Err() != nil { - return 0, io.EOF - } - err = nil - n = len(p) - - atomic.AddUint64(s.n, uint64(n)) - return -} - -func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) bool { - lgu.uploaded = 0 - s := &syntheticCountingReader{n: &lgu.uploaded, ctx: ctx} - var resp *http.Response = nil - var request *http.Request = nil - var err error - - if request, err = http.NewRequest( - "POST", - lgu.URL, - s, - ); err != nil { - lgu.validLock.Lock() - lgu.valid = false - lgu.validLock.Unlock() - return false - } - - // Used to disable compression - request.Header.Set("Accept-Encoding", "identity") - request.Header.Set("User-Agent", utilities.UserAgent()) - - lgu.uploadStartTime = time.Now() - lgu.lastIntervalEnd = 0 - - if resp, err = lgu.client.Do(request); err != nil { - lgu.validLock.Lock() - lgu.valid = false - lgu.validLock.Unlock() - return false - } - - resp.Body.Close() - if debug.IsDebug(lgu.debug) { - fmt.Printf("Ending a load-generating upload.\n") - } - return true -} - -func (lgu *LoadGeneratingConnectionUpload) Start( - parentCtx context.Context, - debugLevel debug.DebugLevel, -) bool { - lgu.uploaded = 0 - lgu.clientId = utilities.GenerateUniqueId() - lgu.debug = debugLevel - - transport := &http.Transport{ - Proxy: http.ProxyFromEnvironment, - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: lgu.InsecureSkipVerify, - }, - } - - if !utilities.IsInterfaceNil(lgu.KeyLogger) { - if debug.IsDebug(lgu.debug) { - fmt.Printf( - "Using an SSL Key Logger for this load-generating upload.\n", - ) - } - transport.TLSClientConfig.KeyLogWriter = lgu.KeyLogger - } - - utilities.OverrideHostTransport(transport, lgu.ConnectToAddr) - - lgu.client = &http.Client{Transport: transport} - - lgu.validLock.Lock() - lgu.valid = true - lgu.validLock.Unlock() - - if debug.IsDebug(lgu.debug) { - fmt.Printf("Started a load-generating upload (id: %v).\n", lgu.clientId) - } - - go lgu.doUpload(parentCtx) - return true -} - -func (lgu *LoadGeneratingConnectionUpload) Stats() *stats.TraceStats { - // Get all your stats from the download side of the LGC. - return nil -} +const ( + LGC_STATUS_NOT_STARTED LgcStatus = iota + LGC_STATUS_RUNNING + LGC_STATUS_DONE + LGC_STATUS_ERROR +) diff --git a/lgc/upload.go b/lgc/upload.go new file mode 100644 index 0000000..f0c772e --- /dev/null +++ b/lgc/upload.go @@ -0,0 +1,206 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package lgc + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "net/http" + "sync" + "sync/atomic" + "time" + + "github.com/network-quality/goresponsiveness/debug" + "github.com/network-quality/goresponsiveness/stats" + "github.com/network-quality/goresponsiveness/utilities" +) + +// TODO: All 64-bit fields that are accessed atomically must +// appear at the top of this struct. +type LoadGeneratingConnectionUpload struct { + uploaded uint64 + lastIntervalEnd int64 + URL string + ConnectToAddr string + uploadStartTime time.Time + lastUploaded uint64 + client *http.Client + debug debug.DebugLevel + InsecureSkipVerify bool + KeyLogger io.Writer + clientId uint64 + status LgcStatus + statusLock *sync.Mutex + statusWaiter *sync.Cond +} + +func NewLoadGeneratingConnectionUpload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionUpload { + lgu := LoadGeneratingConnectionUpload{ + URL: url, + KeyLogger: keyLogger, + ConnectToAddr: connectToAddr, + InsecureSkipVerify: insecureSkipVerify, + statusLock: &sync.Mutex{}, + } + lgu.status = LGC_STATUS_NOT_STARTED + lgu.statusWaiter = sync.NewCond(lgu.statusLock) + return lgu +} + +func (lgu *LoadGeneratingConnectionUpload) WaitUntilStarted(ctxt context.Context) bool { + conditional := func() bool { return lgu.status != LGC_STATUS_NOT_STARTED } + go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgu.statusWaiter) + return utilities.WaitWithContext(ctxt, &conditional, lgu.statusLock, lgu.statusWaiter) +} + +func (lgu *LoadGeneratingConnectionUpload) ClientId() uint64 { + return lgu.clientId +} + +func (lgu *LoadGeneratingConnectionUpload) TransferredInInterval() (uint64, time.Duration) { + transferred := atomic.SwapUint64(&lgu.uploaded, 0) + newIntervalEnd := (time.Now().Sub(lgu.uploadStartTime)).Nanoseconds() + previousIntervalEnd := atomic.SwapInt64(&lgu.lastIntervalEnd, newIntervalEnd) + intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd) + if debug.IsDebug(lgu.debug) { + fmt.Printf("upload: Transferred: %v bytes in %v.\n", transferred, intervalLength) + } + return transferred, intervalLength +} + +func (lgu *LoadGeneratingConnectionUpload) Client() *http.Client { + return lgu.client +} + +func (lgu *LoadGeneratingConnectionUpload) Status() LgcStatus { + return lgu.status +} + +type syntheticCountingReader struct { + n *uint64 + ctx context.Context + lgu *LoadGeneratingConnectionUpload +} + +func (s *syntheticCountingReader) Read(p []byte) (n int, err error) { + if s.ctx.Err() != nil { + return 0, io.EOF + } + if n == 0 { + s.lgu.statusLock.Lock() + s.lgu.status = LGC_STATUS_RUNNING + s.lgu.statusWaiter.Broadcast() + s.lgu.statusLock.Unlock() + } + err = nil + n = len(p) + + atomic.AddUint64(s.n, uint64(n)) + return +} + +func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) error { + lgu.uploaded = 0 + s := &syntheticCountingReader{n: &lgu.uploaded, ctx: ctx, lgu: lgu} + var resp *http.Response = nil + var request *http.Request = nil + var err error + + if request, err = http.NewRequest( + "POST", + lgu.URL, + s, + ); err != nil { + lgu.statusLock.Lock() + lgu.status = LGC_STATUS_ERROR + lgu.statusWaiter.Broadcast() + lgu.statusLock.Unlock() + return err + } + + // Used to disable compression + request.Header.Set("Accept-Encoding", "identity") + request.Header.Set("User-Agent", utilities.UserAgent()) + + lgu.uploadStartTime = time.Now() + lgu.lastIntervalEnd = 0 + + lgu.statusLock.Lock() + lgu.status = LGC_STATUS_RUNNING + lgu.statusWaiter.Broadcast() + lgu.statusLock.Unlock() + + if resp, err = lgu.client.Do(request); err != nil { + lgu.statusLock.Lock() + lgu.status = LGC_STATUS_ERROR + lgu.statusWaiter.Broadcast() + lgu.statusLock.Unlock() + return err + } + + lgu.statusLock.Lock() + lgu.status = LGC_STATUS_DONE + lgu.statusWaiter.Broadcast() + lgu.statusLock.Unlock() + + resp.Body.Close() + if debug.IsDebug(lgu.debug) { + fmt.Printf("Ending a load-generating upload.\n") + } + return nil +} + +func (lgu *LoadGeneratingConnectionUpload) Start( + parentCtx context.Context, + debugLevel debug.DebugLevel, +) bool { + lgu.uploaded = 0 + lgu.clientId = utilities.GenerateUniqueId() + lgu.debug = debugLevel + + transport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: lgu.InsecureSkipVerify, + }, + } + + if !utilities.IsInterfaceNil(lgu.KeyLogger) { + if debug.IsDebug(lgu.debug) { + fmt.Printf( + "Using an SSL Key Logger for this load-generating upload.\n", + ) + } + transport.TLSClientConfig.KeyLogWriter = lgu.KeyLogger + } + + utilities.OverrideHostTransport(transport, lgu.ConnectToAddr) + + lgu.client = &http.Client{Transport: transport} + + if debug.IsDebug(lgu.debug) { + fmt.Printf("Started a load-generating upload (id: %v).\n", lgu.clientId) + } + + go lgu.doUpload(parentCtx) + return true +} + +func (lgu *LoadGeneratingConnectionUpload) Stats() *stats.TraceStats { + // Get all your stats from the download side of the LGC. + return nil +} |
