diff options
| -rw-r--r-- | lgc/collection.go | 54 | ||||
| -rw-r--r-- | lgc/download.go | 373 | ||||
| -rw-r--r-- | lgc/lgc.go | 533 | ||||
| -rw-r--r-- | lgc/upload.go | 206 | ||||
| -rw-r--r-- | networkQuality.go | 44 | ||||
| -rw-r--r-- | rpm/rpm.go | 596 | ||||
| -rw-r--r-- | stabilizer/rev3.go | 3 |
7 files changed, 748 insertions, 1061 deletions
diff --git a/lgc/collection.go b/lgc/collection.go new file mode 100644 index 0000000..7560186 --- /dev/null +++ b/lgc/collection.go @@ -0,0 +1,54 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package lgc + +import ( + "fmt" + "sync" +) + +type LoadGeneratingConnectionCollection struct { + Lock sync.Mutex + LGCs *[]LoadGeneratingConnection +} + +func NewLoadGeneratingConnectionCollection() LoadGeneratingConnectionCollection { + return LoadGeneratingConnectionCollection{LGCs: new([]LoadGeneratingConnection)} +} + +func (collection *LoadGeneratingConnectionCollection) Get(idx int) (*LoadGeneratingConnection, error) { + if collection.Lock.TryLock() { + collection.Lock.Unlock() + return nil, fmt.Errorf("collection is unlocked") + } + + if idx > len(*collection.LGCs) { + return nil, fmt.Errorf("index too large") + } + return &(*collection.LGCs)[idx], nil +} + +func (collection *LoadGeneratingConnectionCollection) Append(conn LoadGeneratingConnection) error { + if collection.Lock.TryLock() { + collection.Lock.Unlock() + return fmt.Errorf("collection is unlocked") + } + *collection.LGCs = append(*collection.LGCs, conn) + return nil +} + +func (collection *LoadGeneratingConnectionCollection) Len() int { + return len(*collection.LGCs) +} diff --git a/lgc/download.go b/lgc/download.go new file mode 100644 index 0000000..71ed647 --- /dev/null +++ b/lgc/download.go @@ -0,0 +1,373 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package lgc + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "net/http" + "net/http/httptrace" + "sync" + "sync/atomic" + "time" + + "github.com/network-quality/goresponsiveness/debug" + "github.com/network-quality/goresponsiveness/stats" + "github.com/network-quality/goresponsiveness/traceable" + "github.com/network-quality/goresponsiveness/utilities" +) + +// TODO: All 64-bit fields that are accessed atomically must +// appear at the top of this struct. +type LoadGeneratingConnectionDownload struct { + downloaded uint64 + lastIntervalEnd int64 + ConnectToAddr string + URL string + downloadStartTime time.Time + lastDownloaded uint64 + client *http.Client + debug debug.DebugLevel + InsecureSkipVerify bool + KeyLogger io.Writer + clientId uint64 + tracer *httptrace.ClientTrace + stats stats.TraceStats + status LgcStatus + statusLock *sync.Mutex + statusWaiter *sync.Cond +} + +func NewLoadGeneratingConnectionDownload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionDownload { + lgd := LoadGeneratingConnectionDownload{ + URL: url, + KeyLogger: keyLogger, + ConnectToAddr: connectToAddr, + InsecureSkipVerify: insecureSkipVerify, + statusLock: &sync.Mutex{}, + } + lgd.statusWaiter = sync.NewCond(lgd.statusLock) + return lgd +} + +func (lgd *LoadGeneratingConnectionDownload) WaitUntilStarted(ctxt context.Context) bool { + conditional := func() bool { return lgd.status != LGC_STATUS_NOT_STARTED } + go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgd.statusWaiter) + return utilities.WaitWithContext(ctxt, &conditional, lgd.statusLock, lgd.statusWaiter) +} + +func (lgd *LoadGeneratingConnectionDownload) SetDnsStartTimeInfo( + now time.Time, + dnsStartInfo httptrace.DNSStartInfo, +) { + lgd.stats.DnsStartTime = now + lgd.stats.DnsStart = dnsStartInfo + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "DNS Start for %v: %v\n", + lgd.ClientId(), + dnsStartInfo, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetDnsDoneTimeInfo( + now time.Time, + dnsDoneInfo httptrace.DNSDoneInfo, +) { + lgd.stats.DnsDoneTime = now + lgd.stats.DnsDone = dnsDoneInfo + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "DNS Done for %v: %v\n", + lgd.ClientId(), + lgd.stats.DnsDone, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetConnectStartTime( + now time.Time, +) { + lgd.stats.ConnectStartTime = now + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "TCP Start for %v at %v\n", + lgd.ClientId(), + lgd.stats.ConnectStartTime, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetConnectDoneTimeError( + now time.Time, + err error, +) { + lgd.stats.ConnectDoneTime = now + lgd.stats.ConnectDoneError = err + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "TCP Done for %v (with error %v) @ %v\n", + lgd.ClientId(), + lgd.stats.ConnectDoneError, + lgd.stats.ConnectDoneTime, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetGetConnTime(now time.Time) { + lgd.stats.GetConnectionStartTime = now + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Started getting connection for %v @ %v\n", + lgd.ClientId(), + lgd.stats.GetConnectionStartTime, + ) + } + lgd.statusLock.Lock() + lgd.status = LGC_STATUS_RUNNING + lgd.statusWaiter.Broadcast() + lgd.statusLock.Unlock() +} + +func (lgd *LoadGeneratingConnectionDownload) SetGotConnTimeInfo( + now time.Time, + gotConnInfo httptrace.GotConnInfo, +) { + if gotConnInfo.Reused { + fmt.Printf("Unexpectedly reusing a connection!\n") + panic(!gotConnInfo.Reused) + } + lgd.stats.GetConnectionDoneTime = now + lgd.stats.ConnInfo = gotConnInfo + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Got connection for %v at %v with info %v\n", + lgd.ClientId(), + lgd.stats.GetConnectionDoneTime, + lgd.stats.ConnInfo, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeStartTime( + now time.Time, +) { + lgd.stats.TLSStartTime = utilities.Some(now) + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Started TLS Handshake for %v @ %v\n", + lgd.ClientId(), + lgd.stats.TLSStartTime, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeDoneTimeState( + now time.Time, + connectionState tls.ConnectionState, +) { + lgd.stats.TLSDoneTime = utilities.Some(now) + lgd.stats.TLSConnInfo = connectionState + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Completed TLS handshake for %v at %v with info %v\n", + lgd.ClientId(), + lgd.stats.TLSDoneTime, + lgd.stats.TLSConnInfo, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetHttpWroteRequestTimeInfo( + now time.Time, + info httptrace.WroteRequestInfo, +) { + lgd.stats.HttpWroteRequestTime = now + lgd.stats.HttpInfo = info + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "(lgd) Http finished writing request for %v at %v with info %v\n", + lgd.ClientId(), + lgd.stats.HttpWroteRequestTime, + lgd.stats.HttpInfo, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetHttpResponseReadyTime( + now time.Time, +) { + lgd.stats.HttpResponseReadyTime = now + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Got the first byte of HTTP response headers for %v at %v\n", + lgd.ClientId(), + lgd.stats.HttpResponseReadyTime, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) ClientId() uint64 { + return lgd.clientId +} + +func (lgd *LoadGeneratingConnectionDownload) TransferredInInterval() (uint64, time.Duration) { + transferred := atomic.SwapUint64(&lgd.downloaded, 0) + newIntervalEnd := (time.Now().Sub(lgd.downloadStartTime)).Nanoseconds() + previousIntervalEnd := atomic.SwapInt64(&lgd.lastIntervalEnd, newIntervalEnd) + intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd) + if debug.IsDebug(lgd.debug) { + fmt.Printf("download: Transferred: %v bytes in %v.\n", transferred, intervalLength) + } + return transferred, intervalLength +} + +func (lgd *LoadGeneratingConnectionDownload) Client() *http.Client { + return lgd.client +} + +type countingReader struct { + n *uint64 + ctx context.Context + readable io.Reader +} + +func (cr *countingReader) Read(p []byte) (n int, err error) { + if cr.ctx.Err() != nil { + return 0, io.EOF + } + + n, err = cr.readable.Read(p) + atomic.AddUint64(cr.n, uint64(n)) + return +} + +func (lgd *LoadGeneratingConnectionDownload) Start( + parentCtx context.Context, + debugLevel debug.DebugLevel, +) bool { + lgd.downloaded = 0 + lgd.debug = debugLevel + lgd.clientId = utilities.GenerateUniqueId() + + transport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: lgd.InsecureSkipVerify, + }, + } + + if !utilities.IsInterfaceNil(lgd.KeyLogger) { + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Using an SSL Key Logger for this load-generating download.\n", + ) + } + + // The presence of a custom TLSClientConfig in a *generic* `transport` + // means that go will default to HTTP/1.1 and cowardly avoid HTTP/2: + // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278 + // Also, it would appear that the API's choice of HTTP vs HTTP2 can + // depend on whether the url contains + // https:// or http://: + // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74 + transport.TLSClientConfig.KeyLogWriter = lgd.KeyLogger + } + transport.TLSClientConfig.InsecureSkipVerify = lgd.InsecureSkipVerify + + utilities.OverrideHostTransport(transport, lgd.ConnectToAddr) + + lgd.client = &http.Client{Transport: transport} + lgd.tracer = traceable.GenerateHttpTimingTracer(lgd, lgd.debug) + + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Started a load-generating download (id: %v).\n", + lgd.clientId, + ) + } + + go lgd.doDownload(parentCtx) + return true +} + +func (lgd *LoadGeneratingConnectionDownload) Status() LgcStatus { + return lgd.status +} + +func (lgd *LoadGeneratingConnectionDownload) Stats() *stats.TraceStats { + return &lgd.stats +} + +func (lgd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) error { + var request *http.Request = nil + var get *http.Response = nil + var err error = nil + + if request, err = http.NewRequestWithContext( + httptrace.WithClientTrace(ctx, lgd.tracer), + "GET", + lgd.URL, + nil, + ); err != nil { + lgd.statusLock.Lock() + lgd.status = LGC_STATUS_ERROR + lgd.statusWaiter.Broadcast() + lgd.statusLock.Unlock() + return err + } + + // Used to disable compression + request.Header.Set("Accept-Encoding", "identity") + request.Header.Set("User-Agent", utilities.UserAgent()) + + lgd.downloadStartTime = time.Now() + lgd.lastIntervalEnd = 0 + + if get, err = lgd.client.Do(request); err != nil { + lgd.statusLock.Lock() + lgd.status = LGC_STATUS_ERROR + lgd.statusWaiter.Broadcast() + lgd.statusLock.Unlock() + return err + } + + // Header.Get returns "" when not set + if get.Header.Get("Content-Encoding") != "" { + lgd.statusLock.Lock() + lgd.status = LGC_STATUS_ERROR + lgd.statusWaiter.Broadcast() + lgd.statusLock.Unlock() + fmt.Printf("Content-Encoding header was set (compression not allowed)") + return fmt.Errorf("Content-Encoding header was set (compression not allowed)") + } + cr := &countingReader{n: &lgd.downloaded, ctx: ctx, readable: get.Body} + _, _ = io.Copy(io.Discard, cr) + + lgd.statusLock.Lock() + lgd.status = LGC_STATUS_DONE + lgd.statusWaiter.Broadcast() + lgd.statusLock.Unlock() + + get.Body.Close() + if debug.IsDebug(lgd.debug) { + fmt.Printf("Ending a load-generating download.\n") + } + + return nil +} @@ -16,545 +16,28 @@ package lgc import ( "context" - "crypto/tls" - "fmt" - "io" "net/http" - "net/http/httptrace" - "sync" - "sync/atomic" "time" "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/stats" - "github.com/network-quality/goresponsiveness/traceable" - "github.com/network-quality/goresponsiveness/utilities" ) type LoadGeneratingConnection interface { Start(context.Context, debug.DebugLevel) bool TransferredInInterval() (uint64, time.Duration) Client() *http.Client - IsValid() bool + Status() LgcStatus ClientId() uint64 Stats() *stats.TraceStats WaitUntilStarted(context.Context) bool } -type LoadGeneratingConnectionCollection struct { - Lock sync.Mutex - LGCs *[]LoadGeneratingConnection -} - -func NewLoadGeneratingConnectionCollection() LoadGeneratingConnectionCollection { - return LoadGeneratingConnectionCollection{LGCs: new([]LoadGeneratingConnection)} -} - -func (collection *LoadGeneratingConnectionCollection) Get(idx int) (*LoadGeneratingConnection, error) { - if collection.Lock.TryLock() { - collection.Lock.Unlock() - return nil, fmt.Errorf("collection is unlocked") - } - - if idx > len(*collection.LGCs) { - return nil, fmt.Errorf("index too large") - } - return &(*collection.LGCs)[idx], nil -} - -func (collection *LoadGeneratingConnectionCollection) Append(conn LoadGeneratingConnection) error { - if collection.Lock.TryLock() { - collection.Lock.Unlock() - return fmt.Errorf("collection is unlocked") - } - *collection.LGCs = append(*collection.LGCs, conn) - return nil -} - -// TODO: All 64-bit fields that are accessed atomically must -// appear at the top of this struct. -type LoadGeneratingConnectionDownload struct { - downloaded uint64 - lastIntervalEnd int64 - ConnectToAddr string - URL string - downloadStartTime time.Time - lastDownloaded uint64 - client *http.Client - debug debug.DebugLevel - valid bool - validLock *sync.Mutex - InsecureSkipVerify bool - KeyLogger io.Writer - clientId uint64 - tracer *httptrace.ClientTrace - stats stats.TraceStats - validWaiter *sync.Cond -} - -func NewLoadGeneratingConnectionDownload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionDownload { - lgd := LoadGeneratingConnectionDownload{ - URL: url, - KeyLogger: keyLogger, - ConnectToAddr: connectToAddr, - InsecureSkipVerify: insecureSkipVerify, - validLock: &sync.Mutex{}, - } - lgd.validWaiter = sync.NewCond(lgd.validLock) - return lgd -} - -func (lgd *LoadGeneratingConnectionDownload) WaitUntilStarted(ctxt context.Context) bool { - conditional := func() bool { return lgd.valid } - go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgd.validWaiter) - return utilities.WaitWithContext(ctxt, &conditional, lgd.validLock, lgd.validWaiter) -} - -func (lgd *LoadGeneratingConnectionDownload) SetDnsStartTimeInfo( - now time.Time, - dnsStartInfo httptrace.DNSStartInfo, -) { - lgd.stats.DnsStartTime = now - lgd.stats.DnsStart = dnsStartInfo - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "DNS Start for %v: %v\n", - lgd.ClientId(), - dnsStartInfo, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetDnsDoneTimeInfo( - now time.Time, - dnsDoneInfo httptrace.DNSDoneInfo, -) { - lgd.stats.DnsDoneTime = now - lgd.stats.DnsDone = dnsDoneInfo - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "DNS Done for %v: %v\n", - lgd.ClientId(), - lgd.stats.DnsDone, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetConnectStartTime( - now time.Time, -) { - lgd.stats.ConnectStartTime = now - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "TCP Start for %v at %v\n", - lgd.ClientId(), - lgd.stats.ConnectStartTime, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetConnectDoneTimeError( - now time.Time, - err error, -) { - lgd.stats.ConnectDoneTime = now - lgd.stats.ConnectDoneError = err - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "TCP Done for %v (with error %v) @ %v\n", - lgd.ClientId(), - lgd.stats.ConnectDoneError, - lgd.stats.ConnectDoneTime, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetGetConnTime(now time.Time) { - lgd.stats.GetConnectionStartTime = now - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "Started getting connection for %v @ %v\n", - lgd.ClientId(), - lgd.stats.GetConnectionStartTime, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetGotConnTimeInfo( - now time.Time, - gotConnInfo httptrace.GotConnInfo, -) { - if gotConnInfo.Reused { - fmt.Printf("Unexpectedly reusing a connection!\n") - panic(!gotConnInfo.Reused) - } - lgd.stats.GetConnectionDoneTime = now - lgd.stats.ConnInfo = gotConnInfo - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "Got connection for %v at %v with info %v\n", - lgd.ClientId(), - lgd.stats.GetConnectionDoneTime, - lgd.stats.ConnInfo, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeStartTime( - now time.Time, -) { - lgd.stats.TLSStartTime = utilities.Some(now) - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "Started TLS Handshake for %v @ %v\n", - lgd.ClientId(), - lgd.stats.TLSStartTime, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeDoneTimeState( - now time.Time, - connectionState tls.ConnectionState, -) { - lgd.stats.TLSDoneTime = utilities.Some(now) - lgd.stats.TLSConnInfo = connectionState - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "Completed TLS handshake for %v at %v with info %v\n", - lgd.ClientId(), - lgd.stats.TLSDoneTime, - lgd.stats.TLSConnInfo, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetHttpWroteRequestTimeInfo( - now time.Time, - info httptrace.WroteRequestInfo, -) { - lgd.stats.HttpWroteRequestTime = now - lgd.stats.HttpInfo = info - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "(lgd) Http finished writing request for %v at %v with info %v\n", - lgd.ClientId(), - lgd.stats.HttpWroteRequestTime, - lgd.stats.HttpInfo, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) SetHttpResponseReadyTime( - now time.Time, -) { - lgd.stats.HttpResponseReadyTime = now - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "Got the first byte of HTTP response headers for %v at %v\n", - lgd.ClientId(), - lgd.stats.HttpResponseReadyTime, - ) - } -} - -func (lgd *LoadGeneratingConnectionDownload) ClientId() uint64 { - return lgd.clientId -} - -func (lgd *LoadGeneratingConnectionDownload) TransferredInInterval() (uint64, time.Duration) { - transferred := atomic.SwapUint64(&lgd.downloaded, 0) - newIntervalEnd := (time.Now().Sub(lgd.downloadStartTime)).Nanoseconds() - previousIntervalEnd := atomic.SwapInt64(&lgd.lastIntervalEnd, newIntervalEnd) - intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd) - if debug.IsDebug(lgd.debug) { - fmt.Printf("download: Transferred: %v bytes in %v.\n", transferred, intervalLength) - } - return transferred, intervalLength -} - -func (lgd *LoadGeneratingConnectionDownload) Client() *http.Client { - return lgd.client -} - -type countingReader struct { - n *uint64 - ctx context.Context - readable io.Reader -} - -func (cr *countingReader) Read(p []byte) (n int, err error) { - if cr.ctx.Err() != nil { - return 0, io.EOF - } - n, err = cr.readable.Read(p) - atomic.AddUint64(cr.n, uint64(n)) - return -} - -func (lgd *LoadGeneratingConnectionDownload) Start( - parentCtx context.Context, - debugLevel debug.DebugLevel, -) bool { - lgd.downloaded = 0 - lgd.debug = debugLevel - lgd.clientId = utilities.GenerateUniqueId() - - transport := &http.Transport{ - Proxy: http.ProxyFromEnvironment, - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: lgd.InsecureSkipVerify, - }, - } - - if !utilities.IsInterfaceNil(lgd.KeyLogger) { - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "Using an SSL Key Logger for this load-generating download.\n", - ) - } - - // The presence of a custom TLSClientConfig in a *generic* `transport` - // means that go will default to HTTP/1.1 and cowardly avoid HTTP/2: - // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278 - // Also, it would appear that the API's choice of HTTP vs HTTP2 can - // depend on whether the url contains - // https:// or http://: - // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74 - transport.TLSClientConfig.KeyLogWriter = lgd.KeyLogger - } - transport.TLSClientConfig.InsecureSkipVerify = lgd.InsecureSkipVerify - - utilities.OverrideHostTransport(transport, lgd.ConnectToAddr) - - lgd.client = &http.Client{Transport: transport} - lgd.validLock.Lock() - lgd.valid = true - lgd.validLock.Unlock() - lgd.tracer = traceable.GenerateHttpTimingTracer(lgd, lgd.debug) - - if debug.IsDebug(lgd.debug) { - fmt.Printf( - "Started a load-generating download (id: %v).\n", - lgd.clientId, - ) - } +type LgcStatus int - go lgd.doDownload(parentCtx) - return true -} - -func (lgd *LoadGeneratingConnectionDownload) IsValid() bool { - return lgd.valid -} - -func (lgd *LoadGeneratingConnectionDownload) Stats() *stats.TraceStats { - return &lgd.stats -} - -func (lgd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) { - var request *http.Request = nil - var get *http.Response = nil - var err error = nil - - if request, err = http.NewRequestWithContext( - httptrace.WithClientTrace(ctx, lgd.tracer), - "GET", - lgd.URL, - nil, - ); err != nil { - lgd.validLock.Lock() - lgd.valid = false - lgd.validLock.Unlock() - return - } - - // Used to disable compression - request.Header.Set("Accept-Encoding", "identity") - request.Header.Set("User-Agent", utilities.UserAgent()) - - lgd.downloadStartTime = time.Now() - lgd.lastIntervalEnd = 0 - - if get, err = lgd.client.Do(request); err != nil { - lgd.validLock.Lock() - lgd.valid = false - lgd.validLock.Unlock() - return - } - - // Header.Get returns "" when not set - if get.Header.Get("Content-Encoding") != "" { - lgd.validLock.Lock() - lgd.valid = false - lgd.validLock.Unlock() - fmt.Printf("Content-Encoding header was set (compression not allowed)") - return - } - cr := &countingReader{n: &lgd.downloaded, ctx: ctx, readable: get.Body} - _, _ = io.Copy(io.Discard, cr) - get.Body.Close() - if debug.IsDebug(lgd.debug) { - fmt.Printf("Ending a load-generating download.\n") - } -} - -// TODO: All 64-bit fields that are accessed atomically must -// appear at the top of this struct. -type LoadGeneratingConnectionUpload struct { - uploaded uint64 - lastIntervalEnd int64 - URL string - ConnectToAddr string - uploadStartTime time.Time - lastUploaded uint64 - client *http.Client - debug debug.DebugLevel - valid bool - validLock *sync.Mutex - InsecureSkipVerify bool - KeyLogger io.Writer - clientId uint64 - validWaiter *sync.Cond -} - -func NewLoadGeneratingConnectionUpload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionUpload { - lgu := LoadGeneratingConnectionUpload{ - URL: url, - KeyLogger: keyLogger, - ConnectToAddr: connectToAddr, - InsecureSkipVerify: insecureSkipVerify, - validLock: &sync.Mutex{}, - } - lgu.validWaiter = sync.NewCond(lgu.validLock) - return lgu -} - -func (lgu *LoadGeneratingConnectionUpload) WaitUntilStarted(ctxt context.Context) bool { - conditional := func() bool { return lgu.valid } - go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgu.validWaiter) - return utilities.WaitWithContext(ctxt, &conditional, lgu.validLock, lgu.validWaiter) -} - -func (lgu *LoadGeneratingConnectionUpload) ClientId() uint64 { - return lgu.clientId -} - -func (lgu *LoadGeneratingConnectionUpload) TransferredInInterval() (uint64, time.Duration) { - transferred := atomic.SwapUint64(&lgu.uploaded, 0) - newIntervalEnd := (time.Now().Sub(lgu.uploadStartTime)).Nanoseconds() - previousIntervalEnd := atomic.SwapInt64(&lgu.lastIntervalEnd, newIntervalEnd) - intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd) - if debug.IsDebug(lgu.debug) { - fmt.Printf("upload: Transferred: %v bytes in %v.\n", transferred, intervalLength) - } - return transferred, intervalLength -} - -func (lgu *LoadGeneratingConnectionUpload) Client() *http.Client { - return lgu.client -} - -func (lgu *LoadGeneratingConnectionUpload) IsValid() bool { - return lgu.valid -} - -type syntheticCountingReader struct { - n *uint64 - ctx context.Context -} - -func (s *syntheticCountingReader) Read(p []byte) (n int, err error) { - if s.ctx.Err() != nil { - return 0, io.EOF - } - err = nil - n = len(p) - - atomic.AddUint64(s.n, uint64(n)) - return -} - -func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) bool { - lgu.uploaded = 0 - s := &syntheticCountingReader{n: &lgu.uploaded, ctx: ctx} - var resp *http.Response = nil - var request *http.Request = nil - var err error - - if request, err = http.NewRequest( - "POST", - lgu.URL, - s, - ); err != nil { - lgu.validLock.Lock() - lgu.valid = false - lgu.validLock.Unlock() - return false - } - - // Used to disable compression - request.Header.Set("Accept-Encoding", "identity") - request.Header.Set("User-Agent", utilities.UserAgent()) - - lgu.uploadStartTime = time.Now() - lgu.lastIntervalEnd = 0 - - if resp, err = lgu.client.Do(request); err != nil { - lgu.validLock.Lock() - lgu.valid = false - lgu.validLock.Unlock() - return false - } - - resp.Body.Close() - if debug.IsDebug(lgu.debug) { - fmt.Printf("Ending a load-generating upload.\n") - } - return true -} - -func (lgu *LoadGeneratingConnectionUpload) Start( - parentCtx context.Context, - debugLevel debug.DebugLevel, -) bool { - lgu.uploaded = 0 - lgu.clientId = utilities.GenerateUniqueId() - lgu.debug = debugLevel - - transport := &http.Transport{ - Proxy: http.ProxyFromEnvironment, - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: lgu.InsecureSkipVerify, - }, - } - - if !utilities.IsInterfaceNil(lgu.KeyLogger) { - if debug.IsDebug(lgu.debug) { - fmt.Printf( - "Using an SSL Key Logger for this load-generating upload.\n", - ) - } - transport.TLSClientConfig.KeyLogWriter = lgu.KeyLogger - } - - utilities.OverrideHostTransport(transport, lgu.ConnectToAddr) - - lgu.client = &http.Client{Transport: transport} - - lgu.validLock.Lock() - lgu.valid = true - lgu.validLock.Unlock() - - if debug.IsDebug(lgu.debug) { - fmt.Printf("Started a load-generating upload (id: %v).\n", lgu.clientId) - } - - go lgu.doUpload(parentCtx) - return true -} - -func (lgu *LoadGeneratingConnectionUpload) Stats() *stats.TraceStats { - // Get all your stats from the download side of the LGC. - return nil -} +const ( + LGC_STATUS_NOT_STARTED LgcStatus = iota + LGC_STATUS_RUNNING + LGC_STATUS_DONE + LGC_STATUS_ERROR +) diff --git a/lgc/upload.go b/lgc/upload.go new file mode 100644 index 0000000..f0c772e --- /dev/null +++ b/lgc/upload.go @@ -0,0 +1,206 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package lgc + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "net/http" + "sync" + "sync/atomic" + "time" + + "github.com/network-quality/goresponsiveness/debug" + "github.com/network-quality/goresponsiveness/stats" + "github.com/network-quality/goresponsiveness/utilities" +) + +// TODO: All 64-bit fields that are accessed atomically must +// appear at the top of this struct. +type LoadGeneratingConnectionUpload struct { + uploaded uint64 + lastIntervalEnd int64 + URL string + ConnectToAddr string + uploadStartTime time.Time + lastUploaded uint64 + client *http.Client + debug debug.DebugLevel + InsecureSkipVerify bool + KeyLogger io.Writer + clientId uint64 + status LgcStatus + statusLock *sync.Mutex + statusWaiter *sync.Cond +} + +func NewLoadGeneratingConnectionUpload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionUpload { + lgu := LoadGeneratingConnectionUpload{ + URL: url, + KeyLogger: keyLogger, + ConnectToAddr: connectToAddr, + InsecureSkipVerify: insecureSkipVerify, + statusLock: &sync.Mutex{}, + } + lgu.status = LGC_STATUS_NOT_STARTED + lgu.statusWaiter = sync.NewCond(lgu.statusLock) + return lgu +} + +func (lgu *LoadGeneratingConnectionUpload) WaitUntilStarted(ctxt context.Context) bool { + conditional := func() bool { return lgu.status != LGC_STATUS_NOT_STARTED } + go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgu.statusWaiter) + return utilities.WaitWithContext(ctxt, &conditional, lgu.statusLock, lgu.statusWaiter) +} + +func (lgu *LoadGeneratingConnectionUpload) ClientId() uint64 { + return lgu.clientId +} + +func (lgu *LoadGeneratingConnectionUpload) TransferredInInterval() (uint64, time.Duration) { + transferred := atomic.SwapUint64(&lgu.uploaded, 0) + newIntervalEnd := (time.Now().Sub(lgu.uploadStartTime)).Nanoseconds() + previousIntervalEnd := atomic.SwapInt64(&lgu.lastIntervalEnd, newIntervalEnd) + intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd) + if debug.IsDebug(lgu.debug) { + fmt.Printf("upload: Transferred: %v bytes in %v.\n", transferred, intervalLength) + } + return transferred, intervalLength +} + +func (lgu *LoadGeneratingConnectionUpload) Client() *http.Client { + return lgu.client +} + +func (lgu *LoadGeneratingConnectionUpload) Status() LgcStatus { + return lgu.status +} + +type syntheticCountingReader struct { + n *uint64 + ctx context.Context + lgu *LoadGeneratingConnectionUpload +} + +func (s *syntheticCountingReader) Read(p []byte) (n int, err error) { + if s.ctx.Err() != nil { + return 0, io.EOF + } + if n == 0 { + s.lgu.statusLock.Lock() + s.lgu.status = LGC_STATUS_RUNNING + s.lgu.statusWaiter.Broadcast() + s.lgu.statusLock.Unlock() + } + err = nil + n = len(p) + + atomic.AddUint64(s.n, uint64(n)) + return +} + +func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) error { + lgu.uploaded = 0 + s := &syntheticCountingReader{n: &lgu.uploaded, ctx: ctx, lgu: lgu} + var resp *http.Response = nil + var request *http.Request = nil + var err error + + if request, err = http.NewRequest( + "POST", + lgu.URL, + s, + ); err != nil { + lgu.statusLock.Lock() + lgu.status = LGC_STATUS_ERROR + lgu.statusWaiter.Broadcast() + lgu.statusLock.Unlock() + return err + } + + // Used to disable compression + request.Header.Set("Accept-Encoding", "identity") + request.Header.Set("User-Agent", utilities.UserAgent()) + + lgu.uploadStartTime = time.Now() + lgu.lastIntervalEnd = 0 + + lgu.statusLock.Lock() + lgu.status = LGC_STATUS_RUNNING + lgu.statusWaiter.Broadcast() + lgu.statusLock.Unlock() + + if resp, err = lgu.client.Do(request); err != nil { + lgu.statusLock.Lock() + lgu.status = LGC_STATUS_ERROR + lgu.statusWaiter.Broadcast() + lgu.statusLock.Unlock() + return err + } + + lgu.statusLock.Lock() + lgu.status = LGC_STATUS_DONE + lgu.statusWaiter.Broadcast() + lgu.statusLock.Unlock() + + resp.Body.Close() + if debug.IsDebug(lgu.debug) { + fmt.Printf("Ending a load-generating upload.\n") + } + return nil +} + +func (lgu *LoadGeneratingConnectionUpload) Start( + parentCtx context.Context, + debugLevel debug.DebugLevel, +) bool { + lgu.uploaded = 0 + lgu.clientId = utilities.GenerateUniqueId() + lgu.debug = debugLevel + + transport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: lgu.InsecureSkipVerify, + }, + } + + if !utilities.IsInterfaceNil(lgu.KeyLogger) { + if debug.IsDebug(lgu.debug) { + fmt.Printf( + "Using an SSL Key Logger for this load-generating upload.\n", + ) + } + transport.TLSClientConfig.KeyLogWriter = lgu.KeyLogger + } + + utilities.OverrideHostTransport(transport, lgu.ConnectToAddr) + + lgu.client = &http.Client{Transport: transport} + + if debug.IsDebug(lgu.debug) { + fmt.Printf("Started a load-generating upload (id: %v).\n", lgu.clientId) + } + + go lgu.doUpload(parentCtx) + return true +} + +func (lgu *LoadGeneratingConnectionUpload) Stats() *stats.TraceStats { + // Get all your stats from the download side of the LGC. + return nil +} diff --git a/networkQuality.go b/networkQuality.go index 06ac0a0..309aa94 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -32,6 +32,7 @@ import ( "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" "github.com/network-quality/goresponsiveness/ms" + "github.com/network-quality/goresponsiveness/probe" "github.com/network-quality/goresponsiveness/rpm" "github.com/network-quality/goresponsiveness/stabilizer" "github.com/network-quality/goresponsiveness/timeoutat" @@ -246,8 +247,8 @@ func main() { pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() } - var selfProbeDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil - var foreignProbeDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil + var selfProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] = nil + var foreignProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] = nil var downloadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil var uploadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil var granularThroughputDataLogger datalogger.DataLogger[rpm.GranularThroughputDataPoint] = nil @@ -275,7 +276,7 @@ func main() { "-throughput-granular-"+unique, ) - selfProbeDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint]( + selfProbeDataLogger, err = datalogger.CreateCSVDataLogger[probe.ProbeDataPoint]( dataLoggerSelfFilename, ) if err != nil { @@ -286,7 +287,7 @@ func main() { selfProbeDataLogger = nil } - foreignProbeDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint]( + foreignProbeDataLogger, err = datalogger.CreateCSVDataLogger[probe.ProbeDataPoint]( dataLoggerForeignFilename, ) if err != nil { @@ -333,10 +334,10 @@ func main() { // If, for some reason, the data loggers are nil, make them Null Data Loggers so that we don't have conditional // code later. if selfProbeDataLogger == nil { - selfProbeDataLogger = datalogger.CreateNullDataLogger[rpm.ProbeDataPoint]() + selfProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]() } if foreignProbeDataLogger == nil { - foreignProbeDataLogger = datalogger.CreateNullDataLogger[rpm.ProbeDataPoint]() + foreignProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]() } if downloadThroughputDataLogger == nil { downloadThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]() @@ -352,26 +353,26 @@ func main() { * Create (and then, ironically, name) two anonymous functions that, when invoked, * will create load-generating connections for upload/download */ - generate_lgd := func() lgc.LoadGeneratingConnection { + generateLgdc := func() lgc.LoadGeneratingConnection { lgd := lgc.NewLoadGeneratingConnectionDownload(config.Urls.LargeUrl, sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify) return &lgd } - generate_lgu := func() lgc.LoadGeneratingConnection { + generateLguc := func() lgc.LoadGeneratingConnection { lgu := lgc.NewLoadGeneratingConnectionUpload(config.Urls.UploadUrl, sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify) return &lgu } - generateSelfProbeConfiguration := func() rpm.ProbeConfiguration { - return rpm.ProbeConfiguration{ + generateSelfProbeConfiguration := func() probe.ProbeConfiguration { + return probe.ProbeConfiguration{ URL: config.Urls.SmallUrl, ConnectToAddr: config.ConnectToAddr, InsecureSkipVerify: *insecureSkipVerify, } } - generateForeignProbeConfiguration := func() rpm.ProbeConfiguration { - return rpm.ProbeConfiguration{ + generateForeignProbeConfiguration := func() probe.ProbeConfiguration { + return probe.ProbeConfiguration{ URL: config.Urls.SmallUrl, ConnectToAddr: config.ConnectToAddr, InsecureSkipVerify: *insecureSkipVerify, @@ -393,7 +394,7 @@ func main() { networkActivityCtx, downloadLoadGeneratorOperatorCtx, time.Second, - generate_lgd, + generateLgdc, &downloadLoadGeneratingConnectionCollection, *calculateExtendedStats, downloadDebugging, @@ -402,7 +403,7 @@ func main() { networkActivityCtx, uploadLoadGeneratorOperatorCtx, time.Second, - generate_lgu, + generateLguc, &uploadLoadGeneratingConnectionCollection, *calculateExtendedStats, uploadDebugging, @@ -410,7 +411,7 @@ func main() { // Handles for the first connection that the load-generating go routines (both up and // download) open are passed back on the self[Down|Up]ProbeConnectionCommunicationChannel - // so that we can then start probes on those handles. + // so that we can then start probes on those connections. selfDownProbeConnection := <-selfDownProbeConnectionCommunicationChannel selfUpProbeConnection := <-selfUpProbeConnectionCommunicationChannel @@ -532,7 +533,7 @@ timeout: fmt.Printf( "################# Responsiveness is instantaneously %s.\n", utilities.Conditional(responsivenessIsStable, "stable", "unstable")) } - if probeMeasurement.Type == rpm.Foreign { + if probeMeasurement.Type == probe.Foreign { // There may be more than one round trip accumulated together. If that is the case, // we will blow them apart in to three separate measurements and each one will just // be 1 / measurement.RoundTripCount of the total length. @@ -540,13 +541,13 @@ timeout: foreignRtts.AddElement(probeMeasurement.Duration.Seconds() / float64(probeMeasurement.RoundTripCount)) } - } else if probeMeasurement.Type == rpm.SelfDown || probeMeasurement.Type == rpm.SelfUp { + } else if probeMeasurement.Type == probe.SelfDown || probeMeasurement.Type == probe.SelfUp { selfRtts.AddElement(probeMeasurement.Duration.Seconds()) } - if probeMeasurement.Type == rpm.Foreign { + if probeMeasurement.Type == probe.Foreign { foreignProbeDataLogger.LogRecord(probeMeasurement) - } else if probeMeasurement.Type == rpm.SelfDown || probeMeasurement.Type == rpm.SelfUp { + } else if probeMeasurement.Type == probe.SelfDown || probeMeasurement.Type == probe.SelfUp { selfProbeDataLogger.LogRecord(probeMeasurement) } } @@ -592,10 +593,11 @@ timeout: defer downloadLoadGeneratingConnectionCollection.Lock.Unlock() // Note: We do not trace upload connections! - for i := 0; i < len(*downloadLoadGeneratingConnectionCollection.LGCs); i++ { + for i := 0; i < downloadLoadGeneratingConnectionCollection.Len(); i++ { // Assume that extended statistics are available -- the check was done explicitly at // program startup if the calculateExtendedStats flag was set by the user on the command line. - if err := extendedStats.IncorporateConnectionStats((*downloadLoadGeneratingConnectionCollection.LGCs)[i].Stats().ConnInfo.Conn); err != nil { + currentLgc, _ := downloadLoadGeneratingConnectionCollection.Get(i) + if err := extendedStats.IncorporateConnectionStats((*currentLgc).Stats().ConnInfo.Conn); err != nil { fmt.Fprintf( os.Stderr, "Warning: Could not add extended stats for the connection: %v\n", @@ -20,7 +20,6 @@ import ( "fmt" "io" "net/http" - "net/http/httptrace" "os" "sync" "time" @@ -29,8 +28,7 @@ import ( "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" - "github.com/network-quality/goresponsiveness/stats" - "github.com/network-quality/goresponsiveness/traceable" + "github.com/network-quality/goresponsiveness/probe" "github.com/network-quality/goresponsiveness/utilities" ) @@ -59,22 +57,6 @@ func addFlows( return toAdd } -type ProbeConfiguration struct { - ConnectToAddr string - URL string - Host string - InsecureSkipVerify bool -} - -type ProbeDataPoint struct { - Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"` - RoundTripCount uint64 `Description:"The number of round trips measured by this data point."` - Duration time.Duration `Description:"The duration for this measurement." Formatter:"Seconds"` - TCPRtt time.Duration `Description:"The underlying connection's RTT at probe time." Formatter:"Seconds"` - TCPCwnd uint32 `Description:"The underlying connection's congestion window at probe time."` - Type ProbeType `Description:"The type of the probe." Formatter:"Value"` -} - type GranularThroughputDataPoint struct { Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"` Throughput float64 `Description:"Instantaneous throughput (B/s)."` @@ -94,182 +76,26 @@ type ThroughputDataPoint struct { type SelfDataCollectionResult struct { RateBps float64 LGCs []lgc.LoadGeneratingConnection - ProbeDataPoints []ProbeDataPoint + ProbeDataPoints []probe.ProbeDataPoint LoggingContinuation func() } -type ProbeType int64 - -const ( - SelfUp ProbeType = iota - SelfDown - Foreign -) - -type ProbeRoundTripCountType uint16 - -const ( - DefaultDownRoundTripCount ProbeRoundTripCountType = 1 - SelfUpRoundTripCount ProbeRoundTripCountType = 1 - SelfDownRoundTripCount ProbeRoundTripCountType = 1 - ForeignRoundTripCount ProbeRoundTripCountType = 3 -) - -func (pt ProbeType) Value() string { - if pt == SelfUp { - return "SelfUp" - } else if pt == SelfDown { - return "SelfDown" - } - return "Foreign" -} - -func Probe( - managingCtx context.Context, - waitGroup *sync.WaitGroup, - client *http.Client, - probeUrl string, - probeHost string, // optional: for use with a test_endpoint - probeType ProbeType, - result *chan ProbeDataPoint, - captureExtendedStats bool, - debugging *debug.DebugWithPrefix, -) error { - - if waitGroup != nil { - waitGroup.Add(1) - defer waitGroup.Done() - } - - if client == nil { - return fmt.Errorf("cannot start a probe with a nil client") - } - - probeId := utilities.GenerateUniqueId() - probeTracer := NewProbeTracer(client, probeType, probeId, debugging) - time_before_probe := time.Now() - probe_req, err := http.NewRequestWithContext( - httptrace.WithClientTrace(managingCtx, probeTracer.trace), - "GET", - probeUrl, - nil, - ) - if err != nil { - return err - } - - // Used to disable compression - probe_req.Header.Set("Accept-Encoding", "identity") - probe_req.Header.Set("User-Agent", utilities.UserAgent()) - - probe_resp, err := client.Do(probe_req) - if err != nil { - return err - } - - // Header.Get returns "" when not set - if probe_resp.Header.Get("Content-Encoding") != "" { - return fmt.Errorf("Content-Encoding header was set (compression not allowed)") - } - - // TODO: Make this interruptable somehow by using _ctx_. - _, err = io.ReadAll(probe_resp.Body) - if err != nil { - return err - } - time_after_probe := time.Now() - - // Depending on whether we think that Close() requires another RTT (via TCP), we - // may need to move this before/after capturing the after time. - probe_resp.Body.Close() - - sanity := time_after_probe.Sub(time_before_probe) - - // When the probe is run on a load-generating connection (a self probe) there should - // only be a single round trip that is measured. We will take the accumulation of all these - // values just to be sure, though. Because of how this traced connection was launched, most - // of the values will be 0 (or very small where the time that go takes for delivering callbacks - // and doing context switches pokes through). When it is !isSelfProbe then the values will - // be significant and we want to add them regardless! - totalDelay := probeTracer.GetTLSAndHttpHeaderDelta() + probeTracer.GetHttpDownloadDelta( - time_after_probe, - ) + probeTracer.GetTCPDelta() - - // We must have reused the connection if we are a self probe! - if (probeType == SelfUp || probeType == SelfDown) && !probeTracer.stats.ConnectionReused { - panic(!probeTracer.stats.ConnectionReused) - } - - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "(%s) (%s Probe %v) sanity vs total: %v vs %v\n", - debugging.Prefix, - probeType.Value(), - probeId, - sanity, - totalDelay, - ) - } - roundTripCount := DefaultDownRoundTripCount - if probeType == Foreign { - roundTripCount = ForeignRoundTripCount - } - // Careful!!! It's possible that this channel has been closed because the Prober that - // started it has been stopped. Writing to a closed channel will cause a panic. It might not - // matter because a panic just stops the go thread containing the paniced code and we are in - // a go thread that executes only this function. - defer func() { - isThreadPanicing := recover() - if isThreadPanicing != nil && debug.IsDebug(debugging.Level) { - fmt.Printf( - "(%s) (%s Probe %v) Probe attempted to write to the result channel after its invoker ended (official reason: %v).\n", - debugging.Prefix, - probeType.Value(), - probeId, - isThreadPanicing, - ) - } - }() - tcpRtt := time.Duration(0 * time.Second) - tcpCwnd := uint32(0) - // TODO: Only get the extended stats for a connection if the user has requested them overall. - if captureExtendedStats && extendedstats.ExtendedStatsAvailable() { - tcpInfo, err := extendedstats.GetTCPInfo(probeTracer.stats.ConnInfo.Conn) - if err == nil { - tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond - tcpCwnd = tcpInfo.Snd_cwnd - } else { - fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err) - } - } - dataPoint := ProbeDataPoint{ - Time: time_before_probe, - RoundTripCount: uint64(roundTripCount), - Duration: totalDelay, - TCPRtt: tcpRtt, - TCPCwnd: tcpCwnd, - Type: probeType, - } - *result <- dataPoint - return nil -} - func CombinedProber( proberCtx context.Context, networkActivityCtx context.Context, - foreignProbeConfigurationGenerator func() ProbeConfiguration, - selfProbeConfigurationGenerator func() ProbeConfiguration, + foreignProbeConfigurationGenerator func() probe.ProbeConfiguration, + selfProbeConfigurationGenerator func() probe.ProbeConfiguration, selfDownProbeConnection lgc.LoadGeneratingConnection, selfUpProbeConnection lgc.LoadGeneratingConnection, probeInterval time.Duration, keyLogger io.Writer, captureExtendedStats bool, debugging *debug.DebugWithPrefix, -) (dataPoints chan ProbeDataPoint) { +) (dataPoints chan probe.ProbeDataPoint) { // Make a channel to send back all the generated data points // when we are probing. - dataPoints = make(chan ProbeDataPoint) + dataPoints = make(chan probe.ProbeDataPoint) go func() { wg := sync.WaitGroup{} @@ -319,39 +145,39 @@ func CombinedProber( // Start Foreign Connection Prober probeCount++ - go Probe( + go probe.Probe( networkActivityCtx, &wg, foreignProbeClient, foreignProbeConfiguration.URL, foreignProbeConfiguration.Host, - Foreign, + probe.Foreign, &dataPoints, captureExtendedStats, debugging, ) // Start Self Download Connection Prober - go Probe( + go probe.Probe( networkActivityCtx, &wg, selfDownProbeConnection.Client(), selfProbeConfiguration.URL, selfProbeConfiguration.Host, - SelfDown, + probe.SelfDown, &dataPoints, captureExtendedStats, debugging, ) // Start Self Upload Connection Prober - go Probe( + go probe.Probe( proberCtx, &wg, selfUpProbeConnection.Client(), selfProbeConfiguration.URL, selfProbeConfiguration.Host, - SelfUp, + probe.SelfUp, &dataPoints, captureExtendedStats, debugging, @@ -380,7 +206,7 @@ func LoadGenerator( loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load. rampupInterval time.Duration, lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection. - loadGeneratingConnections *lgc.LoadGeneratingConnectionCollection, + loadGeneratingConnectionsCollection *lgc.LoadGeneratingConnectionCollection, captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections? debugging *debug.DebugWithPrefix, // How can we forget debugging? ) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes. @@ -400,7 +226,7 @@ func LoadGenerator( flowsCreated += addFlows( networkActivityCtx, constants.StartingNumberOfLoadGeneratingConnections, - loadGeneratingConnections, + loadGeneratingConnectionsCollection, lgcGenerator, debugging.Level, ) @@ -408,9 +234,9 @@ func LoadGenerator( // We have at least a single load-generating channel. This channel will be the one that // the self probes use. Let's send it back to the caller so that they can pass it on if they need to. go func() { - loadGeneratingConnections.Lock.Lock() - zerothConnection, err := loadGeneratingConnections.Get(0) - loadGeneratingConnections.Lock.Unlock() + loadGeneratingConnectionsCollection.Lock.Lock() + zerothConnection, err := loadGeneratingConnectionsCollection.Get(0) + loadGeneratingConnectionsCollection.Lock.Unlock() if err != nil { panic("Could not get the zeroth connection!\n") } @@ -452,56 +278,76 @@ func LoadGenerator( granularThroughputDatapoints := make([]GranularThroughputDataPoint, 0) now = time.Now() // Used to align granular throughput data allInvalid := true - for i := range *loadGeneratingConnections.LGCs { - if !(*loadGeneratingConnections.LGCs)[i].IsValid() { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Load-generating connection with id %d is invalid ... skipping.\n", + for i := range *loadGeneratingConnectionsCollection.LGCs { + loadGeneratingConnectionsCollection.Lock.Lock() + connectionState := (*loadGeneratingConnectionsCollection.LGCs)[i].Status() + loadGeneratingConnectionsCollection.Lock.Unlock() + switch connectionState { + default: + { + error := fmt.Sprintf( + "%v: Load-generating connection with id %d is in an unrecognizable state.\n", debugging, - (*loadGeneratingConnections.LGCs)[i].ClientId(), + (*loadGeneratingConnectionsCollection.LGCs)[i].ClientId()) + fmt.Fprintf(os.Stderr, "%s", error) + panic(error) + } + case lgc.LGC_STATUS_ERROR, + lgc.LGC_STATUS_DONE: + { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Load-generating connection with id %d is invalid or complete ... skipping.\n", + debugging, + (*loadGeneratingConnectionsCollection.LGCs)[i].ClientId(), + ) + } + // TODO: Do we add null connection to throughput? and how do we define it? Throughput -1 or 0? + granularThroughputDatapoints = append( + granularThroughputDatapoints, + GranularThroughputDataPoint{now, 0, uint32(i), 0, 0, ""}, ) + continue } - // TODO: Do we add null connection to throughput? and how do we define it? Throughput -1 or 0? - granularThroughputDatapoints = append( - granularThroughputDatapoints, - GranularThroughputDataPoint{now, 0, uint32(i), 0, 0, ""}, - ) - continue - } - allInvalid = false - currentTransferred, currentInterval := (*loadGeneratingConnections.LGCs)[i].TransferredInInterval() - // normalize to a second-long interval! - instantaneousConnectionThroughput := float64( - currentTransferred, - ) / float64( - currentInterval.Seconds(), - ) - instantaneousTotalThroughput += instantaneousConnectionThroughput + case lgc.LGC_STATUS_RUNNING: + { + allInvalid = false + currentTransferred, currentInterval := (*loadGeneratingConnectionsCollection.LGCs)[i].TransferredInInterval() + // normalize to a second-long interval! + instantaneousConnectionThroughput := float64( + currentTransferred, + ) / float64( + currentInterval.Seconds(), + ) + instantaneousTotalThroughput += instantaneousConnectionThroughput - tcpRtt := time.Duration(0 * time.Second) - tcpCwnd := uint32(0) - if captureExtendedStats && extendedstats.ExtendedStatsAvailable() { - if stats := (*loadGeneratingConnections.LGCs)[i].Stats(); stats != nil { - tcpInfo, err := extendedstats.GetTCPInfo(stats.ConnInfo.Conn) - if err == nil { - tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond - tcpCwnd = tcpInfo.Snd_cwnd - } else { - fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err) + tcpRtt := time.Duration(0 * time.Second) + tcpCwnd := uint32(0) + if captureExtendedStats && extendedstats.ExtendedStatsAvailable() { + if stats := (*loadGeneratingConnectionsCollection.LGCs)[i].Stats(); stats != nil { + tcpInfo, err := extendedstats.GetTCPInfo(stats.ConnInfo.Conn) + if err == nil { + tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond + tcpCwnd = tcpInfo.Snd_cwnd + } else { + fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err) + } + } } + granularThroughputDatapoints = append( + granularThroughputDatapoints, + GranularThroughputDataPoint{ + now, + instantaneousConnectionThroughput, + uint32(i), + tcpRtt, + tcpCwnd, + "", + }, + ) } + } - granularThroughputDatapoints = append( - granularThroughputDatapoints, - GranularThroughputDataPoint{ - now, - instantaneousConnectionThroughput, - uint32(i), - tcpRtt, - tcpCwnd, - "", - }, - ) } // For some reason, all the lgcs are invalid. This likely means that @@ -520,7 +366,7 @@ func LoadGenerator( throughputDataPoint := ThroughputDataPoint{ time.Now(), instantaneousTotalThroughput, - len(*loadGeneratingConnections.LGCs), + len(*loadGeneratingConnectionsCollection.LGCs), granularThroughputDatapoints, } throughputCalculations <- throughputDataPoint @@ -529,7 +375,7 @@ func LoadGenerator( flowsCreated += addFlows( networkActivityCtx, constants.AdditiveNumberOfLoadGeneratingConnections, - loadGeneratingConnections, + loadGeneratingConnectionsCollection, lgcGenerator, debugging.Level, ) @@ -543,281 +389,3 @@ func LoadGenerator( }() return } - -type ProbeTracer struct { - client *http.Client - stats *stats.TraceStats - trace *httptrace.ClientTrace - debug debug.DebugLevel - probeid uint64 - probeType ProbeType -} - -func (p *ProbeTracer) String() string { - return fmt.Sprintf("(Probe %v): stats: %v\n", p.probeid, p.stats) -} - -func (p *ProbeTracer) ProbeId() uint64 { - return p.probeid -} - -func (p *ProbeTracer) GetTrace() *httptrace.ClientTrace { - return p.trace -} - -func (p *ProbeTracer) GetDnsDelta() time.Duration { - if p.stats.ConnectionReused { - return time.Duration(0) - } - delta := p.stats.DnsDoneTime.Sub(p.stats.DnsStartTime) - if debug.IsDebug(p.debug) { - fmt.Printf("(Probe %v): DNS Time: %v\n", p.probeid, delta) - } - return delta -} - -func (p *ProbeTracer) GetTCPDelta() time.Duration { - if p.stats.ConnectionReused { - return time.Duration(0) - } - delta := p.stats.ConnectDoneTime.Sub(p.stats.ConnectStartTime) - if debug.IsDebug(p.debug) { - fmt.Printf("(Probe %v): TCP Connection Time: %v\n", p.probeid, delta) - } - return delta -} - -func (p *ProbeTracer) GetTLSDelta() time.Duration { - if utilities.IsSome(p.stats.TLSDoneTime) { - panic("There should not be TLS information, but there is.") - } - delta := time.Duration(0) - if debug.IsDebug(p.debug) { - fmt.Printf("(Probe %v): TLS Time: %v\n", p.probeid, delta) - } - return delta -} - -func (p *ProbeTracer) GetTLSAndHttpHeaderDelta() time.Duration { - // Because the TLS handshake occurs *after* the TCP connection (ConnectDoneTime) - // and *before* the HTTP transaction, we know that the delta between the time - // that the first HTTP response byte is available and the time that the TCP - // connection was established includes both the time for the HTTP header RTT - // *and* the TLS handshake RTT, whether we can specifically measure the latter - // or not. Eventually when TLS handshake tracing is fixed, we can break these - // into separate buckets, but for now this workaround is reasonable. - before := p.stats.ConnectDoneTime - if p.stats.ConnectionReused { - // When we reuse a connection there will be no time logged for when the - // TCP connection was established (obviously). So, fall back to the time - // when we were notified about reusing a connection (as a close approximation!). - before = p.stats.GetConnectionDoneTime - } - delta := p.stats.HttpResponseReadyTime.Sub(before) - if debug.IsDebug(p.debug) { - fmt.Printf("(Probe %v): Http TLS and Header Time: %v\n", p.probeid, delta) - } - return delta -} - -func (p *ProbeTracer) GetHttpHeaderDelta() time.Duration { - panic( - "Unusable until TLS tracing support is enabled! Use GetTLSAndHttpHeaderDelta() instead.\n", - ) - delta := p.stats.HttpResponseReadyTime.Sub(utilities.GetSome(p.stats.TLSDoneTime)) - if debug.IsDebug(p.debug) { - fmt.Printf("(Probe %v): Http Header Time: %v\n", p.probeid, delta) - } - return delta -} - -func (p *ProbeTracer) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration { - delta := httpDoneTime.Sub(p.stats.HttpResponseReadyTime) - if debug.IsDebug(p.debug) { - fmt.Printf("(Probe %v): Http Download Time: %v\n", p.probeid, delta) - } - return delta -} - -func NewProbeTracer( - client *http.Client, - probeType ProbeType, - probeId uint64, - debugging *debug.DebugWithPrefix, -) *ProbeTracer { - probe := &ProbeTracer{ - client: client, - stats: &stats.TraceStats{}, - trace: nil, - debug: debugging.Level, - probeid: probeId, - probeType: probeType, - } - trace := traceable.GenerateHttpTimingTracer(probe, debugging.Level) - - probe.trace = trace - return probe -} - -func (probe *ProbeTracer) SetDnsStartTimeInfo( - now time.Time, - dnsStartInfo httptrace.DNSStartInfo, -) { - probe.stats.DnsStartTime = now - probe.stats.DnsStart = dnsStartInfo - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) DNS Start for Probe %v: %v\n", - probe.probeType.Value(), - probe.ProbeId(), - dnsStartInfo, - ) - } -} - -func (probe *ProbeTracer) SetDnsDoneTimeInfo( - now time.Time, - dnsDoneInfo httptrace.DNSDoneInfo, -) { - probe.stats.DnsDoneTime = now - probe.stats.DnsDone = dnsDoneInfo - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) DNS Done for Probe %v: %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.DnsDone, - ) - } -} - -func (probe *ProbeTracer) SetConnectStartTime( - now time.Time, -) { - probe.stats.ConnectStartTime = now - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) TCP Start for Probe %v at %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.ConnectStartTime, - ) - } -} - -func (probe *ProbeTracer) SetConnectDoneTimeError( - now time.Time, - err error, -) { - probe.stats.ConnectDoneTime = now - probe.stats.ConnectDoneError = err - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) TCP Done for Probe %v (with error %v) @ %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.ConnectDoneError, - probe.stats.ConnectDoneTime, - ) - } -} - -func (probe *ProbeTracer) SetGetConnTime(now time.Time) { - probe.stats.GetConnectionStartTime = now - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) Started getting connection for Probe %v @ %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.GetConnectionStartTime, - ) - } -} - -func (probe *ProbeTracer) SetGotConnTimeInfo( - now time.Time, - gotConnInfo httptrace.GotConnInfo, -) { - probe.stats.GetConnectionDoneTime = now - probe.stats.ConnInfo = gotConnInfo - probe.stats.ConnectionReused = gotConnInfo.Reused - if (probe.probeType == SelfUp || probe.probeType == SelfDown) && !gotConnInfo.Reused { - fmt.Fprintf( - os.Stderr, - "A self probe sent using a new connection!\n", - ) - } - if gotConnInfo.Reused { - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) Got a reused connection for Probe %v at %v with info %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.GetConnectionDoneTime, - probe.stats.ConnInfo, - ) - } - } -} - -func (probe *ProbeTracer) SetTLSHandshakeStartTime( - now time.Time, -) { - probe.stats.TLSStartTime = utilities.Some(now) - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) Started TLS Handshake for Probe %v @ %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.TLSStartTime, - ) - } -} - -func (probe *ProbeTracer) SetTLSHandshakeDoneTimeState( - now time.Time, - connectionState tls.ConnectionState, -) { - probe.stats.TLSDoneTime = utilities.Some(now) - probe.stats.TLSConnInfo = connectionState - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) Completed TLS handshake for Probe %v at %v with info %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.TLSDoneTime, - probe.stats.TLSConnInfo, - ) - } -} - -func (probe *ProbeTracer) SetHttpWroteRequestTimeInfo( - now time.Time, - info httptrace.WroteRequestInfo, -) { - probe.stats.HttpWroteRequestTime = now - probe.stats.HttpInfo = info - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) Http finished writing request for Probe %v at %v with info %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.HttpWroteRequestTime, - probe.stats.HttpInfo, - ) - } -} - -func (probe *ProbeTracer) SetHttpResponseReadyTime( - now time.Time, -) { - probe.stats.HttpResponseReadyTime = now - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) Http response is ready for Probe %v at %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.HttpResponseReadyTime, - ) - } -} diff --git a/stabilizer/rev3.go b/stabilizer/rev3.go index 4ab0bd9..5d1aeec 100644 --- a/stabilizer/rev3.go +++ b/stabilizer/rev3.go @@ -6,6 +6,7 @@ import ( "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/ms" + "github.com/network-quality/goresponsiveness/probe" "github.com/network-quality/goresponsiveness/rpm" "github.com/network-quality/goresponsiveness/utilities" ) @@ -55,7 +56,7 @@ func NewProbeStabilizer( dbgLevel: debugLevel} } -func (r3 *ProbeStabilizer) AddMeasurement(measurement rpm.ProbeDataPoint) { +func (r3 *ProbeStabilizer) AddMeasurement(measurement probe.ProbeDataPoint) { r3.m.Lock() defer r3.m.Unlock() |
