summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lgc/collection.go54
-rw-r--r--lgc/download.go373
-rw-r--r--lgc/lgc.go533
-rw-r--r--lgc/upload.go206
-rw-r--r--networkQuality.go44
-rw-r--r--rpm/rpm.go596
-rw-r--r--stabilizer/rev3.go3
7 files changed, 748 insertions, 1061 deletions
diff --git a/lgc/collection.go b/lgc/collection.go
new file mode 100644
index 0000000..7560186
--- /dev/null
+++ b/lgc/collection.go
@@ -0,0 +1,54 @@
+/*
+ * This file is part of Go Responsiveness.
+ *
+ * Go Responsiveness is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software Foundation,
+ * either version 2 of the License, or (at your option) any later version.
+ * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package lgc
+
+import (
+ "fmt"
+ "sync"
+)
+
+type LoadGeneratingConnectionCollection struct {
+ Lock sync.Mutex
+ LGCs *[]LoadGeneratingConnection
+}
+
+func NewLoadGeneratingConnectionCollection() LoadGeneratingConnectionCollection {
+ return LoadGeneratingConnectionCollection{LGCs: new([]LoadGeneratingConnection)}
+}
+
+func (collection *LoadGeneratingConnectionCollection) Get(idx int) (*LoadGeneratingConnection, error) {
+ if collection.Lock.TryLock() {
+ collection.Lock.Unlock()
+ return nil, fmt.Errorf("collection is unlocked")
+ }
+
+ if idx > len(*collection.LGCs) {
+ return nil, fmt.Errorf("index too large")
+ }
+ return &(*collection.LGCs)[idx], nil
+}
+
+func (collection *LoadGeneratingConnectionCollection) Append(conn LoadGeneratingConnection) error {
+ if collection.Lock.TryLock() {
+ collection.Lock.Unlock()
+ return fmt.Errorf("collection is unlocked")
+ }
+ *collection.LGCs = append(*collection.LGCs, conn)
+ return nil
+}
+
+func (collection *LoadGeneratingConnectionCollection) Len() int {
+ return len(*collection.LGCs)
+}
diff --git a/lgc/download.go b/lgc/download.go
new file mode 100644
index 0000000..71ed647
--- /dev/null
+++ b/lgc/download.go
@@ -0,0 +1,373 @@
+/*
+ * This file is part of Go Responsiveness.
+ *
+ * Go Responsiveness is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software Foundation,
+ * either version 2 of the License, or (at your option) any later version.
+ * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package lgc
+
+import (
+ "context"
+ "crypto/tls"
+ "fmt"
+ "io"
+ "net/http"
+ "net/http/httptrace"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/network-quality/goresponsiveness/debug"
+ "github.com/network-quality/goresponsiveness/stats"
+ "github.com/network-quality/goresponsiveness/traceable"
+ "github.com/network-quality/goresponsiveness/utilities"
+)
+
+// TODO: All 64-bit fields that are accessed atomically must
+// appear at the top of this struct.
+type LoadGeneratingConnectionDownload struct {
+ downloaded uint64
+ lastIntervalEnd int64
+ ConnectToAddr string
+ URL string
+ downloadStartTime time.Time
+ lastDownloaded uint64
+ client *http.Client
+ debug debug.DebugLevel
+ InsecureSkipVerify bool
+ KeyLogger io.Writer
+ clientId uint64
+ tracer *httptrace.ClientTrace
+ stats stats.TraceStats
+ status LgcStatus
+ statusLock *sync.Mutex
+ statusWaiter *sync.Cond
+}
+
+func NewLoadGeneratingConnectionDownload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionDownload {
+ lgd := LoadGeneratingConnectionDownload{
+ URL: url,
+ KeyLogger: keyLogger,
+ ConnectToAddr: connectToAddr,
+ InsecureSkipVerify: insecureSkipVerify,
+ statusLock: &sync.Mutex{},
+ }
+ lgd.statusWaiter = sync.NewCond(lgd.statusLock)
+ return lgd
+}
+
+func (lgd *LoadGeneratingConnectionDownload) WaitUntilStarted(ctxt context.Context) bool {
+ conditional := func() bool { return lgd.status != LGC_STATUS_NOT_STARTED }
+ go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgd.statusWaiter)
+ return utilities.WaitWithContext(ctxt, &conditional, lgd.statusLock, lgd.statusWaiter)
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetDnsStartTimeInfo(
+ now time.Time,
+ dnsStartInfo httptrace.DNSStartInfo,
+) {
+ lgd.stats.DnsStartTime = now
+ lgd.stats.DnsStart = dnsStartInfo
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "DNS Start for %v: %v\n",
+ lgd.ClientId(),
+ dnsStartInfo,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetDnsDoneTimeInfo(
+ now time.Time,
+ dnsDoneInfo httptrace.DNSDoneInfo,
+) {
+ lgd.stats.DnsDoneTime = now
+ lgd.stats.DnsDone = dnsDoneInfo
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "DNS Done for %v: %v\n",
+ lgd.ClientId(),
+ lgd.stats.DnsDone,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetConnectStartTime(
+ now time.Time,
+) {
+ lgd.stats.ConnectStartTime = now
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "TCP Start for %v at %v\n",
+ lgd.ClientId(),
+ lgd.stats.ConnectStartTime,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetConnectDoneTimeError(
+ now time.Time,
+ err error,
+) {
+ lgd.stats.ConnectDoneTime = now
+ lgd.stats.ConnectDoneError = err
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "TCP Done for %v (with error %v) @ %v\n",
+ lgd.ClientId(),
+ lgd.stats.ConnectDoneError,
+ lgd.stats.ConnectDoneTime,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetGetConnTime(now time.Time) {
+ lgd.stats.GetConnectionStartTime = now
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "Started getting connection for %v @ %v\n",
+ lgd.ClientId(),
+ lgd.stats.GetConnectionStartTime,
+ )
+ }
+ lgd.statusLock.Lock()
+ lgd.status = LGC_STATUS_RUNNING
+ lgd.statusWaiter.Broadcast()
+ lgd.statusLock.Unlock()
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetGotConnTimeInfo(
+ now time.Time,
+ gotConnInfo httptrace.GotConnInfo,
+) {
+ if gotConnInfo.Reused {
+ fmt.Printf("Unexpectedly reusing a connection!\n")
+ panic(!gotConnInfo.Reused)
+ }
+ lgd.stats.GetConnectionDoneTime = now
+ lgd.stats.ConnInfo = gotConnInfo
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "Got connection for %v at %v with info %v\n",
+ lgd.ClientId(),
+ lgd.stats.GetConnectionDoneTime,
+ lgd.stats.ConnInfo,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeStartTime(
+ now time.Time,
+) {
+ lgd.stats.TLSStartTime = utilities.Some(now)
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "Started TLS Handshake for %v @ %v\n",
+ lgd.ClientId(),
+ lgd.stats.TLSStartTime,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeDoneTimeState(
+ now time.Time,
+ connectionState tls.ConnectionState,
+) {
+ lgd.stats.TLSDoneTime = utilities.Some(now)
+ lgd.stats.TLSConnInfo = connectionState
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "Completed TLS handshake for %v at %v with info %v\n",
+ lgd.ClientId(),
+ lgd.stats.TLSDoneTime,
+ lgd.stats.TLSConnInfo,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetHttpWroteRequestTimeInfo(
+ now time.Time,
+ info httptrace.WroteRequestInfo,
+) {
+ lgd.stats.HttpWroteRequestTime = now
+ lgd.stats.HttpInfo = info
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "(lgd) Http finished writing request for %v at %v with info %v\n",
+ lgd.ClientId(),
+ lgd.stats.HttpWroteRequestTime,
+ lgd.stats.HttpInfo,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) SetHttpResponseReadyTime(
+ now time.Time,
+) {
+ lgd.stats.HttpResponseReadyTime = now
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "Got the first byte of HTTP response headers for %v at %v\n",
+ lgd.ClientId(),
+ lgd.stats.HttpResponseReadyTime,
+ )
+ }
+}
+
+func (lgd *LoadGeneratingConnectionDownload) ClientId() uint64 {
+ return lgd.clientId
+}
+
+func (lgd *LoadGeneratingConnectionDownload) TransferredInInterval() (uint64, time.Duration) {
+ transferred := atomic.SwapUint64(&lgd.downloaded, 0)
+ newIntervalEnd := (time.Now().Sub(lgd.downloadStartTime)).Nanoseconds()
+ previousIntervalEnd := atomic.SwapInt64(&lgd.lastIntervalEnd, newIntervalEnd)
+ intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd)
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf("download: Transferred: %v bytes in %v.\n", transferred, intervalLength)
+ }
+ return transferred, intervalLength
+}
+
+func (lgd *LoadGeneratingConnectionDownload) Client() *http.Client {
+ return lgd.client
+}
+
+type countingReader struct {
+ n *uint64
+ ctx context.Context
+ readable io.Reader
+}
+
+func (cr *countingReader) Read(p []byte) (n int, err error) {
+ if cr.ctx.Err() != nil {
+ return 0, io.EOF
+ }
+
+ n, err = cr.readable.Read(p)
+ atomic.AddUint64(cr.n, uint64(n))
+ return
+}
+
+func (lgd *LoadGeneratingConnectionDownload) Start(
+ parentCtx context.Context,
+ debugLevel debug.DebugLevel,
+) bool {
+ lgd.downloaded = 0
+ lgd.debug = debugLevel
+ lgd.clientId = utilities.GenerateUniqueId()
+
+ transport := &http.Transport{
+ Proxy: http.ProxyFromEnvironment,
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: lgd.InsecureSkipVerify,
+ },
+ }
+
+ if !utilities.IsInterfaceNil(lgd.KeyLogger) {
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "Using an SSL Key Logger for this load-generating download.\n",
+ )
+ }
+
+ // The presence of a custom TLSClientConfig in a *generic* `transport`
+ // means that go will default to HTTP/1.1 and cowardly avoid HTTP/2:
+ // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278
+ // Also, it would appear that the API's choice of HTTP vs HTTP2 can
+ // depend on whether the url contains
+ // https:// or http://:
+ // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74
+ transport.TLSClientConfig.KeyLogWriter = lgd.KeyLogger
+ }
+ transport.TLSClientConfig.InsecureSkipVerify = lgd.InsecureSkipVerify
+
+ utilities.OverrideHostTransport(transport, lgd.ConnectToAddr)
+
+ lgd.client = &http.Client{Transport: transport}
+ lgd.tracer = traceable.GenerateHttpTimingTracer(lgd, lgd.debug)
+
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf(
+ "Started a load-generating download (id: %v).\n",
+ lgd.clientId,
+ )
+ }
+
+ go lgd.doDownload(parentCtx)
+ return true
+}
+
+func (lgd *LoadGeneratingConnectionDownload) Status() LgcStatus {
+ return lgd.status
+}
+
+func (lgd *LoadGeneratingConnectionDownload) Stats() *stats.TraceStats {
+ return &lgd.stats
+}
+
+func (lgd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) error {
+ var request *http.Request = nil
+ var get *http.Response = nil
+ var err error = nil
+
+ if request, err = http.NewRequestWithContext(
+ httptrace.WithClientTrace(ctx, lgd.tracer),
+ "GET",
+ lgd.URL,
+ nil,
+ ); err != nil {
+ lgd.statusLock.Lock()
+ lgd.status = LGC_STATUS_ERROR
+ lgd.statusWaiter.Broadcast()
+ lgd.statusLock.Unlock()
+ return err
+ }
+
+ // Used to disable compression
+ request.Header.Set("Accept-Encoding", "identity")
+ request.Header.Set("User-Agent", utilities.UserAgent())
+
+ lgd.downloadStartTime = time.Now()
+ lgd.lastIntervalEnd = 0
+
+ if get, err = lgd.client.Do(request); err != nil {
+ lgd.statusLock.Lock()
+ lgd.status = LGC_STATUS_ERROR
+ lgd.statusWaiter.Broadcast()
+ lgd.statusLock.Unlock()
+ return err
+ }
+
+ // Header.Get returns "" when not set
+ if get.Header.Get("Content-Encoding") != "" {
+ lgd.statusLock.Lock()
+ lgd.status = LGC_STATUS_ERROR
+ lgd.statusWaiter.Broadcast()
+ lgd.statusLock.Unlock()
+ fmt.Printf("Content-Encoding header was set (compression not allowed)")
+ return fmt.Errorf("Content-Encoding header was set (compression not allowed)")
+ }
+ cr := &countingReader{n: &lgd.downloaded, ctx: ctx, readable: get.Body}
+ _, _ = io.Copy(io.Discard, cr)
+
+ lgd.statusLock.Lock()
+ lgd.status = LGC_STATUS_DONE
+ lgd.statusWaiter.Broadcast()
+ lgd.statusLock.Unlock()
+
+ get.Body.Close()
+ if debug.IsDebug(lgd.debug) {
+ fmt.Printf("Ending a load-generating download.\n")
+ }
+
+ return nil
+}
diff --git a/lgc/lgc.go b/lgc/lgc.go
index 16f67d2..b16eb76 100644
--- a/lgc/lgc.go
+++ b/lgc/lgc.go
@@ -16,545 +16,28 @@ package lgc
import (
"context"
- "crypto/tls"
- "fmt"
- "io"
"net/http"
- "net/http/httptrace"
- "sync"
- "sync/atomic"
"time"
"github.com/network-quality/goresponsiveness/debug"
"github.com/network-quality/goresponsiveness/stats"
- "github.com/network-quality/goresponsiveness/traceable"
- "github.com/network-quality/goresponsiveness/utilities"
)
type LoadGeneratingConnection interface {
Start(context.Context, debug.DebugLevel) bool
TransferredInInterval() (uint64, time.Duration)
Client() *http.Client
- IsValid() bool
+ Status() LgcStatus
ClientId() uint64
Stats() *stats.TraceStats
WaitUntilStarted(context.Context) bool
}
-type LoadGeneratingConnectionCollection struct {
- Lock sync.Mutex
- LGCs *[]LoadGeneratingConnection
-}
-
-func NewLoadGeneratingConnectionCollection() LoadGeneratingConnectionCollection {
- return LoadGeneratingConnectionCollection{LGCs: new([]LoadGeneratingConnection)}
-}
-
-func (collection *LoadGeneratingConnectionCollection) Get(idx int) (*LoadGeneratingConnection, error) {
- if collection.Lock.TryLock() {
- collection.Lock.Unlock()
- return nil, fmt.Errorf("collection is unlocked")
- }
-
- if idx > len(*collection.LGCs) {
- return nil, fmt.Errorf("index too large")
- }
- return &(*collection.LGCs)[idx], nil
-}
-
-func (collection *LoadGeneratingConnectionCollection) Append(conn LoadGeneratingConnection) error {
- if collection.Lock.TryLock() {
- collection.Lock.Unlock()
- return fmt.Errorf("collection is unlocked")
- }
- *collection.LGCs = append(*collection.LGCs, conn)
- return nil
-}
-
-// TODO: All 64-bit fields that are accessed atomically must
-// appear at the top of this struct.
-type LoadGeneratingConnectionDownload struct {
- downloaded uint64
- lastIntervalEnd int64
- ConnectToAddr string
- URL string
- downloadStartTime time.Time
- lastDownloaded uint64
- client *http.Client
- debug debug.DebugLevel
- valid bool
- validLock *sync.Mutex
- InsecureSkipVerify bool
- KeyLogger io.Writer
- clientId uint64
- tracer *httptrace.ClientTrace
- stats stats.TraceStats
- validWaiter *sync.Cond
-}
-
-func NewLoadGeneratingConnectionDownload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionDownload {
- lgd := LoadGeneratingConnectionDownload{
- URL: url,
- KeyLogger: keyLogger,
- ConnectToAddr: connectToAddr,
- InsecureSkipVerify: insecureSkipVerify,
- validLock: &sync.Mutex{},
- }
- lgd.validWaiter = sync.NewCond(lgd.validLock)
- return lgd
-}
-
-func (lgd *LoadGeneratingConnectionDownload) WaitUntilStarted(ctxt context.Context) bool {
- conditional := func() bool { return lgd.valid }
- go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgd.validWaiter)
- return utilities.WaitWithContext(ctxt, &conditional, lgd.validLock, lgd.validWaiter)
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetDnsStartTimeInfo(
- now time.Time,
- dnsStartInfo httptrace.DNSStartInfo,
-) {
- lgd.stats.DnsStartTime = now
- lgd.stats.DnsStart = dnsStartInfo
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "DNS Start for %v: %v\n",
- lgd.ClientId(),
- dnsStartInfo,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetDnsDoneTimeInfo(
- now time.Time,
- dnsDoneInfo httptrace.DNSDoneInfo,
-) {
- lgd.stats.DnsDoneTime = now
- lgd.stats.DnsDone = dnsDoneInfo
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "DNS Done for %v: %v\n",
- lgd.ClientId(),
- lgd.stats.DnsDone,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetConnectStartTime(
- now time.Time,
-) {
- lgd.stats.ConnectStartTime = now
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "TCP Start for %v at %v\n",
- lgd.ClientId(),
- lgd.stats.ConnectStartTime,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetConnectDoneTimeError(
- now time.Time,
- err error,
-) {
- lgd.stats.ConnectDoneTime = now
- lgd.stats.ConnectDoneError = err
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "TCP Done for %v (with error %v) @ %v\n",
- lgd.ClientId(),
- lgd.stats.ConnectDoneError,
- lgd.stats.ConnectDoneTime,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetGetConnTime(now time.Time) {
- lgd.stats.GetConnectionStartTime = now
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "Started getting connection for %v @ %v\n",
- lgd.ClientId(),
- lgd.stats.GetConnectionStartTime,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetGotConnTimeInfo(
- now time.Time,
- gotConnInfo httptrace.GotConnInfo,
-) {
- if gotConnInfo.Reused {
- fmt.Printf("Unexpectedly reusing a connection!\n")
- panic(!gotConnInfo.Reused)
- }
- lgd.stats.GetConnectionDoneTime = now
- lgd.stats.ConnInfo = gotConnInfo
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "Got connection for %v at %v with info %v\n",
- lgd.ClientId(),
- lgd.stats.GetConnectionDoneTime,
- lgd.stats.ConnInfo,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeStartTime(
- now time.Time,
-) {
- lgd.stats.TLSStartTime = utilities.Some(now)
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "Started TLS Handshake for %v @ %v\n",
- lgd.ClientId(),
- lgd.stats.TLSStartTime,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeDoneTimeState(
- now time.Time,
- connectionState tls.ConnectionState,
-) {
- lgd.stats.TLSDoneTime = utilities.Some(now)
- lgd.stats.TLSConnInfo = connectionState
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "Completed TLS handshake for %v at %v with info %v\n",
- lgd.ClientId(),
- lgd.stats.TLSDoneTime,
- lgd.stats.TLSConnInfo,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetHttpWroteRequestTimeInfo(
- now time.Time,
- info httptrace.WroteRequestInfo,
-) {
- lgd.stats.HttpWroteRequestTime = now
- lgd.stats.HttpInfo = info
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "(lgd) Http finished writing request for %v at %v with info %v\n",
- lgd.ClientId(),
- lgd.stats.HttpWroteRequestTime,
- lgd.stats.HttpInfo,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) SetHttpResponseReadyTime(
- now time.Time,
-) {
- lgd.stats.HttpResponseReadyTime = now
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "Got the first byte of HTTP response headers for %v at %v\n",
- lgd.ClientId(),
- lgd.stats.HttpResponseReadyTime,
- )
- }
-}
-
-func (lgd *LoadGeneratingConnectionDownload) ClientId() uint64 {
- return lgd.clientId
-}
-
-func (lgd *LoadGeneratingConnectionDownload) TransferredInInterval() (uint64, time.Duration) {
- transferred := atomic.SwapUint64(&lgd.downloaded, 0)
- newIntervalEnd := (time.Now().Sub(lgd.downloadStartTime)).Nanoseconds()
- previousIntervalEnd := atomic.SwapInt64(&lgd.lastIntervalEnd, newIntervalEnd)
- intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd)
- if debug.IsDebug(lgd.debug) {
- fmt.Printf("download: Transferred: %v bytes in %v.\n", transferred, intervalLength)
- }
- return transferred, intervalLength
-}
-
-func (lgd *LoadGeneratingConnectionDownload) Client() *http.Client {
- return lgd.client
-}
-
-type countingReader struct {
- n *uint64
- ctx context.Context
- readable io.Reader
-}
-
-func (cr *countingReader) Read(p []byte) (n int, err error) {
- if cr.ctx.Err() != nil {
- return 0, io.EOF
- }
- n, err = cr.readable.Read(p)
- atomic.AddUint64(cr.n, uint64(n))
- return
-}
-
-func (lgd *LoadGeneratingConnectionDownload) Start(
- parentCtx context.Context,
- debugLevel debug.DebugLevel,
-) bool {
- lgd.downloaded = 0
- lgd.debug = debugLevel
- lgd.clientId = utilities.GenerateUniqueId()
-
- transport := &http.Transport{
- Proxy: http.ProxyFromEnvironment,
- TLSClientConfig: &tls.Config{
- InsecureSkipVerify: lgd.InsecureSkipVerify,
- },
- }
-
- if !utilities.IsInterfaceNil(lgd.KeyLogger) {
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "Using an SSL Key Logger for this load-generating download.\n",
- )
- }
-
- // The presence of a custom TLSClientConfig in a *generic* `transport`
- // means that go will default to HTTP/1.1 and cowardly avoid HTTP/2:
- // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278
- // Also, it would appear that the API's choice of HTTP vs HTTP2 can
- // depend on whether the url contains
- // https:// or http://:
- // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74
- transport.TLSClientConfig.KeyLogWriter = lgd.KeyLogger
- }
- transport.TLSClientConfig.InsecureSkipVerify = lgd.InsecureSkipVerify
-
- utilities.OverrideHostTransport(transport, lgd.ConnectToAddr)
-
- lgd.client = &http.Client{Transport: transport}
- lgd.validLock.Lock()
- lgd.valid = true
- lgd.validLock.Unlock()
- lgd.tracer = traceable.GenerateHttpTimingTracer(lgd, lgd.debug)
-
- if debug.IsDebug(lgd.debug) {
- fmt.Printf(
- "Started a load-generating download (id: %v).\n",
- lgd.clientId,
- )
- }
+type LgcStatus int
- go lgd.doDownload(parentCtx)
- return true
-}
-
-func (lgd *LoadGeneratingConnectionDownload) IsValid() bool {
- return lgd.valid
-}
-
-func (lgd *LoadGeneratingConnectionDownload) Stats() *stats.TraceStats {
- return &lgd.stats
-}
-
-func (lgd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) {
- var request *http.Request = nil
- var get *http.Response = nil
- var err error = nil
-
- if request, err = http.NewRequestWithContext(
- httptrace.WithClientTrace(ctx, lgd.tracer),
- "GET",
- lgd.URL,
- nil,
- ); err != nil {
- lgd.validLock.Lock()
- lgd.valid = false
- lgd.validLock.Unlock()
- return
- }
-
- // Used to disable compression
- request.Header.Set("Accept-Encoding", "identity")
- request.Header.Set("User-Agent", utilities.UserAgent())
-
- lgd.downloadStartTime = time.Now()
- lgd.lastIntervalEnd = 0
-
- if get, err = lgd.client.Do(request); err != nil {
- lgd.validLock.Lock()
- lgd.valid = false
- lgd.validLock.Unlock()
- return
- }
-
- // Header.Get returns "" when not set
- if get.Header.Get("Content-Encoding") != "" {
- lgd.validLock.Lock()
- lgd.valid = false
- lgd.validLock.Unlock()
- fmt.Printf("Content-Encoding header was set (compression not allowed)")
- return
- }
- cr := &countingReader{n: &lgd.downloaded, ctx: ctx, readable: get.Body}
- _, _ = io.Copy(io.Discard, cr)
- get.Body.Close()
- if debug.IsDebug(lgd.debug) {
- fmt.Printf("Ending a load-generating download.\n")
- }
-}
-
-// TODO: All 64-bit fields that are accessed atomically must
-// appear at the top of this struct.
-type LoadGeneratingConnectionUpload struct {
- uploaded uint64
- lastIntervalEnd int64
- URL string
- ConnectToAddr string
- uploadStartTime time.Time
- lastUploaded uint64
- client *http.Client
- debug debug.DebugLevel
- valid bool
- validLock *sync.Mutex
- InsecureSkipVerify bool
- KeyLogger io.Writer
- clientId uint64
- validWaiter *sync.Cond
-}
-
-func NewLoadGeneratingConnectionUpload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionUpload {
- lgu := LoadGeneratingConnectionUpload{
- URL: url,
- KeyLogger: keyLogger,
- ConnectToAddr: connectToAddr,
- InsecureSkipVerify: insecureSkipVerify,
- validLock: &sync.Mutex{},
- }
- lgu.validWaiter = sync.NewCond(lgu.validLock)
- return lgu
-}
-
-func (lgu *LoadGeneratingConnectionUpload) WaitUntilStarted(ctxt context.Context) bool {
- conditional := func() bool { return lgu.valid }
- go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgu.validWaiter)
- return utilities.WaitWithContext(ctxt, &conditional, lgu.validLock, lgu.validWaiter)
-}
-
-func (lgu *LoadGeneratingConnectionUpload) ClientId() uint64 {
- return lgu.clientId
-}
-
-func (lgu *LoadGeneratingConnectionUpload) TransferredInInterval() (uint64, time.Duration) {
- transferred := atomic.SwapUint64(&lgu.uploaded, 0)
- newIntervalEnd := (time.Now().Sub(lgu.uploadStartTime)).Nanoseconds()
- previousIntervalEnd := atomic.SwapInt64(&lgu.lastIntervalEnd, newIntervalEnd)
- intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd)
- if debug.IsDebug(lgu.debug) {
- fmt.Printf("upload: Transferred: %v bytes in %v.\n", transferred, intervalLength)
- }
- return transferred, intervalLength
-}
-
-func (lgu *LoadGeneratingConnectionUpload) Client() *http.Client {
- return lgu.client
-}
-
-func (lgu *LoadGeneratingConnectionUpload) IsValid() bool {
- return lgu.valid
-}
-
-type syntheticCountingReader struct {
- n *uint64
- ctx context.Context
-}
-
-func (s *syntheticCountingReader) Read(p []byte) (n int, err error) {
- if s.ctx.Err() != nil {
- return 0, io.EOF
- }
- err = nil
- n = len(p)
-
- atomic.AddUint64(s.n, uint64(n))
- return
-}
-
-func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) bool {
- lgu.uploaded = 0
- s := &syntheticCountingReader{n: &lgu.uploaded, ctx: ctx}
- var resp *http.Response = nil
- var request *http.Request = nil
- var err error
-
- if request, err = http.NewRequest(
- "POST",
- lgu.URL,
- s,
- ); err != nil {
- lgu.validLock.Lock()
- lgu.valid = false
- lgu.validLock.Unlock()
- return false
- }
-
- // Used to disable compression
- request.Header.Set("Accept-Encoding", "identity")
- request.Header.Set("User-Agent", utilities.UserAgent())
-
- lgu.uploadStartTime = time.Now()
- lgu.lastIntervalEnd = 0
-
- if resp, err = lgu.client.Do(request); err != nil {
- lgu.validLock.Lock()
- lgu.valid = false
- lgu.validLock.Unlock()
- return false
- }
-
- resp.Body.Close()
- if debug.IsDebug(lgu.debug) {
- fmt.Printf("Ending a load-generating upload.\n")
- }
- return true
-}
-
-func (lgu *LoadGeneratingConnectionUpload) Start(
- parentCtx context.Context,
- debugLevel debug.DebugLevel,
-) bool {
- lgu.uploaded = 0
- lgu.clientId = utilities.GenerateUniqueId()
- lgu.debug = debugLevel
-
- transport := &http.Transport{
- Proxy: http.ProxyFromEnvironment,
- TLSClientConfig: &tls.Config{
- InsecureSkipVerify: lgu.InsecureSkipVerify,
- },
- }
-
- if !utilities.IsInterfaceNil(lgu.KeyLogger) {
- if debug.IsDebug(lgu.debug) {
- fmt.Printf(
- "Using an SSL Key Logger for this load-generating upload.\n",
- )
- }
- transport.TLSClientConfig.KeyLogWriter = lgu.KeyLogger
- }
-
- utilities.OverrideHostTransport(transport, lgu.ConnectToAddr)
-
- lgu.client = &http.Client{Transport: transport}
-
- lgu.validLock.Lock()
- lgu.valid = true
- lgu.validLock.Unlock()
-
- if debug.IsDebug(lgu.debug) {
- fmt.Printf("Started a load-generating upload (id: %v).\n", lgu.clientId)
- }
-
- go lgu.doUpload(parentCtx)
- return true
-}
-
-func (lgu *LoadGeneratingConnectionUpload) Stats() *stats.TraceStats {
- // Get all your stats from the download side of the LGC.
- return nil
-}
+const (
+ LGC_STATUS_NOT_STARTED LgcStatus = iota
+ LGC_STATUS_RUNNING
+ LGC_STATUS_DONE
+ LGC_STATUS_ERROR
+)
diff --git a/lgc/upload.go b/lgc/upload.go
new file mode 100644
index 0000000..f0c772e
--- /dev/null
+++ b/lgc/upload.go
@@ -0,0 +1,206 @@
+/*
+ * This file is part of Go Responsiveness.
+ *
+ * Go Responsiveness is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software Foundation,
+ * either version 2 of the License, or (at your option) any later version.
+ * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package lgc
+
+import (
+ "context"
+ "crypto/tls"
+ "fmt"
+ "io"
+ "net/http"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/network-quality/goresponsiveness/debug"
+ "github.com/network-quality/goresponsiveness/stats"
+ "github.com/network-quality/goresponsiveness/utilities"
+)
+
+// TODO: All 64-bit fields that are accessed atomically must
+// appear at the top of this struct.
+type LoadGeneratingConnectionUpload struct {
+ uploaded uint64
+ lastIntervalEnd int64
+ URL string
+ ConnectToAddr string
+ uploadStartTime time.Time
+ lastUploaded uint64
+ client *http.Client
+ debug debug.DebugLevel
+ InsecureSkipVerify bool
+ KeyLogger io.Writer
+ clientId uint64
+ status LgcStatus
+ statusLock *sync.Mutex
+ statusWaiter *sync.Cond
+}
+
+func NewLoadGeneratingConnectionUpload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionUpload {
+ lgu := LoadGeneratingConnectionUpload{
+ URL: url,
+ KeyLogger: keyLogger,
+ ConnectToAddr: connectToAddr,
+ InsecureSkipVerify: insecureSkipVerify,
+ statusLock: &sync.Mutex{},
+ }
+ lgu.status = LGC_STATUS_NOT_STARTED
+ lgu.statusWaiter = sync.NewCond(lgu.statusLock)
+ return lgu
+}
+
+func (lgu *LoadGeneratingConnectionUpload) WaitUntilStarted(ctxt context.Context) bool {
+ conditional := func() bool { return lgu.status != LGC_STATUS_NOT_STARTED }
+ go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgu.statusWaiter)
+ return utilities.WaitWithContext(ctxt, &conditional, lgu.statusLock, lgu.statusWaiter)
+}
+
+func (lgu *LoadGeneratingConnectionUpload) ClientId() uint64 {
+ return lgu.clientId
+}
+
+func (lgu *LoadGeneratingConnectionUpload) TransferredInInterval() (uint64, time.Duration) {
+ transferred := atomic.SwapUint64(&lgu.uploaded, 0)
+ newIntervalEnd := (time.Now().Sub(lgu.uploadStartTime)).Nanoseconds()
+ previousIntervalEnd := atomic.SwapInt64(&lgu.lastIntervalEnd, newIntervalEnd)
+ intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd)
+ if debug.IsDebug(lgu.debug) {
+ fmt.Printf("upload: Transferred: %v bytes in %v.\n", transferred, intervalLength)
+ }
+ return transferred, intervalLength
+}
+
+func (lgu *LoadGeneratingConnectionUpload) Client() *http.Client {
+ return lgu.client
+}
+
+func (lgu *LoadGeneratingConnectionUpload) Status() LgcStatus {
+ return lgu.status
+}
+
+type syntheticCountingReader struct {
+ n *uint64
+ ctx context.Context
+ lgu *LoadGeneratingConnectionUpload
+}
+
+func (s *syntheticCountingReader) Read(p []byte) (n int, err error) {
+ if s.ctx.Err() != nil {
+ return 0, io.EOF
+ }
+ if n == 0 {
+ s.lgu.statusLock.Lock()
+ s.lgu.status = LGC_STATUS_RUNNING
+ s.lgu.statusWaiter.Broadcast()
+ s.lgu.statusLock.Unlock()
+ }
+ err = nil
+ n = len(p)
+
+ atomic.AddUint64(s.n, uint64(n))
+ return
+}
+
+func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) error {
+ lgu.uploaded = 0
+ s := &syntheticCountingReader{n: &lgu.uploaded, ctx: ctx, lgu: lgu}
+ var resp *http.Response = nil
+ var request *http.Request = nil
+ var err error
+
+ if request, err = http.NewRequest(
+ "POST",
+ lgu.URL,
+ s,
+ ); err != nil {
+ lgu.statusLock.Lock()
+ lgu.status = LGC_STATUS_ERROR
+ lgu.statusWaiter.Broadcast()
+ lgu.statusLock.Unlock()
+ return err
+ }
+
+ // Used to disable compression
+ request.Header.Set("Accept-Encoding", "identity")
+ request.Header.Set("User-Agent", utilities.UserAgent())
+
+ lgu.uploadStartTime = time.Now()
+ lgu.lastIntervalEnd = 0
+
+ lgu.statusLock.Lock()
+ lgu.status = LGC_STATUS_RUNNING
+ lgu.statusWaiter.Broadcast()
+ lgu.statusLock.Unlock()
+
+ if resp, err = lgu.client.Do(request); err != nil {
+ lgu.statusLock.Lock()
+ lgu.status = LGC_STATUS_ERROR
+ lgu.statusWaiter.Broadcast()
+ lgu.statusLock.Unlock()
+ return err
+ }
+
+ lgu.statusLock.Lock()
+ lgu.status = LGC_STATUS_DONE
+ lgu.statusWaiter.Broadcast()
+ lgu.statusLock.Unlock()
+
+ resp.Body.Close()
+ if debug.IsDebug(lgu.debug) {
+ fmt.Printf("Ending a load-generating upload.\n")
+ }
+ return nil
+}
+
+func (lgu *LoadGeneratingConnectionUpload) Start(
+ parentCtx context.Context,
+ debugLevel debug.DebugLevel,
+) bool {
+ lgu.uploaded = 0
+ lgu.clientId = utilities.GenerateUniqueId()
+ lgu.debug = debugLevel
+
+ transport := &http.Transport{
+ Proxy: http.ProxyFromEnvironment,
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: lgu.InsecureSkipVerify,
+ },
+ }
+
+ if !utilities.IsInterfaceNil(lgu.KeyLogger) {
+ if debug.IsDebug(lgu.debug) {
+ fmt.Printf(
+ "Using an SSL Key Logger for this load-generating upload.\n",
+ )
+ }
+ transport.TLSClientConfig.KeyLogWriter = lgu.KeyLogger
+ }
+
+ utilities.OverrideHostTransport(transport, lgu.ConnectToAddr)
+
+ lgu.client = &http.Client{Transport: transport}
+
+ if debug.IsDebug(lgu.debug) {
+ fmt.Printf("Started a load-generating upload (id: %v).\n", lgu.clientId)
+ }
+
+ go lgu.doUpload(parentCtx)
+ return true
+}
+
+func (lgu *LoadGeneratingConnectionUpload) Stats() *stats.TraceStats {
+ // Get all your stats from the download side of the LGC.
+ return nil
+}
diff --git a/networkQuality.go b/networkQuality.go
index 06ac0a0..309aa94 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -32,6 +32,7 @@ import (
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
"github.com/network-quality/goresponsiveness/ms"
+ "github.com/network-quality/goresponsiveness/probe"
"github.com/network-quality/goresponsiveness/rpm"
"github.com/network-quality/goresponsiveness/stabilizer"
"github.com/network-quality/goresponsiveness/timeoutat"
@@ -246,8 +247,8 @@ func main() {
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
- var selfProbeDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil
- var foreignProbeDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil
+ var selfProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] = nil
+ var foreignProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] = nil
var downloadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil
var uploadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil
var granularThroughputDataLogger datalogger.DataLogger[rpm.GranularThroughputDataPoint] = nil
@@ -275,7 +276,7 @@ func main() {
"-throughput-granular-"+unique,
)
- selfProbeDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint](
+ selfProbeDataLogger, err = datalogger.CreateCSVDataLogger[probe.ProbeDataPoint](
dataLoggerSelfFilename,
)
if err != nil {
@@ -286,7 +287,7 @@ func main() {
selfProbeDataLogger = nil
}
- foreignProbeDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint](
+ foreignProbeDataLogger, err = datalogger.CreateCSVDataLogger[probe.ProbeDataPoint](
dataLoggerForeignFilename,
)
if err != nil {
@@ -333,10 +334,10 @@ func main() {
// If, for some reason, the data loggers are nil, make them Null Data Loggers so that we don't have conditional
// code later.
if selfProbeDataLogger == nil {
- selfProbeDataLogger = datalogger.CreateNullDataLogger[rpm.ProbeDataPoint]()
+ selfProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]()
}
if foreignProbeDataLogger == nil {
- foreignProbeDataLogger = datalogger.CreateNullDataLogger[rpm.ProbeDataPoint]()
+ foreignProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]()
}
if downloadThroughputDataLogger == nil {
downloadThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]()
@@ -352,26 +353,26 @@ func main() {
* Create (and then, ironically, name) two anonymous functions that, when invoked,
* will create load-generating connections for upload/download
*/
- generate_lgd := func() lgc.LoadGeneratingConnection {
+ generateLgdc := func() lgc.LoadGeneratingConnection {
lgd := lgc.NewLoadGeneratingConnectionDownload(config.Urls.LargeUrl, sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify)
return &lgd
}
- generate_lgu := func() lgc.LoadGeneratingConnection {
+ generateLguc := func() lgc.LoadGeneratingConnection {
lgu := lgc.NewLoadGeneratingConnectionUpload(config.Urls.UploadUrl, sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify)
return &lgu
}
- generateSelfProbeConfiguration := func() rpm.ProbeConfiguration {
- return rpm.ProbeConfiguration{
+ generateSelfProbeConfiguration := func() probe.ProbeConfiguration {
+ return probe.ProbeConfiguration{
URL: config.Urls.SmallUrl,
ConnectToAddr: config.ConnectToAddr,
InsecureSkipVerify: *insecureSkipVerify,
}
}
- generateForeignProbeConfiguration := func() rpm.ProbeConfiguration {
- return rpm.ProbeConfiguration{
+ generateForeignProbeConfiguration := func() probe.ProbeConfiguration {
+ return probe.ProbeConfiguration{
URL: config.Urls.SmallUrl,
ConnectToAddr: config.ConnectToAddr,
InsecureSkipVerify: *insecureSkipVerify,
@@ -393,7 +394,7 @@ func main() {
networkActivityCtx,
downloadLoadGeneratorOperatorCtx,
time.Second,
- generate_lgd,
+ generateLgdc,
&downloadLoadGeneratingConnectionCollection,
*calculateExtendedStats,
downloadDebugging,
@@ -402,7 +403,7 @@ func main() {
networkActivityCtx,
uploadLoadGeneratorOperatorCtx,
time.Second,
- generate_lgu,
+ generateLguc,
&uploadLoadGeneratingConnectionCollection,
*calculateExtendedStats,
uploadDebugging,
@@ -410,7 +411,7 @@ func main() {
// Handles for the first connection that the load-generating go routines (both up and
// download) open are passed back on the self[Down|Up]ProbeConnectionCommunicationChannel
- // so that we can then start probes on those handles.
+ // so that we can then start probes on those connections.
selfDownProbeConnection := <-selfDownProbeConnectionCommunicationChannel
selfUpProbeConnection := <-selfUpProbeConnectionCommunicationChannel
@@ -532,7 +533,7 @@ timeout:
fmt.Printf(
"################# Responsiveness is instantaneously %s.\n", utilities.Conditional(responsivenessIsStable, "stable", "unstable"))
}
- if probeMeasurement.Type == rpm.Foreign {
+ if probeMeasurement.Type == probe.Foreign {
// There may be more than one round trip accumulated together. If that is the case,
// we will blow them apart in to three separate measurements and each one will just
// be 1 / measurement.RoundTripCount of the total length.
@@ -540,13 +541,13 @@ timeout:
foreignRtts.AddElement(probeMeasurement.Duration.Seconds() / float64(probeMeasurement.RoundTripCount))
}
- } else if probeMeasurement.Type == rpm.SelfDown || probeMeasurement.Type == rpm.SelfUp {
+ } else if probeMeasurement.Type == probe.SelfDown || probeMeasurement.Type == probe.SelfUp {
selfRtts.AddElement(probeMeasurement.Duration.Seconds())
}
- if probeMeasurement.Type == rpm.Foreign {
+ if probeMeasurement.Type == probe.Foreign {
foreignProbeDataLogger.LogRecord(probeMeasurement)
- } else if probeMeasurement.Type == rpm.SelfDown || probeMeasurement.Type == rpm.SelfUp {
+ } else if probeMeasurement.Type == probe.SelfDown || probeMeasurement.Type == probe.SelfUp {
selfProbeDataLogger.LogRecord(probeMeasurement)
}
}
@@ -592,10 +593,11 @@ timeout:
defer downloadLoadGeneratingConnectionCollection.Lock.Unlock()
// Note: We do not trace upload connections!
- for i := 0; i < len(*downloadLoadGeneratingConnectionCollection.LGCs); i++ {
+ for i := 0; i < downloadLoadGeneratingConnectionCollection.Len(); i++ {
// Assume that extended statistics are available -- the check was done explicitly at
// program startup if the calculateExtendedStats flag was set by the user on the command line.
- if err := extendedStats.IncorporateConnectionStats((*downloadLoadGeneratingConnectionCollection.LGCs)[i].Stats().ConnInfo.Conn); err != nil {
+ currentLgc, _ := downloadLoadGeneratingConnectionCollection.Get(i)
+ if err := extendedStats.IncorporateConnectionStats((*currentLgc).Stats().ConnInfo.Conn); err != nil {
fmt.Fprintf(
os.Stderr,
"Warning: Could not add extended stats for the connection: %v\n",
diff --git a/rpm/rpm.go b/rpm/rpm.go
index db6f11e..99a52b6 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -20,7 +20,6 @@ import (
"fmt"
"io"
"net/http"
- "net/http/httptrace"
"os"
"sync"
"time"
@@ -29,8 +28,7 @@ import (
"github.com/network-quality/goresponsiveness/debug"
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
- "github.com/network-quality/goresponsiveness/stats"
- "github.com/network-quality/goresponsiveness/traceable"
+ "github.com/network-quality/goresponsiveness/probe"
"github.com/network-quality/goresponsiveness/utilities"
)
@@ -59,22 +57,6 @@ func addFlows(
return toAdd
}
-type ProbeConfiguration struct {
- ConnectToAddr string
- URL string
- Host string
- InsecureSkipVerify bool
-}
-
-type ProbeDataPoint struct {
- Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"`
- RoundTripCount uint64 `Description:"The number of round trips measured by this data point."`
- Duration time.Duration `Description:"The duration for this measurement." Formatter:"Seconds"`
- TCPRtt time.Duration `Description:"The underlying connection's RTT at probe time." Formatter:"Seconds"`
- TCPCwnd uint32 `Description:"The underlying connection's congestion window at probe time."`
- Type ProbeType `Description:"The type of the probe." Formatter:"Value"`
-}
-
type GranularThroughputDataPoint struct {
Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"`
Throughput float64 `Description:"Instantaneous throughput (B/s)."`
@@ -94,182 +76,26 @@ type ThroughputDataPoint struct {
type SelfDataCollectionResult struct {
RateBps float64
LGCs []lgc.LoadGeneratingConnection
- ProbeDataPoints []ProbeDataPoint
+ ProbeDataPoints []probe.ProbeDataPoint
LoggingContinuation func()
}
-type ProbeType int64
-
-const (
- SelfUp ProbeType = iota
- SelfDown
- Foreign
-)
-
-type ProbeRoundTripCountType uint16
-
-const (
- DefaultDownRoundTripCount ProbeRoundTripCountType = 1
- SelfUpRoundTripCount ProbeRoundTripCountType = 1
- SelfDownRoundTripCount ProbeRoundTripCountType = 1
- ForeignRoundTripCount ProbeRoundTripCountType = 3
-)
-
-func (pt ProbeType) Value() string {
- if pt == SelfUp {
- return "SelfUp"
- } else if pt == SelfDown {
- return "SelfDown"
- }
- return "Foreign"
-}
-
-func Probe(
- managingCtx context.Context,
- waitGroup *sync.WaitGroup,
- client *http.Client,
- probeUrl string,
- probeHost string, // optional: for use with a test_endpoint
- probeType ProbeType,
- result *chan ProbeDataPoint,
- captureExtendedStats bool,
- debugging *debug.DebugWithPrefix,
-) error {
-
- if waitGroup != nil {
- waitGroup.Add(1)
- defer waitGroup.Done()
- }
-
- if client == nil {
- return fmt.Errorf("cannot start a probe with a nil client")
- }
-
- probeId := utilities.GenerateUniqueId()
- probeTracer := NewProbeTracer(client, probeType, probeId, debugging)
- time_before_probe := time.Now()
- probe_req, err := http.NewRequestWithContext(
- httptrace.WithClientTrace(managingCtx, probeTracer.trace),
- "GET",
- probeUrl,
- nil,
- )
- if err != nil {
- return err
- }
-
- // Used to disable compression
- probe_req.Header.Set("Accept-Encoding", "identity")
- probe_req.Header.Set("User-Agent", utilities.UserAgent())
-
- probe_resp, err := client.Do(probe_req)
- if err != nil {
- return err
- }
-
- // Header.Get returns "" when not set
- if probe_resp.Header.Get("Content-Encoding") != "" {
- return fmt.Errorf("Content-Encoding header was set (compression not allowed)")
- }
-
- // TODO: Make this interruptable somehow by using _ctx_.
- _, err = io.ReadAll(probe_resp.Body)
- if err != nil {
- return err
- }
- time_after_probe := time.Now()
-
- // Depending on whether we think that Close() requires another RTT (via TCP), we
- // may need to move this before/after capturing the after time.
- probe_resp.Body.Close()
-
- sanity := time_after_probe.Sub(time_before_probe)
-
- // When the probe is run on a load-generating connection (a self probe) there should
- // only be a single round trip that is measured. We will take the accumulation of all these
- // values just to be sure, though. Because of how this traced connection was launched, most
- // of the values will be 0 (or very small where the time that go takes for delivering callbacks
- // and doing context switches pokes through). When it is !isSelfProbe then the values will
- // be significant and we want to add them regardless!
- totalDelay := probeTracer.GetTLSAndHttpHeaderDelta() + probeTracer.GetHttpDownloadDelta(
- time_after_probe,
- ) + probeTracer.GetTCPDelta()
-
- // We must have reused the connection if we are a self probe!
- if (probeType == SelfUp || probeType == SelfDown) && !probeTracer.stats.ConnectionReused {
- panic(!probeTracer.stats.ConnectionReused)
- }
-
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "(%s) (%s Probe %v) sanity vs total: %v vs %v\n",
- debugging.Prefix,
- probeType.Value(),
- probeId,
- sanity,
- totalDelay,
- )
- }
- roundTripCount := DefaultDownRoundTripCount
- if probeType == Foreign {
- roundTripCount = ForeignRoundTripCount
- }
- // Careful!!! It's possible that this channel has been closed because the Prober that
- // started it has been stopped. Writing to a closed channel will cause a panic. It might not
- // matter because a panic just stops the go thread containing the paniced code and we are in
- // a go thread that executes only this function.
- defer func() {
- isThreadPanicing := recover()
- if isThreadPanicing != nil && debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "(%s) (%s Probe %v) Probe attempted to write to the result channel after its invoker ended (official reason: %v).\n",
- debugging.Prefix,
- probeType.Value(),
- probeId,
- isThreadPanicing,
- )
- }
- }()
- tcpRtt := time.Duration(0 * time.Second)
- tcpCwnd := uint32(0)
- // TODO: Only get the extended stats for a connection if the user has requested them overall.
- if captureExtendedStats && extendedstats.ExtendedStatsAvailable() {
- tcpInfo, err := extendedstats.GetTCPInfo(probeTracer.stats.ConnInfo.Conn)
- if err == nil {
- tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond
- tcpCwnd = tcpInfo.Snd_cwnd
- } else {
- fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err)
- }
- }
- dataPoint := ProbeDataPoint{
- Time: time_before_probe,
- RoundTripCount: uint64(roundTripCount),
- Duration: totalDelay,
- TCPRtt: tcpRtt,
- TCPCwnd: tcpCwnd,
- Type: probeType,
- }
- *result <- dataPoint
- return nil
-}
-
func CombinedProber(
proberCtx context.Context,
networkActivityCtx context.Context,
- foreignProbeConfigurationGenerator func() ProbeConfiguration,
- selfProbeConfigurationGenerator func() ProbeConfiguration,
+ foreignProbeConfigurationGenerator func() probe.ProbeConfiguration,
+ selfProbeConfigurationGenerator func() probe.ProbeConfiguration,
selfDownProbeConnection lgc.LoadGeneratingConnection,
selfUpProbeConnection lgc.LoadGeneratingConnection,
probeInterval time.Duration,
keyLogger io.Writer,
captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
-) (dataPoints chan ProbeDataPoint) {
+) (dataPoints chan probe.ProbeDataPoint) {
// Make a channel to send back all the generated data points
// when we are probing.
- dataPoints = make(chan ProbeDataPoint)
+ dataPoints = make(chan probe.ProbeDataPoint)
go func() {
wg := sync.WaitGroup{}
@@ -319,39 +145,39 @@ func CombinedProber(
// Start Foreign Connection Prober
probeCount++
- go Probe(
+ go probe.Probe(
networkActivityCtx,
&wg,
foreignProbeClient,
foreignProbeConfiguration.URL,
foreignProbeConfiguration.Host,
- Foreign,
+ probe.Foreign,
&dataPoints,
captureExtendedStats,
debugging,
)
// Start Self Download Connection Prober
- go Probe(
+ go probe.Probe(
networkActivityCtx,
&wg,
selfDownProbeConnection.Client(),
selfProbeConfiguration.URL,
selfProbeConfiguration.Host,
- SelfDown,
+ probe.SelfDown,
&dataPoints,
captureExtendedStats,
debugging,
)
// Start Self Upload Connection Prober
- go Probe(
+ go probe.Probe(
proberCtx,
&wg,
selfUpProbeConnection.Client(),
selfProbeConfiguration.URL,
selfProbeConfiguration.Host,
- SelfUp,
+ probe.SelfUp,
&dataPoints,
captureExtendedStats,
debugging,
@@ -380,7 +206,7 @@ func LoadGenerator(
loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load.
rampupInterval time.Duration,
lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection.
- loadGeneratingConnections *lgc.LoadGeneratingConnectionCollection,
+ loadGeneratingConnectionsCollection *lgc.LoadGeneratingConnectionCollection,
captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections?
debugging *debug.DebugWithPrefix, // How can we forget debugging?
) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes.
@@ -400,7 +226,7 @@ func LoadGenerator(
flowsCreated += addFlows(
networkActivityCtx,
constants.StartingNumberOfLoadGeneratingConnections,
- loadGeneratingConnections,
+ loadGeneratingConnectionsCollection,
lgcGenerator,
debugging.Level,
)
@@ -408,9 +234,9 @@ func LoadGenerator(
// We have at least a single load-generating channel. This channel will be the one that
// the self probes use. Let's send it back to the caller so that they can pass it on if they need to.
go func() {
- loadGeneratingConnections.Lock.Lock()
- zerothConnection, err := loadGeneratingConnections.Get(0)
- loadGeneratingConnections.Lock.Unlock()
+ loadGeneratingConnectionsCollection.Lock.Lock()
+ zerothConnection, err := loadGeneratingConnectionsCollection.Get(0)
+ loadGeneratingConnectionsCollection.Lock.Unlock()
if err != nil {
panic("Could not get the zeroth connection!\n")
}
@@ -452,56 +278,76 @@ func LoadGenerator(
granularThroughputDatapoints := make([]GranularThroughputDataPoint, 0)
now = time.Now() // Used to align granular throughput data
allInvalid := true
- for i := range *loadGeneratingConnections.LGCs {
- if !(*loadGeneratingConnections.LGCs)[i].IsValid() {
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "%v: Load-generating connection with id %d is invalid ... skipping.\n",
+ for i := range *loadGeneratingConnectionsCollection.LGCs {
+ loadGeneratingConnectionsCollection.Lock.Lock()
+ connectionState := (*loadGeneratingConnectionsCollection.LGCs)[i].Status()
+ loadGeneratingConnectionsCollection.Lock.Unlock()
+ switch connectionState {
+ default:
+ {
+ error := fmt.Sprintf(
+ "%v: Load-generating connection with id %d is in an unrecognizable state.\n",
debugging,
- (*loadGeneratingConnections.LGCs)[i].ClientId(),
+ (*loadGeneratingConnectionsCollection.LGCs)[i].ClientId())
+ fmt.Fprintf(os.Stderr, "%s", error)
+ panic(error)
+ }
+ case lgc.LGC_STATUS_ERROR,
+ lgc.LGC_STATUS_DONE:
+ {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Load-generating connection with id %d is invalid or complete ... skipping.\n",
+ debugging,
+ (*loadGeneratingConnectionsCollection.LGCs)[i].ClientId(),
+ )
+ }
+ // TODO: Do we add null connection to throughput? and how do we define it? Throughput -1 or 0?
+ granularThroughputDatapoints = append(
+ granularThroughputDatapoints,
+ GranularThroughputDataPoint{now, 0, uint32(i), 0, 0, ""},
)
+ continue
}
- // TODO: Do we add null connection to throughput? and how do we define it? Throughput -1 or 0?
- granularThroughputDatapoints = append(
- granularThroughputDatapoints,
- GranularThroughputDataPoint{now, 0, uint32(i), 0, 0, ""},
- )
- continue
- }
- allInvalid = false
- currentTransferred, currentInterval := (*loadGeneratingConnections.LGCs)[i].TransferredInInterval()
- // normalize to a second-long interval!
- instantaneousConnectionThroughput := float64(
- currentTransferred,
- ) / float64(
- currentInterval.Seconds(),
- )
- instantaneousTotalThroughput += instantaneousConnectionThroughput
+ case lgc.LGC_STATUS_RUNNING:
+ {
+ allInvalid = false
+ currentTransferred, currentInterval := (*loadGeneratingConnectionsCollection.LGCs)[i].TransferredInInterval()
+ // normalize to a second-long interval!
+ instantaneousConnectionThroughput := float64(
+ currentTransferred,
+ ) / float64(
+ currentInterval.Seconds(),
+ )
+ instantaneousTotalThroughput += instantaneousConnectionThroughput
- tcpRtt := time.Duration(0 * time.Second)
- tcpCwnd := uint32(0)
- if captureExtendedStats && extendedstats.ExtendedStatsAvailable() {
- if stats := (*loadGeneratingConnections.LGCs)[i].Stats(); stats != nil {
- tcpInfo, err := extendedstats.GetTCPInfo(stats.ConnInfo.Conn)
- if err == nil {
- tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond
- tcpCwnd = tcpInfo.Snd_cwnd
- } else {
- fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err)
+ tcpRtt := time.Duration(0 * time.Second)
+ tcpCwnd := uint32(0)
+ if captureExtendedStats && extendedstats.ExtendedStatsAvailable() {
+ if stats := (*loadGeneratingConnectionsCollection.LGCs)[i].Stats(); stats != nil {
+ tcpInfo, err := extendedstats.GetTCPInfo(stats.ConnInfo.Conn)
+ if err == nil {
+ tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond
+ tcpCwnd = tcpInfo.Snd_cwnd
+ } else {
+ fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err)
+ }
+ }
}
+ granularThroughputDatapoints = append(
+ granularThroughputDatapoints,
+ GranularThroughputDataPoint{
+ now,
+ instantaneousConnectionThroughput,
+ uint32(i),
+ tcpRtt,
+ tcpCwnd,
+ "",
+ },
+ )
}
+
}
- granularThroughputDatapoints = append(
- granularThroughputDatapoints,
- GranularThroughputDataPoint{
- now,
- instantaneousConnectionThroughput,
- uint32(i),
- tcpRtt,
- tcpCwnd,
- "",
- },
- )
}
// For some reason, all the lgcs are invalid. This likely means that
@@ -520,7 +366,7 @@ func LoadGenerator(
throughputDataPoint := ThroughputDataPoint{
time.Now(),
instantaneousTotalThroughput,
- len(*loadGeneratingConnections.LGCs),
+ len(*loadGeneratingConnectionsCollection.LGCs),
granularThroughputDatapoints,
}
throughputCalculations <- throughputDataPoint
@@ -529,7 +375,7 @@ func LoadGenerator(
flowsCreated += addFlows(
networkActivityCtx,
constants.AdditiveNumberOfLoadGeneratingConnections,
- loadGeneratingConnections,
+ loadGeneratingConnectionsCollection,
lgcGenerator,
debugging.Level,
)
@@ -543,281 +389,3 @@ func LoadGenerator(
}()
return
}
-
-type ProbeTracer struct {
- client *http.Client
- stats *stats.TraceStats
- trace *httptrace.ClientTrace
- debug debug.DebugLevel
- probeid uint64
- probeType ProbeType
-}
-
-func (p *ProbeTracer) String() string {
- return fmt.Sprintf("(Probe %v): stats: %v\n", p.probeid, p.stats)
-}
-
-func (p *ProbeTracer) ProbeId() uint64 {
- return p.probeid
-}
-
-func (p *ProbeTracer) GetTrace() *httptrace.ClientTrace {
- return p.trace
-}
-
-func (p *ProbeTracer) GetDnsDelta() time.Duration {
- if p.stats.ConnectionReused {
- return time.Duration(0)
- }
- delta := p.stats.DnsDoneTime.Sub(p.stats.DnsStartTime)
- if debug.IsDebug(p.debug) {
- fmt.Printf("(Probe %v): DNS Time: %v\n", p.probeid, delta)
- }
- return delta
-}
-
-func (p *ProbeTracer) GetTCPDelta() time.Duration {
- if p.stats.ConnectionReused {
- return time.Duration(0)
- }
- delta := p.stats.ConnectDoneTime.Sub(p.stats.ConnectStartTime)
- if debug.IsDebug(p.debug) {
- fmt.Printf("(Probe %v): TCP Connection Time: %v\n", p.probeid, delta)
- }
- return delta
-}
-
-func (p *ProbeTracer) GetTLSDelta() time.Duration {
- if utilities.IsSome(p.stats.TLSDoneTime) {
- panic("There should not be TLS information, but there is.")
- }
- delta := time.Duration(0)
- if debug.IsDebug(p.debug) {
- fmt.Printf("(Probe %v): TLS Time: %v\n", p.probeid, delta)
- }
- return delta
-}
-
-func (p *ProbeTracer) GetTLSAndHttpHeaderDelta() time.Duration {
- // Because the TLS handshake occurs *after* the TCP connection (ConnectDoneTime)
- // and *before* the HTTP transaction, we know that the delta between the time
- // that the first HTTP response byte is available and the time that the TCP
- // connection was established includes both the time for the HTTP header RTT
- // *and* the TLS handshake RTT, whether we can specifically measure the latter
- // or not. Eventually when TLS handshake tracing is fixed, we can break these
- // into separate buckets, but for now this workaround is reasonable.
- before := p.stats.ConnectDoneTime
- if p.stats.ConnectionReused {
- // When we reuse a connection there will be no time logged for when the
- // TCP connection was established (obviously). So, fall back to the time
- // when we were notified about reusing a connection (as a close approximation!).
- before = p.stats.GetConnectionDoneTime
- }
- delta := p.stats.HttpResponseReadyTime.Sub(before)
- if debug.IsDebug(p.debug) {
- fmt.Printf("(Probe %v): Http TLS and Header Time: %v\n", p.probeid, delta)
- }
- return delta
-}
-
-func (p *ProbeTracer) GetHttpHeaderDelta() time.Duration {
- panic(
- "Unusable until TLS tracing support is enabled! Use GetTLSAndHttpHeaderDelta() instead.\n",
- )
- delta := p.stats.HttpResponseReadyTime.Sub(utilities.GetSome(p.stats.TLSDoneTime))
- if debug.IsDebug(p.debug) {
- fmt.Printf("(Probe %v): Http Header Time: %v\n", p.probeid, delta)
- }
- return delta
-}
-
-func (p *ProbeTracer) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration {
- delta := httpDoneTime.Sub(p.stats.HttpResponseReadyTime)
- if debug.IsDebug(p.debug) {
- fmt.Printf("(Probe %v): Http Download Time: %v\n", p.probeid, delta)
- }
- return delta
-}
-
-func NewProbeTracer(
- client *http.Client,
- probeType ProbeType,
- probeId uint64,
- debugging *debug.DebugWithPrefix,
-) *ProbeTracer {
- probe := &ProbeTracer{
- client: client,
- stats: &stats.TraceStats{},
- trace: nil,
- debug: debugging.Level,
- probeid: probeId,
- probeType: probeType,
- }
- trace := traceable.GenerateHttpTimingTracer(probe, debugging.Level)
-
- probe.trace = trace
- return probe
-}
-
-func (probe *ProbeTracer) SetDnsStartTimeInfo(
- now time.Time,
- dnsStartInfo httptrace.DNSStartInfo,
-) {
- probe.stats.DnsStartTime = now
- probe.stats.DnsStart = dnsStartInfo
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) DNS Start for Probe %v: %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- dnsStartInfo,
- )
- }
-}
-
-func (probe *ProbeTracer) SetDnsDoneTimeInfo(
- now time.Time,
- dnsDoneInfo httptrace.DNSDoneInfo,
-) {
- probe.stats.DnsDoneTime = now
- probe.stats.DnsDone = dnsDoneInfo
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) DNS Done for Probe %v: %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.DnsDone,
- )
- }
-}
-
-func (probe *ProbeTracer) SetConnectStartTime(
- now time.Time,
-) {
- probe.stats.ConnectStartTime = now
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) TCP Start for Probe %v at %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.ConnectStartTime,
- )
- }
-}
-
-func (probe *ProbeTracer) SetConnectDoneTimeError(
- now time.Time,
- err error,
-) {
- probe.stats.ConnectDoneTime = now
- probe.stats.ConnectDoneError = err
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) TCP Done for Probe %v (with error %v) @ %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.ConnectDoneError,
- probe.stats.ConnectDoneTime,
- )
- }
-}
-
-func (probe *ProbeTracer) SetGetConnTime(now time.Time) {
- probe.stats.GetConnectionStartTime = now
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) Started getting connection for Probe %v @ %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.GetConnectionStartTime,
- )
- }
-}
-
-func (probe *ProbeTracer) SetGotConnTimeInfo(
- now time.Time,
- gotConnInfo httptrace.GotConnInfo,
-) {
- probe.stats.GetConnectionDoneTime = now
- probe.stats.ConnInfo = gotConnInfo
- probe.stats.ConnectionReused = gotConnInfo.Reused
- if (probe.probeType == SelfUp || probe.probeType == SelfDown) && !gotConnInfo.Reused {
- fmt.Fprintf(
- os.Stderr,
- "A self probe sent using a new connection!\n",
- )
- }
- if gotConnInfo.Reused {
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) Got a reused connection for Probe %v at %v with info %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.GetConnectionDoneTime,
- probe.stats.ConnInfo,
- )
- }
- }
-}
-
-func (probe *ProbeTracer) SetTLSHandshakeStartTime(
- now time.Time,
-) {
- probe.stats.TLSStartTime = utilities.Some(now)
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) Started TLS Handshake for Probe %v @ %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.TLSStartTime,
- )
- }
-}
-
-func (probe *ProbeTracer) SetTLSHandshakeDoneTimeState(
- now time.Time,
- connectionState tls.ConnectionState,
-) {
- probe.stats.TLSDoneTime = utilities.Some(now)
- probe.stats.TLSConnInfo = connectionState
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) Completed TLS handshake for Probe %v at %v with info %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.TLSDoneTime,
- probe.stats.TLSConnInfo,
- )
- }
-}
-
-func (probe *ProbeTracer) SetHttpWroteRequestTimeInfo(
- now time.Time,
- info httptrace.WroteRequestInfo,
-) {
- probe.stats.HttpWroteRequestTime = now
- probe.stats.HttpInfo = info
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) Http finished writing request for Probe %v at %v with info %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.HttpWroteRequestTime,
- probe.stats.HttpInfo,
- )
- }
-}
-
-func (probe *ProbeTracer) SetHttpResponseReadyTime(
- now time.Time,
-) {
- probe.stats.HttpResponseReadyTime = now
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) Http response is ready for Probe %v at %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.HttpResponseReadyTime,
- )
- }
-}
diff --git a/stabilizer/rev3.go b/stabilizer/rev3.go
index 4ab0bd9..5d1aeec 100644
--- a/stabilizer/rev3.go
+++ b/stabilizer/rev3.go
@@ -6,6 +6,7 @@ import (
"github.com/network-quality/goresponsiveness/debug"
"github.com/network-quality/goresponsiveness/ms"
+ "github.com/network-quality/goresponsiveness/probe"
"github.com/network-quality/goresponsiveness/rpm"
"github.com/network-quality/goresponsiveness/utilities"
)
@@ -55,7 +56,7 @@ func NewProbeStabilizer(
dbgLevel: debugLevel}
}
-func (r3 *ProbeStabilizer) AddMeasurement(measurement rpm.ProbeDataPoint) {
+func (r3 *ProbeStabilizer) AddMeasurement(measurement probe.ProbeDataPoint) {
r3.m.Lock()
defer r3.m.Unlock()