summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWill Hawkins <[email protected]>2023-04-22 01:27:59 -0400
committerWill Hawkins <[email protected]>2023-04-22 01:27:59 -0400
commitd5ec3aba77624387711ffa90e6960e406e9790e6 (patch)
tree6c32da06f26e9abc0bc1821b2457a5f32d5cb098
parentb2e528e07842488e573aefe783f1da755f818ffa (diff)
[Refactor] Move components into separate packages
A long-overdue change to split certain packages once smashed into the RPM package into their own package. The resulting code should make it easier for people to navigate the source code. In the process, fixed a bug where a self probe being started on a load-generating connection races with the establishment of the load-generating connection and causes a panic because the self probe is not establishing a connection on an already established connection.
-rw-r--r--lgc/collection.go54
-rw-r--r--lgc/download.go373
-rw-r--r--lgc/lgc.go533
-rw-r--r--lgc/upload.go206
-rw-r--r--networkQuality.go44
-rw-r--r--rpm/rpm.go596
-rw-r--r--stabilizer/rev3.go3
7 files changed, 748 insertions, 1061 deletions
diff --git a/lgc/collection.go b/lgc/collection.go
new file mode 100644
index 0000000..7560186
--- /dev/null
+++ b/lgc/collection.go
@@ -0,0 +1,54 @@
+/*
+ * This file is part of Go Responsiveness.
+ *
+ * Go Responsiveness is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software Foundation,
+ * either version 2 of the License, or (at your option) any later version.
+ * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package lgc
+
+import (
+ "fmt"
+ "sync"
+)
+
+type LoadGeneratingConnectionCollection struct {
+ Lock sync.Mutex
+ LGCs *[]LoadGeneratingConnection
+}
+
+func NewLoadGeneratingConnectionCollection() LoadGeneratingConnectionCollection {
+ return LoadGeneratingConnectionCollection{LGCs: new([]LoadGeneratingConnection)}
+}
+
+func (collection *LoadGeneratingConnectionCollection) Get(idx int) (*LoadGeneratingConnection, error) {
+ if collection.Lock.TryLock() {
+ collection.Lock.Unlock()
+ return nil, fmt.Errorf("collection is unlocked")
+ }
+
+ if idx > len(*collection.LGCs) {
+ return nil, fmt.Errorf("index too large")
+ }
+ return &(*collection.LGCs)[idx], nil
+}
+
+func (collection *LoadGeneratingConnectionCollection) Append(conn LoadGeneratingConnection) error {
+ if collection.Lock.TryLock() {
+ collection.Lock.Unlock()
+ return fmt.Errorf("collection is unlocked")
+ }
+ *collection.LGCs = append(*collection.LGCs, conn)
+ return nil
+}
+
+func (collection *LoadGeneratingConnectionCollection) Len() int {
+ return len(*collection.LGCs)
+}
diff --git a/lgc/download.go b/lgc/download.go
new file mode 100644
index 0000000..71ed647
--- /dev/null
+++ b/lgc/download.go
@@ -0,0 +1,373 @@
+/*
+ * This file is part of Go Responsiveness.
+ *
+ * Go Responsiveness is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software Foundation,
+ * either version 2 of the License, or (at your option) any later version.
+ * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package lgc
+
+import (
+ "context"
+ "crypto/tls"
+ "fmt"
+ "io"
+ "net/http"
+ "net/http/httptrace"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/network-quality/goresponsiveness/debug"
+ "github.com/network-quality/goresponsiveness/stats"
+ "github.com/network-quality/goresponsiveness/traceable"
+ "github.com/network-quality/goresponsiveness/utilities"
+)
+
+// TODO: All 64-bit fields that are accessed atomically must
+// appear at the top of this struct.
+type LoadGeneratingConnectionDownload struct {
+ downloaded uint64
+ lastIntervalEnd int64
+ ConnectToAddr string
+ URL string
+ downloadStartTime time.Time
+ lastDownloaded uint64
+ client *http.Client
+ debug debug.DebugLevel
+ InsecureSkipVerify bool
+ KeyLogger io.Writer
+ clientId uint64
+ tracer *httptrace.ClientTrace
+ stats stats.TraceStats
+ status LgcStatus
+ statusLock *sync.Mutex
+ statusWaiter *sync.Cond
+}
+
+func NewLoadGeneratingConnectionDownload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionDownload {
+ lgd := LoadGeneratingConnectionDownload{
+ URL: url,
+ KeyLogger: keyLogger,
+ ConnectToAddr: connectToAddr,
+ InsecureSkipVerify: insecureSkipVerify,
+ statusLock: &sync.Mutex{},
+ }
+ lgd.statusWaiter = sync.NewCond(lgd.statusLock)
+ return lgd
+}
+
+func (lgd *LoadGeneratingConnectionDownload) WaitUntilStarted(ctxt context.Context) bool {
+ conditional := func() bool { return lgd.status != LGC_STATUS_NOT_STARTED }
+ go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgd.statusWaiter)
+ return utilities.WaitWithContext(ctxt, &conditional, lgd.statusLock, lgd.statusWaiter)
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetDnsStartTimeInfo(
+ now time.Time,
+ dnsStartInfo httptrace.DNSStartInfo,
+) {
+ lgd.stats.DnsStartTime = now
+ lgd.stats.DnsStart = dnsStartInfo
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "DNS Start for %v: %v\n",
+ lgd.ClientId(),
+ dnsStartInfo,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetDnsDoneTimeInfo(
+ now time.Time,
+ dnsDoneInfo httptrace.DNSDoneInfo,
+) {
+ lgd.stats.DnsDoneTime = now
+ lgd.stats.DnsDone = dnsDoneInfo
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "DNS Done for %v: %v\n",
+ lgd.ClientId(),
+ lgd.stats.DnsDone,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetConnectStartTime(
+ now time.Time,
+) {
+ lgd.stats.ConnectStartTime = now
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "TCP Start for %v at %v\n",
+ lgd.ClientId(),
+ lgd.stats.ConnectStartTime,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetConnectDoneTimeError(
+ now time.Time,
+ err error,
+) {
+ lgd.stats.ConnectDoneTime = now
+ lgd.stats.ConnectDoneError = err
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "TCP Done for %v (with error %v) @ %v\n",
+ lgd.ClientId(),
+ lgd.stats.ConnectDoneError,
+ lgd.stats.ConnectDoneTime,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetGetConnTime(now time.Time) {
+ lgd.stats.GetConnectionStartTime = now
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "Started getting connection for %v @ %v\n",
+ lgd.ClientId(),
+ lgd.stats.GetConnectionStartTime,
+ )
+ }
+ lgd.statusLock.Lock()
+ lgd.status = LGC_STATUS_RUNNING
+ lgd.statusWaiter.Broadcast()
+ lgd.statusLock.Unlock()
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetGotConnTimeInfo(
+ now time.Time,
+ gotConnInfo httptrace.GotConnInfo,
+) {
+ if gotConnInfo.Reused {
+ fmt.Printf("Unexpectedly reusing a connection!\n")
+ panic(!gotConnInfo.Reused)
+ }
+ lgd.stats.GetConnectionDoneTime = now
+ lgd.stats.ConnInfo = gotConnInfo
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "Got connection for %v at %v with info %v\n",
+ lgd.ClientId(),
+ lgd.stats.GetConnectionDoneTime,
+ lgd.stats.ConnInfo,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeStartTime(
+ now time.Time,
+) {
+ lgd.stats.TLSStartTime = utilities.Some(now)
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "Started TLS Handshake for %v @ %v\n",
+ lgd.ClientId(),
+ lgd.stats.TLSStartTime,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeDoneTimeState(
+ now time.Time,
+ connectionState tls.ConnectionState,
+) {
+ lgd.stats.TLSDoneTime = utilities.Some(now)
+ lgd.stats.TLSConnInfo = connectionState
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "Completed TLS handshake for %v at %v with info %v\n",
+ lgd.ClientId(),
+ lgd.stats.TLSDoneTime,
+ lgd.stats.TLSConnInfo,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetHttpWroteRequestTimeInfo(
+ now time.Time,
+ info httptrace.WroteRequestInfo,
+) {
+ lgd.stats.HttpWroteRequestTime = now
+ lgd.stats.HttpInfo = info
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "(lgd) Http finished writing request for %v at %v with info %v\n",
+ lgd.ClientId(),
+ lgd.stats.HttpWroteRequestTime,
+ lgd.stats.HttpInfo,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetHttpResponseReadyTime(
+ now time.Time,
+) {
+ lgd.stats.HttpResponseReadyTime = now
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "Got the first byte of HTTP response headers for %v at %v\n",
+ lgd.ClientId(),
+ lgd.stats.HttpResponseReadyTime,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) ClientId() uint64 {
+ return lgd.clientId
+}
+
+func (lgd *LoadGeneratingConnectionDownload) TransferredInInterval() (uint64, time.Duration) {
+ transferred := atomic.SwapUint64(&lgd.downloaded, 0)
+ newIntervalEnd := (time.Now().Sub(lgd.downloadStartTime)).Nanoseconds()
+ previousIntervalEnd := atomic.SwapInt64(&lgd.lastIntervalEnd, newIntervalEnd)
+ intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd)
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf("download: Transferred: %v bytes in %v.\n", transferred, intervalLength)
+ }
+ return transferred, intervalLength
+}
+
+func (lgd *LoadGeneratingConnectionDownload) Client() *http.Client {
+ return lgd.client
+}
+
+type countingReader struct {
+ n *uint64
+ ctx context.Context
+ readable io.Reader
+}
+
+func (cr *countingReader) Read(p []byte) (n int, err error) {
+ if cr.ctx.Err() != nil {
+ return 0, io.EOF
+ }
+
+ n, err = cr.readable.Read(p)
+ atomic.AddUint64(cr.n, uint64(n))
+ return
+}
+
+func (lgd *LoadGeneratingConnectionDownload) Start(
+ parentCtx context.Context,
+ debugLevel debug.DebugLevel,
+) bool {
+ lgd.downloaded = 0
+ lgd.debug = debugLevel
+ lgd.clientId = utilities.GenerateUniqueId()
+
+ transport := &http.Transport{
+ Proxy: http.ProxyFromEnvironment,
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: lgd.InsecureSkipVerify,
+ },
+ }
+
+ if !utilities.IsInterfaceNil(lgd.KeyLogger) {
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "Using an SSL Key Logger for this load-generating download.\n",
+ )
+ }
+
+ // The presence of a custom TLSClientConfig in a *generic* `transport`
+ // means that go will default to HTTP/1.1 and cowardly avoid HTTP/2:
+ // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278
+ // Also, it would appear that the API's choice of HTTP vs HTTP2 can
+ // depend on whether the url contains
+ // https:// or http://:
+ // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74
+ transport.TLSClientConfig.KeyLogWriter = lgd.KeyLogger
+ }
+ transport.TLSClientConfig.InsecureSkipVerify = lgd.InsecureSkipVerify
+
+ utilities.OverrideHostTransport(transport, lgd.ConnectToAddr)
+
+ lgd.client = &http.Client{Transport: transport}
+ lgd.tracer = traceable.GenerateHttpTimingTracer(lgd, lgd.debug)
+
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "Started a load-generating download (id: %v).\n",
+ lgd.clientId,
+ )
+ }
+
+ go lgd.doDownload(parentCtx)
+ return true
+}
+
+func (lgd *LoadGeneratingConnectionDownload) Status() LgcStatus {
+ return lgd.status
+}
+
+func (lgd *LoadGeneratingConnectionDownload) Stats() *stats.TraceStats {
+ return &lgd.stats
+}
+
+func (lgd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) error {
+ var request *http.Request = nil
+ var get *http.Response = nil
+ var err error = nil
+
+ if request, err = http.NewRequestWithContext(
+ httptrace.WithClientTrace(ctx, lgd.tracer),
+ "GET",
+ lgd.URL,
+ nil,
+ ); err != nil {
+ lgd.statusLock.Lock()
+ lgd.status = LGC_STATUS_ERROR
+ lgd.statusWaiter.Broadcast()
+ lgd.statusLock.Unlock()
+ return err
+ }
+
+ // Used to disable compression
+ request.Header.Set("Accept-Encoding", "identity")
+ request.Header.Set("User-Agent", utilities.UserAgent())
+
+ lgd.downloadStartTime = time.Now()
+ lgd.lastIntervalEnd = 0
+
+ if get, err = lgd.client.Do(request); err != nil {
+ lgd.statusLock.Lock()
+ lgd.status = LGC_STATUS_ERROR
+ lgd.statusWaiter.Broadcast()
+ lgd.statusLock.Unlock()
+ return err
+ }
+
+ // Header.Get returns "" when not set
+ if get.Header.Get("Content-Encoding") != "" {
+ lgd.statusLock.Lock()
+ lgd.status = LGC_STATUS_ERROR
+ lgd.statusWaiter.Broadcast()
+ lgd.statusLock.Unlock()
+ fmt.Printf("Content-Encoding header was set (compression not allowed)")
+ return fmt.Errorf("Content-Encoding header was set (compression not allowed)")
+ }
+ cr := &countingReader{n: &lgd.downloaded, ctx: ctx, readable: get.Body}
+ _, _ = io.Copy(io.Discard, cr)
+
+ lgd.statusLock.Lock()
+ lgd.status = LGC_STATUS_DONE
+ lgd.statusWaiter.Broadcast()
+ lgd.statusLock.Unlock()
+
+ get.Body.Close()
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf("Ending a load-generating download.\n")
+ }
+
+ return nil
+}
diff --git a/lgc/lgc.go b/lgc/lgc.go
index 16f67d2..b16eb76 100644
--- a/lgc/lgc.go
+++ b/lgc/lgc.go
@@ -16,545 +16,28 @@ package lgc
import (
"context"
- "crypto/tls"
- "fmt"
- "io"
"net/http"
- "net/http/httptrace"
- "sync"
- "sync/atomic"
"time"
"github.com/network-quality/goresponsiveness/debug"
"github.com/network-quality/goresponsiveness/stats"
- "github.com/network-quality/goresponsiveness/traceable"
- "github.com/network-quality/goresponsiveness/utilities"
)
type LoadGeneratingConnection interface {
Start(context.Context, debug.DebugLevel) bool
TransferredInInterval() (uint64, time.Duration)
Client() *http.Client
- IsValid() bool
+ Status() LgcStatus
ClientId() uint64
Stats() *stats.TraceStats
WaitUntilStarted(context.Context) bool
}
-type LoadGeneratingConnectionCollection struct {
- Lock sync.Mutex
- LGCs *[]LoadGeneratingConnection
-}
-
-func NewLoadGeneratingConnectionCollection() LoadGeneratingConnectionCollection {
- return LoadGeneratingConnectionCollection{LGCs: new([]LoadGeneratingConnection)}
-}
-
-func (collection *LoadGeneratingConnectionCollection) Get(idx int) (*LoadGeneratingConnection, error) {
- if collection.Lock.TryLock() {
- collection.Lock.Unlock()
- return nil, fmt.Errorf("collection is unlocked")
- }
-
- if idx > len(*collection.LGCs) {
- return nil, fmt.Errorf("index too large")
- }
- return &(*collection.LGCs)[idx], nil
-}
-
-func (collection *LoadGeneratingConnectionCollection) Append(conn LoadGeneratingConnection) error {
- if collection.Lock.TryLock() {
- collection.Lock.Unlock()
- return fmt.Errorf("collection is unlocked")
- }
- *collection.LGCs = append(*collection.LGCs, conn)
- return nil
-}
-
-// TODO: All 64-bit fields that are accessed atomically must
-// appear at the top of this struct.
-type LoadGeneratingConnectionDownload struct {
- downloaded uint64
- lastIntervalEnd int64
- ConnectToAddr string
- URL string
- downloadStartTime time.Time
- lastDownloaded uint64
- client *http.Client
- debug debug.DebugLevel
- valid bool
- validLock *sync.Mutex
- InsecureSkipVerify bool
- KeyLogger io.Writer
- clientId uint64
- tracer *httptrace.ClientTrace
- stats stats.TraceStats
- validWaiter *sync.Cond
-}
-
-func NewLoadGeneratingConnectionDownload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionDownload {
- lgd := LoadGeneratingConnectionDownload{
- URL: url,
- KeyLogger: keyLogger,
- ConnectToAddr: connectToAddr,
- InsecureSkipVerify: insecureSkipVerify,
- validLock: &sync.Mutex{},
- }
- lgd.validWaiter = sync.NewCond(lgd.validLock)
- return lgd
-}
-
-func (lgd *LoadGeneratingConnectionDownload) WaitUntilStarted(ctxt context.Context) bool {
- conditional := func() bool { return lgd.valid }
- go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgd.validWaiter)
- return utilities.WaitWithContext(ctxt, &conditional, lgd.validLock, lgd.validWaiter)
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetDnsStartTimeInfo(
- now time.Time,
- dnsStartInfo httptrace.DNSStartInfo,
-) {
- lgd.stats.DnsStartTime = now
- lgd.stats.DnsStart = dnsStartInfo
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "DNS Start for %v: %v\n",
- lgd.ClientId(),
- dnsStartInfo,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetDnsDoneTimeInfo(
- now time.Time,
- dnsDoneInfo httptrace.DNSDoneInfo,
-) {
- lgd.stats.DnsDoneTime = now
- lgd.stats.DnsDone = dnsDoneInfo
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "DNS Done for %v: %v\n",
- lgd.ClientId(),
- lgd.stats.DnsDone,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetConnectStartTime(
- now time.Time,
-) {
- lgd.stats.ConnectStartTime = now
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "TCP Start for %v at %v\n",
- lgd.ClientId(),
- lgd.stats.ConnectStartTime,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetConnectDoneTimeError(
- now time.Time,
- err error,
-) {
- lgd.stats.ConnectDoneTime = now
- lgd.stats.ConnectDoneError = err
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "TCP Done for %v (with error %v) @ %v\n",
- lgd.ClientId(),
- lgd.stats.ConnectDoneError,
- lgd.stats.ConnectDoneTime,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetGetConnTime(now time.Time) {
- lgd.stats.GetConnectionStartTime = now
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "Started getting connection for %v @ %v\n",
- lgd.ClientId(),
- lgd.stats.GetConnectionStartTime,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetGotConnTimeInfo(
- now time.Time,
- gotConnInfo httptrace.GotConnInfo,
-) {
- if gotConnInfo.Reused {
- fmt.Printf("Unexpectedly reusing a connection!\n")
- panic(!gotConnInfo.Reused)
- }
- lgd.stats.GetConnectionDoneTime = now
- lgd.stats.ConnInfo = gotConnInfo
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "Got connection for %v at %v with info %v\n",
- lgd.ClientId(),
- lgd.stats.GetConnectionDoneTime,
- lgd.stats.ConnInfo,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeStartTime(
- now time.Time,
-) {
- lgd.stats.TLSStartTime = utilities.Some(now)
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "Started TLS Handshake for %v @ %v\n",
- lgd.ClientId(),
- lgd.stats.TLSStartTime,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeDoneTimeState(
- now time.Time,
- connectionState tls.ConnectionState,
-) {
- lgd.stats.TLSDoneTime = utilities.Some(now)
- lgd.stats.TLSConnInfo = connectionState
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "Completed TLS handshake for %v at %v with info %v\n",
- lgd.ClientId(),
- lgd.stats.TLSDoneTime,
- lgd.stats.TLSConnInfo,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetHttpWroteRequestTimeInfo(
- now time.Time,
- info httptrace.WroteRequestInfo,
-) {
- lgd.stats.HttpWroteRequestTime = now
- lgd.stats.HttpInfo = info
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "(lgd) Http finished writing request for %v at %v with info %v\n",
- lgd.ClientId(),
- lgd.stats.HttpWroteRequestTime,
- lgd.stats.HttpInfo,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetHttpResponseReadyTime(
- now time.Time,
-) {
- lgd.stats.HttpResponseReadyTime = now
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "Got the first byte of HTTP response headers for %v at %v\n",
- lgd.ClientId(),
- lgd.stats.HttpResponseReadyTime,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) ClientId() uint64 {
- return lgd.clientId
-}
-
-func (lgd *LoadGeneratingConnectionDownload) TransferredInInterval() (uint64, time.Duration) {
- transferred := atomic.SwapUint64(&lgd.downloaded, 0)
- newIntervalEnd := (time.Now().Sub(lgd.downloadStartTime)).Nanoseconds()
- previousIntervalEnd := atomic.SwapInt64(&lgd.lastIntervalEnd, newIntervalEnd)
- intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd)
- if debug.IsDebug(lgd.debug) {
- fmt.Printf("download: Transferred: %v bytes in %v.\n", transferred, intervalLength)
- }
- return transferred, intervalLength
-}
-
-func (lgd *LoadGeneratingConnectionDownload) Client() *http.Client {
- return lgd.client
-}
-
-type countingReader struct {
- n *uint64
- ctx context.Context
- readable io.Reader
-}
-
-func (cr *countingReader) Read(p []byte) (n int, err error) {
- if cr.ctx.Err() != nil {
- return 0, io.EOF
- }
- n, err = cr.readable.Read(p)
- atomic.AddUint64(cr.n, uint64(n))
- return
-}
-
-func (lgd *LoadGeneratingConnectionDownload) Start(
- parentCtx context.Context,
- debugLevel debug.DebugLevel,
-) bool {
- lgd.downloaded = 0
- lgd.debug = debugLevel
- lgd.clientId = utilities.GenerateUniqueId()
-
- transport := &http.Transport{
- Proxy: http.ProxyFromEnvironment,
- TLSClientConfig: &tls.Config{
- InsecureSkipVerify: lgd.InsecureSkipVerify,
- },
- }
-
- if !utilities.IsInterfaceNil(lgd.KeyLogger) {
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "Using an SSL Key Logger for this load-generating download.\n",
- )
- }
-
- // The presence of a custom TLSClientConfig in a *generic* `transport`
- // means that go will default to HTTP/1.1 and cowardly avoid HTTP/2:
- // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278
- // Also, it would appear that the API's choice of HTTP vs HTTP2 can
- // depend on whether the url contains
- // https:// or http://:
- // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74
- transport.TLSClientConfig.KeyLogWriter = lgd.KeyLogger
- }
- transport.TLSClientConfig.InsecureSkipVerify = lgd.InsecureSkipVerify
-
- utilities.OverrideHostTransport(transport, lgd.ConnectToAddr)
-
- lgd.client = &http.Client{Transport: transport}
- lgd.validLock.Lock()
- lgd.valid = true
- lgd.validLock.Unlock()
- lgd.tracer = traceable.GenerateHttpTimingTracer(lgd, lgd.debug)
-
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "Started a load-generating download (id: %v).\n",
- lgd.clientId,
- )
- }
+type LgcStatus int
- go lgd.doDownload(parentCtx)
- return true
-}
-
-func (lgd *LoadGeneratingConnectionDownload) IsValid() bool {
- return lgd.valid
-}
-
-func (lgd *LoadGeneratingConnectionDownload) Stats() *stats.TraceStats {
- return &lgd.stats
-}
-
-func (lgd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) {
- var request *http.Request = nil
- var get *http.Response = nil
- var err error = nil
-
- if request, err = http.NewRequestWithContext(
- httptrace.WithClientTrace(ctx, lgd.tracer),
- "GET",
- lgd.URL,
- nil,
- ); err != nil {
- lgd.validLock.Lock()
- lgd.valid = false
- lgd.validLock.Unlock()
- return
- }
-
- // Used to disable compression
- request.Header.Set("Accept-Encoding", "identity")
- request.Header.Set("User-Agent", utilities.UserAgent())
-
- lgd.downloadStartTime = time.Now()
- lgd.lastIntervalEnd = 0
-
- if get, err = lgd.client.Do(request); err != nil {
- lgd.validLock.Lock()
- lgd.valid = false
- lgd.validLock.Unlock()
- return
- }
-
- // Header.Get returns "" when not set
- if get.Header.Get("Content-Encoding") != "" {
- lgd.validLock.Lock()
- lgd.valid = false
- lgd.validLock.Unlock()
- fmt.Printf("Content-Encoding header was set (compression not allowed)")
- return
- }
- cr := &countingReader{n: &lgd.downloaded, ctx: ctx, readable: get.Body}
- _, _ = io.Copy(io.Discard, cr)
- get.Body.Close()
- if debug.IsDebug(lgd.debug) {
- fmt.Printf("Ending a load-generating download.\n")
- }
-}
-
-// TODO: All 64-bit fields that are accessed atomically must
-// appear at the top of this struct.
-type LoadGeneratingConnectionUpload struct {
- uploaded uint64
- lastIntervalEnd int64
- URL string
- ConnectToAddr string
- uploadStartTime time.Time
- lastUploaded uint64
- client *http.Client
- debug debug.DebugLevel
- valid bool
- validLock *sync.Mutex
- InsecureSkipVerify bool
- KeyLogger io.Writer
- clientId uint64
- validWaiter *sync.Cond
-}
-
-func NewLoadGeneratingConnectionUpload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionUpload {
- lgu := LoadGeneratingConnectionUpload{
- URL: url,
- KeyLogger: keyLogger,
- ConnectToAddr: connectToAddr,
- InsecureSkipVerify: insecureSkipVerify,
- validLock: &sync.Mutex{},
- }
- lgu.validWaiter = sync.NewCond(lgu.validLock)
- return lgu
-}
-
-func (lgu *LoadGeneratingConnectionUpload) WaitUntilStarted(ctxt context.Context) bool {
- conditional := func() bool { return lgu.valid }
- go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgu.validWaiter)
- return utilities.WaitWithContext(ctxt, &conditional, lgu.validLock, lgu.validWaiter)
-}
-
-func (lgu *LoadGeneratingConnectionUpload) ClientId() uint64 {
- return lgu.clientId
-}
-
-func (lgu *LoadGeneratingConnectionUpload) TransferredInInterval() (uint64, time.Duration) {
- transferred := atomic.SwapUint64(&lgu.uploaded, 0)
- newIntervalEnd := (time.Now().Sub(lgu.uploadStartTime)).Nanoseconds()
- previousIntervalEnd := atomic.SwapInt64(&lgu.lastIntervalEnd, newIntervalEnd)
- intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd)
- if debug.IsDebug(lgu.debug) {
- fmt.Printf("upload: Transferred: %v bytes in %v.\n", transferred, intervalLength)
- }
- return transferred, intervalLength
-}
-
-func (lgu *LoadGeneratingConnectionUpload) Client() *http.Client {
- return lgu.client
-}
-
-func (lgu *LoadGeneratingConnectionUpload) IsValid() bool {
- return lgu.valid
-}
-
-type syntheticCountingReader struct {
- n *uint64
- ctx context.Context
-}
-
-func (s *syntheticCountingReader) Read(p []byte) (n int, err error) {
- if s.ctx.Err() != nil {
- return 0, io.EOF
- }
- err = nil
- n = len(p)
-
- atomic.AddUint64(s.n, uint64(n))
- return
-}
-
-func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) bool {
- lgu.uploaded = 0
- s := &syntheticCountingReader{n: &lgu.uploaded, ctx: ctx}
- var resp *http.Response = nil
- var request *http.Request = nil
- var err error
-
- if request, err = http.NewRequest(
- "POST",
- lgu.URL,
- s,
- ); err != nil {
- lgu.validLock.Lock()
- lgu.valid = false
- lgu.validLock.Unlock()
- return false
- }
-
- // Used to disable compression
- request.Header.Set("Accept-Encoding", "identity")
- request.Header.Set("User-Agent", utilities.UserAgent())
-
- lgu.uploadStartTime = time.Now()
- lgu.lastIntervalEnd = 0
-
- if resp, err = lgu.client.Do(request); err != nil {
- lgu.validLock.Lock()
- lgu.valid = false
- lgu.validLock.Unlock()
- return false
- }
-
- resp.Body.Close()
- if debug.IsDebug(lgu.debug) {
- fmt.Printf("Ending a load-generating upload.\n")
- }
- return true
-}
-
-func (lgu *LoadGeneratingConnectionUpload) Start(
- parentCtx context.Context,
- debugLevel debug.DebugLevel,
-) bool {
- lgu.uploaded = 0
- lgu.clientId = utilities.GenerateUniqueId()
- lgu.debug = debugLevel
-
- transport := &http.Transport{
- Proxy: http.ProxyFromEnvironment,
- TLSClientConfig: &tls.Config{
- InsecureSkipVerify: lgu.InsecureSkipVerify,
- },
- }
-
- if !utilities.IsInterfaceNil(lgu.KeyLogger) {
- if debug.IsDebug(lgu.debug) {
- fmt.Printf(
- "Using an SSL Key Logger for this load-generating upload.\n",
- )
- }
- transport.TLSClientConfig.KeyLogWriter = lgu.KeyLogger
- }
-
- utilities.OverrideHostTransport(transport, lgu.ConnectToAddr)
-
- lgu.client = &http.Client{Transport: transport}
-
- lgu.validLock.Lock()
- lgu.valid = true
- lgu.validLock.Unlock()
-
- if debug.IsDebug(lgu.debug) {
- fmt.Printf("Started a load-generating upload (id: %v).\n", lgu.clientId)
- }
-
- go lgu.doUpload(parentCtx)
- return true
-}
-
-func (lgu *LoadGeneratingConnectionUpload) Stats() *stats.TraceStats {
- // Get all your stats from the download side of the LGC.
- return nil
-}
+const (
+ LGC_STATUS_NOT_STARTED LgcStatus = iota
+ LGC_STATUS_RUNNING
+ LGC_STATUS_DONE
+ LGC_STATUS_ERROR
+)
diff --git a/lgc/upload.go b/lgc/upload.go
new file mode 100644
index 0000000..f0c772e
--- /dev/null
+++ b/lgc/upload.go
@@ -0,0 +1,206 @@
+/*
+ * This file is part of Go Responsiveness.
+ *
+ * Go Responsiveness is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software Foundation,
+ * either version 2 of the License, or (at your option) any later version.
+ * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package lgc
+
+import (
+ "context"
+ "crypto/tls"
+ "fmt"
+ "io"
+ "net/http"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/network-quality/goresponsiveness/debug"
+ "github.com/network-quality/goresponsiveness/stats"
+ "github.com/network-quality/goresponsiveness/utilities"
+)
+
+// TODO: All 64-bit fields that are accessed atomically must
+// appear at the top of this struct.
+type LoadGeneratingConnectionUpload struct {
+ uploaded uint64
+ lastIntervalEnd int64
+ URL string
+ ConnectToAddr string
+ uploadStartTime time.Time
+ lastUploaded uint64
+ client *http.Client
+ debug debug.DebugLevel
+ InsecureSkipVerify bool
+ KeyLogger io.Writer
+ clientId uint64
+ status LgcStatus
+ statusLock *sync.Mutex
+ statusWaiter *sync.Cond
+}
+
+func NewLoadGeneratingConnectionUpload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionUpload {
+ lgu := LoadGeneratingConnectionUpload{
+ URL: url,
+ KeyLogger: keyLogger,
+ ConnectToAddr: connectToAddr,
+ InsecureSkipVerify: insecureSkipVerify,
+ statusLock: &sync.Mutex{},
+ }
+ lgu.status = LGC_STATUS_NOT_STARTED
+ lgu.statusWaiter = sync.NewCond(lgu.statusLock)
+ return lgu
+}
+
+func (lgu *LoadGeneratingConnectionUpload) WaitUntilStarted(ctxt context.Context) bool {
+ conditional := func() bool { return lgu.status != LGC_STATUS_NOT_STARTED }
+ go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgu.statusWaiter)
+ return utilities.WaitWithContext(ctxt, &conditional, lgu.statusLock, lgu.statusWaiter)
+}
+
+func (lgu *LoadGeneratingConnectionUpload) ClientId() uint64 {
+ return lgu.clientId
+}
+
+func (lgu *LoadGeneratingConnectionUpload) TransferredInInterval() (uint64, time.Duration) {
+ transferred := atomic.SwapUint64(&lgu.uploaded, 0)
+ newIntervalEnd := (time.Now().Sub(lgu.uploadStartTime)).Nanoseconds()
+ previousIntervalEnd := atomic.SwapInt64(&lgu.lastIntervalEnd, newIntervalEnd)
+ intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd)
+ if debug.IsDebug(lgu.debug) {
+ fmt.Printf("upload: Transferred: %v bytes in %v.\n", transferred, intervalLength)
+ }
+ return transferred, intervalLength
+}
+
+func (lgu *LoadGeneratingConnectionUpload) Client() *http.Client {
+ return lgu.client
+}
+
+func (lgu *LoadGeneratingConnectionUpload) Status() LgcStatus {
+ return lgu.status
+}
+
+type syntheticCountingReader struct {
+ n *uint64
+ ctx context.Context
+ lgu *LoadGeneratingConnectionUpload
+}
+
+func (s *syntheticCountingReader) Read(p []byte) (n int, err error) {
+ if s.ctx.Err() != nil {
+ return 0, io.EOF
+ }
+ if n == 0 {
+ s.lgu.statusLock.Lock()
+ s.lgu.status = LGC_STATUS_RUNNING
+ s.lgu.statusWaiter.Broadcast()
+ s.lgu.statusLock.Unlock()
+ }
+ err = nil
+ n = len(p)
+
+ atomic.AddUint64(s.n, uint64(n))
+ return
+}
+
+func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) error {
+ lgu.uploaded = 0
+ s := &syntheticCountingReader{n: &lgu.uploaded, ctx: ctx, lgu: lgu}
+ var resp *http.Response = nil
+ var request *http.Request = nil
+ var err error
+
+ if request, err = http.NewRequest(
+ "POST",
+ lgu.URL,
+ s,
+ ); err != nil {
+ lgu.statusLock.Lock()
+ lgu.status = LGC_STATUS_ERROR
+ lgu.statusWaiter.Broadcast()
+ lgu.statusLock.Unlock()
+ return err
+ }
+
+ // Used to disable compression
+ request.Header.Set("Accept-Encoding", "identity")
+ request.Header.Set("User-Agent", utilities.UserAgent())
+
+ lgu.uploadStartTime = time.Now()
+ lgu.lastIntervalEnd = 0
+
+ lgu.statusLock.Lock()
+ lgu.status = LGC_STATUS_RUNNING
+ lgu.statusWaiter.Broadcast()
+ lgu.statusLock.Unlock()
+
+ if resp, err = lgu.client.Do(request); err != nil {
+ lgu.statusLock.Lock()
+ lgu.status = LGC_STATUS_ERROR
+ lgu.statusWaiter.Broadcast()
+ lgu.statusLock.Unlock()
+ return err
+ }
+
+ lgu.statusLock.Lock()
+ lgu.status = LGC_STATUS_DONE
+ lgu.statusWaiter.Broadcast()
+ lgu.statusLock.Unlock()
+
+ resp.Body.Close()
+ if debug.IsDebug(lgu.debug) {
+ fmt.Printf("Ending a load-generating upload.\n")
+ }
+ return nil
+}
+
+func (lgu *LoadGeneratingConnectionUpload) Start(
+ parentCtx context.Context,
+ debugLevel debug.DebugLevel,
+) bool {
+ lgu.uploaded = 0
+ lgu.clientId = utilities.GenerateUniqueId()
+ lgu.debug = debugLevel
+
+ transport := &http.Transport{
+ Proxy: http.ProxyFromEnvironment,
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: lgu.InsecureSkipVerify,
+ },
+ }
+
+ if !utilities.IsInterfaceNil(lgu.KeyLogger) {
+ if debug.IsDebug(lgu.debug) {
+ fmt.Printf(
+ "Using an SSL Key Logger for this load-generating upload.\n",
+ )
+ }
+ transport.TLSClientConfig.KeyLogWriter = lgu.KeyLogger
+ }
+
+ utilities.OverrideHostTransport(transport, lgu.ConnectToAddr)
+
+ lgu.client = &http.Client{Transport: transport}
+
+ if debug.IsDebug(lgu.debug) {
+ fmt.Printf("Started a load-generating upload (id: %v).\n", lgu.clientId)
+ }
+
+ go lgu.doUpload(parentCtx)
+ return true
+}
+
+func (lgu *LoadGeneratingConnectionUpload) Stats() *stats.TraceStats {
+ // Get all your stats from the download side of the LGC.
+ return nil
+}
diff --git a/networkQuality.go b/networkQuality.go
index 06ac0a0..309aa94 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -32,6 +32,7 @@ import (
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
"github.com/network-quality/goresponsiveness/ms"
+ "github.com/network-quality/goresponsiveness/probe"
"github.com/network-quality/goresponsiveness/rpm"
"github.com/network-quality/goresponsiveness/stabilizer"
"github.com/network-quality/goresponsiveness/timeoutat"
@@ -246,8 +247,8 @@ func main() {
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
- var selfProbeDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil
- var foreignProbeDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil
+ var selfProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] = nil
+ var foreignProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] = nil
var downloadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil
var uploadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil
var granularThroughputDataLogger datalogger.DataLogger[rpm.GranularThroughputDataPoint] = nil
@@ -275,7 +276,7 @@ func main() {
"-throughput-granular-"+unique,
)
- selfProbeDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint](
+ selfProbeDataLogger, err = datalogger.CreateCSVDataLogger[probe.ProbeDataPoint](
dataLoggerSelfFilename,
)
if err != nil {
@@ -286,7 +287,7 @@ func main() {
selfProbeDataLogger = nil
}
- foreignProbeDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint](
+ foreignProbeDataLogger, err = datalogger.CreateCSVDataLogger[probe.ProbeDataPoint](
dataLoggerForeignFilename,
)
if err != nil {
@@ -333,10 +334,10 @@ func main() {
// If, for some reason, the data loggers are nil, make them Null Data Loggers so that we don't have conditional
// code later.
if selfProbeDataLogger == nil {
- selfProbeDataLogger = datalogger.CreateNullDataLogger[rpm.ProbeDataPoint]()
+ selfProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]()
}
if foreignProbeDataLogger == nil {
- foreignProbeDataLogger = datalogger.CreateNullDataLogger[rpm.ProbeDataPoint]()
+ foreignProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]()
}
if downloadThroughputDataLogger == nil {
downloadThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]()
@@ -352,26 +353,26 @@ func main() {
* Create (and then, ironically, name) two anonymous functions that, when invoked,
* will create load-generating connections for upload/download
*/
- generate_lgd := func() lgc.LoadGeneratingConnection {
+ generateLgdc := func() lgc.LoadGeneratingConnection {
lgd := lgc.NewLoadGeneratingConnectionDownload(config.Urls.LargeUrl, sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify)
return &lgd
}
- generate_lgu := func() lgc.LoadGeneratingConnection {
+ generateLguc := func() lgc.LoadGeneratingConnection {
lgu := lgc.NewLoadGeneratingConnectionUpload(config.Urls.UploadUrl, sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify)
return &lgu
}
- generateSelfProbeConfiguration := func() rpm.ProbeConfiguration {
- return rpm.ProbeConfiguration{
+ generateSelfProbeConfiguration := func() probe.ProbeConfiguration {
+ return probe.ProbeConfiguration{
URL: config.Urls.SmallUrl,
ConnectToAddr: config.ConnectToAddr,
InsecureSkipVerify: *insecureSkipVerify,
}
}
- generateForeignProbeConfiguration := func() rpm.ProbeConfiguration {
- return rpm.ProbeConfiguration{
+ generateForeignProbeConfiguration := func() probe.ProbeConfiguration {
+ return probe.ProbeConfiguration{
URL: config.Urls.SmallUrl,
ConnectToAddr: config.ConnectToAddr,
InsecureSkipVerify: *insecureSkipVerify,
@@ -393,7 +394,7 @@ func main() {
networkActivityCtx,
downloadLoadGeneratorOperatorCtx,
time.Second,
- generate_lgd,
+ generateLgdc,
&downloadLoadGeneratingConnectionCollection,
*calculateExtendedStats,
downloadDebugging,
@@ -402,7 +403,7 @@ func main() {
networkActivityCtx,
uploadLoadGeneratorOperatorCtx,
time.Second,
- generate_lgu,
+ generateLguc,
&uploadLoadGeneratingConnectionCollection,
*calculateExtendedStats,
uploadDebugging,
@@ -410,7 +411,7 @@ func main() {
// Handles for the first connection that the load-generating go routines (both up and
// download) open are passed back on the self[Down|Up]ProbeConnectionCommunicationChannel
- // so that we can then start probes on those handles.
+ // so that we can then start probes on those connections.
selfDownProbeConnection := <-selfDownProbeConnectionCommunicationChannel
selfUpProbeConnection := <-selfUpProbeConnectionCommunicationChannel
@@ -532,7 +533,7 @@ timeout:
fmt.Printf(
"################# Responsiveness is instantaneously %s.\n", utilities.Conditional(responsivenessIsStable, "stable", "unstable"))
}
- if probeMeasurement.Type == rpm.Foreign {
+ if probeMeasurement.Type == probe.Foreign {
// There may be more than one round trip accumulated together. If that is the case,
// we will blow them apart in to three separate measurements and each one will just
// be 1 / measurement.RoundTripCount of the total length.
@@ -540,13 +541,13 @@ timeout:
foreignRtts.AddElement(probeMeasurement.Duration.Seconds() / float64(probeMeasurement.RoundTripCount))
}
- } else if probeMeasurement.Type == rpm.SelfDown || probeMeasurement.Type == rpm.SelfUp {
+ } else if probeMeasurement.Type == probe.SelfDown || probeMeasurement.Type == probe.SelfUp {
selfRtts.AddElement(probeMeasurement.Duration.Seconds())
}
- if probeMeasurement.Type == rpm.Foreign {
+ if probeMeasurement.Type == probe.Foreign {
foreignProbeDataLogger.LogRecord(probeMeasurement)
- } else if probeMeasurement.Type == rpm.SelfDown || probeMeasurement.Type == rpm.SelfUp {
+ } else if probeMeasurement.Type == probe.SelfDown || probeMeasurement.Type == probe.SelfUp {
selfProbeDataLogger.LogRecord(probeMeasurement)
}
}
@@ -592,10 +593,11 @@ timeout:
defer downloadLoadGeneratingConnectionCollection.Lock.Unlock()
// Note: We do not trace upload connections!
- for i := 0; i < len(*downloadLoadGeneratingConnectionCollection.LGCs); i++ {
+ for i := 0; i < downloadLoadGeneratingConnectionCollection.Len(); i++ {
// Assume that extended statistics are available -- the check was done explicitly at
// program startup if the calculateExtendedStats flag was set by the user on the command line.
- if err := extendedStats.IncorporateConnectionStats((*downloadLoadGeneratingConnectionCollection.LGCs)[i].Stats().ConnInfo.Conn); err != nil {
+ currentLgc, _ := downloadLoadGeneratingConnectionCollection.Get(i)
+ if err := extendedStats.IncorporateConnectionStats((*currentLgc).Stats().ConnInfo.Conn); err != nil {
fmt.Fprintf(
os.Stderr,
"Warning: Could not add extended stats for the connection: %v\n",
diff --git a/rpm/rpm.go b/rpm/rpm.go
index db6f11e..99a52b6 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -20,7 +20,6 @@ import (
"fmt"
"io"
"net/http"
- "net/http/httptrace"
"os"
"sync"
"time"
@@ -29,8 +28,7 @@ import (
"github.com/network-quality/goresponsiveness/debug"
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
- "github.com/network-quality/goresponsiveness/stats"
- "github.com/network-quality/goresponsiveness/traceable"
+ "github.com/network-quality/goresponsiveness/probe"
"github.com/network-quality/goresponsiveness/utilities"
)
@@ -59,22 +57,6 @@ func addFlows(
return toAdd
}
-type ProbeConfiguration struct {
- ConnectToAddr string
- URL string
- Host string
- InsecureSkipVerify bool
-}
-
-type ProbeDataPoint struct {
- Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"`
- RoundTripCount uint64 `Description:"The number of round trips measured by this data point."`
- Duration time.Duration `Description:"The duration for this measurement." Formatter:"Seconds"`
- TCPRtt time.Duration `Description:"The underlying connection's RTT at probe time." Formatter:"Seconds"`
- TCPCwnd uint32 `Description:"The underlying connection's congestion window at probe time."`
- Type ProbeType `Description:"The type of the probe." Formatter:"Value"`
-}
-
type GranularThroughputDataPoint struct {
Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"`
Throughput float64 `Description:"Instantaneous throughput (B/s)."`
@@ -94,182 +76,26 @@ type ThroughputDataPoint struct {
type SelfDataCollectionResult struct {
RateBps float64
LGCs []lgc.LoadGeneratingConnection
- ProbeDataPoints []ProbeDataPoint
+ ProbeDataPoints []probe.ProbeDataPoint
LoggingContinuation func()
}
-type ProbeType int64
-
-const (
- SelfUp ProbeType = iota
- SelfDown
- Foreign
-)
-
-type ProbeRoundTripCountType uint16
-
-const (
- DefaultDownRoundTripCount ProbeRoundTripCountType = 1
- SelfUpRoundTripCount ProbeRoundTripCountType = 1
- SelfDownRoundTripCount ProbeRoundTripCountType = 1
- ForeignRoundTripCount ProbeRoundTripCountType = 3
-)
-
-func (pt ProbeType) Value() string {
- if pt == SelfUp {
- return "SelfUp"
- } else if pt == SelfDown {
- return "SelfDown"
- }
- return "Foreign"
-}
-
-func Probe(
- managingCtx context.Context,
- waitGroup *sync.WaitGroup,
- client *http.Client,
- probeUrl string,
- probeHost string, // optional: for use with a test_endpoint
- probeType ProbeType,
- result *chan ProbeDataPoint,
- captureExtendedStats bool,
- debugging *debug.DebugWithPrefix,
-) error {
-
- if waitGroup != nil {
- waitGroup.Add(1)
- defer waitGroup.Done()
- }
-
- if client == nil {
- return fmt.Errorf("cannot start a probe with a nil client")
- }
-
- probeId := utilities.GenerateUniqueId()
- probeTracer := NewProbeTracer(client, probeType, probeId, debugging)
- time_before_probe := time.Now()
- probe_req, err := http.NewRequestWithContext(
- httptrace.WithClientTrace(managingCtx, probeTracer.trace),
- "GET",
- probeUrl,
- nil,
- )
- if err != nil {
- return err
- }
-
- // Used to disable compression
- probe_req.Header.Set("Accept-Encoding", "identity")
- probe_req.Header.Set("User-Agent", utilities.UserAgent())
-
- probe_resp, err := client.Do(probe_req)
- if err != nil {
- return err
- }
-
- // Header.Get returns "" when not set
- if probe_resp.Header.Get("Content-Encoding") != "" {
- return fmt.Errorf("Content-Encoding header was set (compression not allowed)")
- }
-
- // TODO: Make this interruptable somehow by using _ctx_.
- _, err = io.ReadAll(probe_resp.Body)
- if err != nil {
- return err
- }
- time_after_probe := time.Now()
-
- // Depending on whether we think that Close() requires another RTT (via TCP), we
- // may need to move this before/after capturing the after time.
- probe_resp.Body.Close()
-
- sanity := time_after_probe.Sub(time_before_probe)
-
- // When the probe is run on a load-generating connection (a self probe) there should
- // only be a single round trip that is measured. We will take the accumulation of all these
- // values just to be sure, though. Because of how this traced connection was launched, most
- // of the values will be 0 (or very small where the time that go takes for delivering callbacks
- // and doing context switches pokes through). When it is !isSelfProbe then the values will
- // be significant and we want to add them regardless!
- totalDelay := probeTracer.GetTLSAndHttpHeaderDelta() + probeTracer.GetHttpDownloadDelta(
- time_after_probe,
- ) + probeTracer.GetTCPDelta()
-
- // We must have reused the connection if we are a self probe!
- if (probeType == SelfUp || probeType == SelfDown) && !probeTracer.stats.ConnectionReused {
- panic(!probeTracer.stats.ConnectionReused)
- }
-
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "(%s) (%s Probe %v) sanity vs total: %v vs %v\n",
- debugging.Prefix,
- probeType.Value(),
- probeId,
- sanity,
- totalDelay,
- )
- }
- roundTripCount := DefaultDownRoundTripCount
- if probeType == Foreign {
- roundTripCount = ForeignRoundTripCount
- }
- // Careful!!! It's possible that this channel has been closed because the Prober that
- // started it has been stopped. Writing to a closed channel will cause a panic. It might not
- // matter because a panic just stops the go thread containing the paniced code and we are in
- // a go thread that executes only this function.
- defer func() {
- isThreadPanicing := recover()
- if isThreadPanicing != nil && debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "(%s) (%s Probe %v) Probe attempted to write to the result channel after its invoker ended (official reason: %v).\n",
- debugging.Prefix,
- probeType.Value(),
- probeId,
- isThreadPanicing,
- )
- }
- }()
- tcpRtt := time.Duration(0 * time.Second)
- tcpCwnd := uint32(0)
- // TODO: Only get the extended stats for a connection if the user has requested them overall.
- if captureExtendedStats && extendedstats.ExtendedStatsAvailable() {
- tcpInfo, err := extendedstats.GetTCPInfo(probeTracer.stats.ConnInfo.Conn)
- if err == nil {
- tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond
- tcpCwnd = tcpInfo.Snd_cwnd
- } else {
- fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err)
- }
- }
- dataPoint := ProbeDataPoint{
- Time: time_before_probe,
- RoundTripCount: uint64(roundTripCount),
- Duration: totalDelay,
- TCPRtt: tcpRtt,
- TCPCwnd: tcpCwnd,
- Type: probeType,
- }
- *result <- dataPoint
- return nil
-}
-
func CombinedProber(
proberCtx context.Context,
networkActivityCtx context.Context,
- foreignProbeConfigurationGenerator func() ProbeConfiguration,
- selfProbeConfigurationGenerator func() ProbeConfiguration,
+ foreignProbeConfigurationGenerator func() probe.ProbeConfiguration,
+ selfProbeConfigurationGenerator func() probe.ProbeConfiguration,
selfDownProbeConnection lgc.LoadGeneratingConnection,
selfUpProbeConnection lgc.LoadGeneratingConnection,
probeInterval time.Duration,
keyLogger io.Writer,
captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
-) (dataPoints chan ProbeDataPoint) {
+) (dataPoints chan probe.ProbeDataPoint) {
// Make a channel to send back all the generated data points
// when we are probing.
- dataPoints = make(chan ProbeDataPoint)
+ dataPoints = make(chan probe.ProbeDataPoint)
go func() {
wg := sync.WaitGroup{}
@@ -319,39 +145,39 @@ func CombinedProber(
// Start Foreign Connection Prober
probeCount++
- go Probe(
+ go probe.Probe(
networkActivityCtx,
&wg,
foreignProbeClient,
foreignProbeConfiguration.URL,
foreignProbeConfiguration.Host,
- Foreign,
+ probe.Foreign,
&dataPoints,
captureExtendedStats,
debugging,
)
// Start Self Download Connection Prober
- go Probe(
+ go probe.Probe(
networkActivityCtx,
&wg,
selfDownProbeConnection.Client(),
selfProbeConfiguration.URL,
selfProbeConfiguration.Host,
- SelfDown,
+ probe.SelfDown,
&dataPoints,
captureExtendedStats,
debugging,
)
// Start Self Upload Connection Prober
- go Probe(
+ go probe.Probe(
proberCtx,
&wg,
selfUpProbeConnection.Client(),
selfProbeConfiguration.URL,
selfProbeConfiguration.Host,
- SelfUp,
+ probe.SelfUp,
&dataPoints,
captureExtendedStats,
debugging,
@@ -380,7 +206,7 @@ func LoadGenerator(
loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load.
rampupInterval time.Duration,
lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection.
- loadGeneratingConnections *lgc.LoadGeneratingConnectionCollection,
+ loadGeneratingConnectionsCollection *lgc.LoadGeneratingConnectionCollection,
captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections?
debugging *debug.DebugWithPrefix, // How can we forget debugging?
) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes.
@@ -400,7 +226,7 @@ func LoadGenerator(
flowsCreated += addFlows(
networkActivityCtx,
constants.StartingNumberOfLoadGeneratingConnections,
- loadGeneratingConnections,
+ loadGeneratingConnectionsCollection,
lgcGenerator,
debugging.Level,
)
@@ -408,9 +234,9 @@ func LoadGenerator(
// We have at least a single load-generating channel. This channel will be the one that
// the self probes use. Let's send it back to the caller so that they can pass it on if they need to.
go func() {
- loadGeneratingConnections.Lock.Lock()
- zerothConnection, err := loadGeneratingConnections.Get(0)
- loadGeneratingConnections.Lock.Unlock()
+ loadGeneratingConnectionsCollection.Lock.Lock()
+ zerothConnection, err := loadGeneratingConnectionsCollection.Get(0)
+ loadGeneratingConnectionsCollection.Lock.Unlock()
if err != nil {
panic("Could not get the zeroth connection!\n")
}
@@ -452,56 +278,76 @@ func LoadGenerator(
granularThroughputDatapoints := make([]GranularThroughputDataPoint, 0)
now = time.Now() // Used to align granular throughput data
allInvalid := true
- for i := range *loadGeneratingConnections.LGCs {
- if !(*loadGeneratingConnections.LGCs)[i].IsValid() {
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "%v: Load-generating connection with id %d is invalid ... skipping.\n",
+ for i := range *loadGeneratingConnectionsCollection.LGCs {
+ loadGeneratingConnectionsCollection.Lock.Lock()
+ connectionState := (*loadGeneratingConnectionsCollection.LGCs)[i].Status()
+ loadGeneratingConnectionsCollection.Lock.Unlock()
+ switch connectionState {
+ default:
+ {
+ error := fmt.Sprintf(
+ "%v: Load-generating connection with id %d is in an unrecognizable state.\n",
debugging,
- (*loadGeneratingConnections.LGCs)[i].ClientId(),
+ (*loadGeneratingConnectionsCollection.LGCs)[i].ClientId())
+ fmt.Fprintf(os.Stderr, "%s", error)
+ panic(error)
+ }
+ case lgc.LGC_STATUS_ERROR,
+ lgc.LGC_STATUS_DONE:
+ {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Load-generating connection with id %d is invalid or complete ... skipping.\n",
+ debugging,
+ (*loadGeneratingConnectionsCollection.LGCs)[i].ClientId(),
+ )
+ }
+ // TODO: Do we add null connection to throughput? and how do we define it? Throughput -1 or 0?
+ granularThroughputDatapoints = append(
+ granularThroughputDatapoints,
+ GranularThroughputDataPoint{now, 0, uint32(i), 0, 0, ""},
)
+ continue
}
- // TODO: Do we add null connection to throughput? and how do we define it? Throughput -1 or 0?
- granularThroughputDatapoints = append(
- granularThroughputDatapoints,
- GranularThroughputDataPoint{now, 0, uint32(i), 0, 0, ""},
- )
- continue
- }
- allInvalid = false
- currentTransferred, currentInterval := (*loadGeneratingConnections.LGCs)[i].TransferredInInterval()
- // normalize to a second-long interval!
- instantaneousConnectionThroughput := float64(
- currentTransferred,
- ) / float64(
- currentInterval.Seconds(),
- )
- instantaneousTotalThroughput += instantaneousConnectionThroughput
+ case lgc.LGC_STATUS_RUNNING:
+ {
+ allInvalid = false
+ currentTransferred, currentInterval := (*loadGeneratingConnectionsCollection.LGCs)[i].TransferredInInterval()
+ // normalize to a second-long interval!
+ instantaneousConnectionThroughput := float64(
+ currentTransferred,
+ ) / float64(
+ currentInterval.Seconds(),
+ )
+ instantaneousTotalThroughput += instantaneousConnectionThroughput
- tcpRtt := time.Duration(0 * time.Second)
- tcpCwnd := uint32(0)
- if captureExtendedStats && extendedstats.ExtendedStatsAvailable() {
- if stats := (*loadGeneratingConnections.LGCs)[i].Stats(); stats != nil {
- tcpInfo, err := extendedstats.GetTCPInfo(stats.ConnInfo.Conn)
- if err == nil {
- tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond
- tcpCwnd = tcpInfo.Snd_cwnd
- } else {
- fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err)
+ tcpRtt := time.Duration(0 * time.Second)
+ tcpCwnd := uint32(0)
+ if captureExtendedStats && extendedstats.ExtendedStatsAvailable() {
+ if stats := (*loadGeneratingConnectionsCollection.LGCs)[i].Stats(); stats != nil {
+ tcpInfo, err := extendedstats.GetTCPInfo(stats.ConnInfo.Conn)
+ if err == nil {
+ tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond
+ tcpCwnd = tcpInfo.Snd_cwnd
+ } else {
+ fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err)
+ }
+ }
}
+ granularThroughputDatapoints = append(
+ granularThroughputDatapoints,
+ GranularThroughputDataPoint{
+ now,
+ instantaneousConnectionThroughput,
+ uint32(i),
+ tcpRtt,
+ tcpCwnd,
+ "",
+ },
+ )
}
+
}
- granularThroughputDatapoints = append(
- granularThroughputDatapoints,
- GranularThroughputDataPoint{
- now,
- instantaneousConnectionThroughput,
- uint32(i),
- tcpRtt,
- tcpCwnd,
- "",
- },
- )
}
// For some reason, all the lgcs are invalid. This likely means that
@@ -520,7 +366,7 @@ func LoadGenerator(
throughputDataPoint := ThroughputDataPoint{
time.Now(),
instantaneousTotalThroughput,
- len(*loadGeneratingConnections.LGCs),
+ len(*loadGeneratingConnectionsCollection.LGCs),
granularThroughputDatapoints,
}
throughputCalculations <- throughputDataPoint
@@ -529,7 +375,7 @@ func LoadGenerator(
flowsCreated += addFlows(
networkActivityCtx,
constants.AdditiveNumberOfLoadGeneratingConnections,
- loadGeneratingConnections,
+ loadGeneratingConnectionsCollection,
lgcGenerator,
debugging.Level,
)
@@ -543,281 +389,3 @@ func LoadGenerator(
}()
return
}
-
-type ProbeTracer struct {
- client *http.Client
- stats *stats.TraceStats
- trace *httptrace.ClientTrace
- debug debug.DebugLevel
- probeid uint64
- probeType ProbeType
-}
-
-func (p *ProbeTracer) String() string {
- return fmt.Sprintf("(Probe %v): stats: %v\n", p.probeid, p.stats)
-}
-
-func (p *ProbeTracer) ProbeId() uint64 {
- return p.probeid
-}
-
-func (p *ProbeTracer) GetTrace() *httptrace.ClientTrace {
- return p.trace
-}
-
-func (p *ProbeTracer) GetDnsDelta() time.Duration {
- if p.stats.ConnectionReused {
- return time.Duration(0)
- }
- delta := p.stats.DnsDoneTime.Sub(p.stats.DnsStartTime)
- if debug.IsDebug(p.debug) {
- fmt.Printf("(Probe %v): DNS Time: %v\n", p.probeid, delta)
- }
- return delta
-}
-
-func (p *ProbeTracer) GetTCPDelta() time.Duration {
- if p.stats.ConnectionReused {
- return time.Duration(0)
- }
- delta := p.stats.ConnectDoneTime.Sub(p.stats.ConnectStartTime)
- if debug.IsDebug(p.debug) {
- fmt.Printf("(Probe %v): TCP Connection Time: %v\n", p.probeid, delta)
- }
- return delta
-}
-
-func (p *ProbeTracer) GetTLSDelta() time.Duration {
- if utilities.IsSome(p.stats.TLSDoneTime) {
- panic("There should not be TLS information, but there is.")
- }
- delta := time.Duration(0)
- if debug.IsDebug(p.debug) {
- fmt.Printf("(Probe %v): TLS Time: %v\n", p.probeid, delta)
- }
- return delta
-}
-
-func (p *ProbeTracer) GetTLSAndHttpHeaderDelta() time.Duration {
- // Because the TLS handshake occurs *after* the TCP connection (ConnectDoneTime)
- // and *before* the HTTP transaction, we know that the delta between the time
- // that the first HTTP response byte is available and the time that the TCP
- // connection was established includes both the time for the HTTP header RTT
- // *and* the TLS handshake RTT, whether we can specifically measure the latter
- // or not. Eventually when TLS handshake tracing is fixed, we can break these
- // into separate buckets, but for now this workaround is reasonable.
- before := p.stats.ConnectDoneTime
- if p.stats.ConnectionReused {
- // When we reuse a connection there will be no time logged for when the
- // TCP connection was established (obviously). So, fall back to the time
- // when we were notified about reusing a connection (as a close approximation!).
- before = p.stats.GetConnectionDoneTime
- }
- delta := p.stats.HttpResponseReadyTime.Sub(before)
- if debug.IsDebug(p.debug) {
- fmt.Printf("(Probe %v): Http TLS and Header Time: %v\n", p.probeid, delta)
- }
- return delta
-}
-
-func (p *ProbeTracer) GetHttpHeaderDelta() time.Duration {
- panic(
- "Unusable until TLS tracing support is enabled! Use GetTLSAndHttpHeaderDelta() instead.\n",
- )
- delta := p.stats.HttpResponseReadyTime.Sub(utilities.GetSome(p.stats.TLSDoneTime))
- if debug.IsDebug(p.debug) {
- fmt.Printf("(Probe %v): Http Header Time: %v\n", p.probeid, delta)
- }
- return delta
-}
-
-func (p *ProbeTracer) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration {
- delta := httpDoneTime.Sub(p.stats.HttpResponseReadyTime)
- if debug.IsDebug(p.debug) {
- fmt.Printf("(Probe %v): Http Download Time: %v\n", p.probeid, delta)
- }
- return delta
-}
-
-func NewProbeTracer(
- client *http.Client,
- probeType ProbeType,
- probeId uint64,
- debugging *debug.DebugWithPrefix,
-) *ProbeTracer {
- probe := &ProbeTracer{
- client: client,
- stats: &stats.TraceStats{},
- trace: nil,
- debug: debugging.Level,
- probeid: probeId,
- probeType: probeType,
- }
- trace := traceable.GenerateHttpTimingTracer(probe, debugging.Level)
-
- probe.trace = trace
- return probe
-}
-
-func (probe *ProbeTracer) SetDnsStartTimeInfo(
- now time.Time,
- dnsStartInfo httptrace.DNSStartInfo,
-) {
- probe.stats.DnsStartTime = now
- probe.stats.DnsStart = dnsStartInfo
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) DNS Start for Probe %v: %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- dnsStartInfo,
- )
- }
-}
-
-func (probe *ProbeTracer) SetDnsDoneTimeInfo(
- now time.Time,
- dnsDoneInfo httptrace.DNSDoneInfo,
-) {
- probe.stats.DnsDoneTime = now
- probe.stats.DnsDone = dnsDoneInfo
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) DNS Done for Probe %v: %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.DnsDone,
- )
- }
-}
-
-func (probe *ProbeTracer) SetConnectStartTime(
- now time.Time,
-) {
- probe.stats.ConnectStartTime = now
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) TCP Start for Probe %v at %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.ConnectStartTime,
- )
- }
-}
-
-func (probe *ProbeTracer) SetConnectDoneTimeError(
- now time.Time,
- err error,
-) {
- probe.stats.ConnectDoneTime = now
- probe.stats.ConnectDoneError = err
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) TCP Done for Probe %v (with error %v) @ %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.ConnectDoneError,
- probe.stats.ConnectDoneTime,
- )
- }
-}
-
-func (probe *ProbeTracer) SetGetConnTime(now time.Time) {
- probe.stats.GetConnectionStartTime = now
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) Started getting connection for Probe %v @ %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.GetConnectionStartTime,
- )
- }
-}
-
-func (probe *ProbeTracer) SetGotConnTimeInfo(
- now time.Time,
- gotConnInfo httptrace.GotConnInfo,
-) {
- probe.stats.GetConnectionDoneTime = now
- probe.stats.ConnInfo = gotConnInfo
- probe.stats.ConnectionReused = gotConnInfo.Reused
- if (probe.probeType == SelfUp || probe.probeType == SelfDown) && !gotConnInfo.Reused {
- fmt.Fprintf(
- os.Stderr,
- "A self probe sent using a new connection!\n",
- )
- }
- if gotConnInfo.Reused {
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) Got a reused connection for Probe %v at %v with info %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.GetConnectionDoneTime,
- probe.stats.ConnInfo,
- )
- }
- }
-}
-
-func (probe *ProbeTracer) SetTLSHandshakeStartTime(
- now time.Time,
-) {
- probe.stats.TLSStartTime = utilities.Some(now)
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) Started TLS Handshake for Probe %v @ %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.TLSStartTime,
- )
- }
-}
-
-func (probe *ProbeTracer) SetTLSHandshakeDoneTimeState(
- now time.Time,
- connectionState tls.ConnectionState,
-) {
- probe.stats.TLSDoneTime = utilities.Some(now)
- probe.stats.TLSConnInfo = connectionState
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) Completed TLS handshake for Probe %v at %v with info %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.TLSDoneTime,
- probe.stats.TLSConnInfo,
- )
- }
-}
-
-func (probe *ProbeTracer) SetHttpWroteRequestTimeInfo(
- now time.Time,
- info httptrace.WroteRequestInfo,
-) {
- probe.stats.HttpWroteRequestTime = now
- probe.stats.HttpInfo = info
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) Http finished writing request for Probe %v at %v with info %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.HttpWroteRequestTime,
- probe.stats.HttpInfo,
- )
- }
-}
-
-func (probe *ProbeTracer) SetHttpResponseReadyTime(
- now time.Time,
-) {
- probe.stats.HttpResponseReadyTime = now
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) Http response is ready for Probe %v at %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.HttpResponseReadyTime,
- )
- }
-}
diff --git a/stabilizer/rev3.go b/stabilizer/rev3.go
index 4ab0bd9..5d1aeec 100644
--- a/stabilizer/rev3.go
+++ b/stabilizer/rev3.go
@@ -6,6 +6,7 @@ import (
"github.com/network-quality/goresponsiveness/debug"
"github.com/network-quality/goresponsiveness/ms"
+ "github.com/network-quality/goresponsiveness/probe"
"github.com/network-quality/goresponsiveness/rpm"
"github.com/network-quality/goresponsiveness/utilities"
)
@@ -55,7 +56,7 @@ func NewProbeStabilizer(
dbgLevel: debugLevel}
}
-func (r3 *ProbeStabilizer) AddMeasurement(measurement rpm.ProbeDataPoint) {
+func (r3 *ProbeStabilizer) AddMeasurement(measurement probe.ProbeDataPoint) {
r3.m.Lock()
defer r3.m.Unlock()