diff options
| -rw-r--r-- | README.md | 8 | ||||
| -rw-r--r-- | debug/debug.go | 35 | ||||
| -rw-r--r-- | go.mod | 2 | ||||
| -rw-r--r-- | lgc/lgc.go | 387 | ||||
| -rw-r--r-- | networkQuality.go | 141 | ||||
| -rw-r--r-- | rpm/rpm.go | 342 | ||||
| -rw-r--r-- | stats/stats.go | 50 | ||||
| -rw-r--r-- | timeoutat/timeoutat.go | 8 | ||||
| -rw-r--r-- | traceable/traceable.go | 64 | ||||
| -rw-r--r-- | utilities/utilities.go | 97 |
10 files changed, 832 insertions, 302 deletions
@@ -88,11 +88,11 @@ $ ./networkQuality --config mensura.cdn-apple.com --port 443 --path /api/v1/gm/c ## Contributing -We *love* contributions. Before submitting a patch, format your code with `go fmt` *and* `golines`: +We *love* contributions. Before submitting a patch, format your code with `go fmt`. -``` -$ golines -w -m 80 --shorten-comments . -``` +Normally we would recommend that you format your code using `golines` before submitting, but `golines` does not appear to be compatible with Go's new generic types. + +This README.md will be updated when `golines` is patched to support Go's new generic types. You can easily install `golines` in to your `${GOPATH}` with diff --git a/debug/debug.go b/debug/debug.go new file mode 100644 index 0000000..2a3df4a --- /dev/null +++ b/debug/debug.go @@ -0,0 +1,35 @@ +package debug + +type DebugLevel int8 + +const ( + NoDebug DebugLevel = iota + Debug + Warn + Error +) + +type DebugWithPrefix struct { + Level DebugLevel + Prefix string +} + +func NewDebugWithPrefix(level DebugLevel, prefix string) *DebugWithPrefix { + return &DebugWithPrefix{Level: level, Prefix: prefix} +} + +func (d *DebugWithPrefix) String() string { + return d.Prefix +} + +func IsDebug(level DebugLevel) bool { + return level <= Debug +} + +func IsWarn(level DebugLevel) bool { + return level <= Warn +} + +func IsError(level DebugLevel) bool { + return level <= Error +} @@ -1,6 +1,6 @@ module github.com/network-quality/goresponsiveness -go 1.17 +go 1.18 require golang.org/x/net v0.0.0-20220225172249-27dd8689420f @@ -25,157 +25,191 @@ import ( "sync/atomic" "time" + "github.com/network-quality/goresponsiveness/debug" + "github.com/network-quality/goresponsiveness/stats" + "github.com/network-quality/goresponsiveness/traceable" "github.com/network-quality/goresponsiveness/utilities" "golang.org/x/net/http2" ) -var GenerateConnectionId func() uint64 = func() func() uint64 { - var nextConnectionId uint64 = 0 - return func() uint64 { - return atomic.AddUint64(&nextConnectionId, 1) - } -}() - type LoadGeneratingConnection interface { - Start(context.Context, bool) bool + Start(context.Context, debug.DebugLevel) bool Transferred() uint64 Client() *http.Client IsValid() bool ClientId() uint64 } -type LoadGeneratingConnectionStats struct { - dnsStart httptrace.DNSStartInfo - dnsDone httptrace.DNSDoneInfo - connInfo httptrace.GotConnInfo - httpInfo httptrace.WroteRequestInfo - tlsConnInfo tls.ConnectionState - dnsStartTime time.Time - dnsCompleteTime time.Time - tlsStartTime time.Time - tlsCompleteTime time.Time - connectStartTime time.Time - connectCompleteTime time.Time - getConnectionStartTime time.Time - getConnectionCompleteTime time.Time -} - type LoadGeneratingConnectionDownload struct { Path string downloaded uint64 client *http.Client - debug bool + debug debug.DebugLevel valid bool KeyLogger io.Writer clientId uint64 tracer *httptrace.ClientTrace - stats LoadGeneratingConnectionStats + stats stats.TraceStats } -func generateHttpTimingTracer( - lgd *LoadGeneratingConnectionDownload, -) *httptrace.ClientTrace { - newTracer := httptrace.ClientTrace{ - DNSStart: func(dnsStartInfo httptrace.DNSStartInfo) { - lgd.stats.dnsStartTime = time.Now() - lgd.stats.dnsStart = dnsStartInfo - if lgd.debug { - fmt.Printf( - "DNS Start for %v: %v\n", - lgd.ClientId(), - dnsStartInfo, - ) - } - }, - DNSDone: func(dnsDoneInfo httptrace.DNSDoneInfo) { - lgd.stats.dnsCompleteTime = time.Now() - lgd.stats.dnsDone = dnsDoneInfo - if lgd.debug { - fmt.Printf("DNS Done for %v: %v\n", lgd.ClientId(), dnsDoneInfo) - } - }, - ConnectStart: func(network, address string) { - lgd.stats.connectStartTime = time.Now() - if lgd.debug { - fmt.Printf( - "TCP Start of %v: %v: %v\n", - lgd.ClientId(), - network, - address, - ) - } - }, - ConnectDone: func(network, address string, err error) { - lgd.stats.connectCompleteTime = time.Now() - if lgd.debug { - fmt.Printf( - "TCP Done for %v: %v: %v (%v)\n", - lgd.ClientId(), - network, - address, - err, - ) - } - }, - GetConn: func(hostPort string) { - lgd.stats.getConnectionStartTime = time.Now() - if lgd.debug { - fmt.Printf( - "GetConn host port for %v: %v\n", - lgd.ClientId(), - hostPort, - ) - } - }, - GotConn: func(connInfo httptrace.GotConnInfo) { - if connInfo.Reused { - panic(!connInfo.Reused) - } - lgd.stats.connInfo = connInfo - lgd.stats.getConnectionCompleteTime = time.Now() - if lgd.debug { - fmt.Printf( - "GetConn host port for %v: %v\n", - lgd.ClientId(), - connInfo, - ) - } - }, - TLSHandshakeStart: func() { - lgd.stats.tlsStartTime = time.Now() - if lgd.debug { - fmt.Printf("TLSHandshakeStart for %v\n", lgd.ClientId()) - } - }, - TLSHandshakeDone: func(tlsConnState tls.ConnectionState, err error) { - lgd.stats.tlsCompleteTime = time.Now() - lgd.stats.tlsConnInfo = tlsConnState - if lgd.debug { - fmt.Printf( - "TLSHandshakeDone for %v: %v\n", - lgd.ClientId(), - tlsConnState, - ) - } - }, +func (lgd *LoadGeneratingConnectionDownload) SetDnsStartTimeInfo( + now time.Time, + dnsStartInfo httptrace.DNSStartInfo, +) { + lgd.stats.DnsStartTime = now + lgd.stats.DnsStart = dnsStartInfo + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "DNS Start for %v: %v\n", + lgd.ClientId(), + dnsStartInfo, + ) } - return &newTracer } -func (lbd *LoadGeneratingConnectionDownload) ClientId() uint64 { - return lbd.clientId +func (lgd *LoadGeneratingConnectionDownload) SetDnsDoneTimeInfo( + now time.Time, + dnsDoneInfo httptrace.DNSDoneInfo, +) { + lgd.stats.DnsDoneTime = now + lgd.stats.DnsDone = dnsDoneInfo + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "DNS Done for %v: %v\n", + lgd.ClientId(), + lgd.stats.DnsDone, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetConnectStartTime( + now time.Time, +) { + lgd.stats.ConnectStartTime = now + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "TCP Start for %v at %v\n", + lgd.ClientId(), + lgd.stats.ConnectStartTime, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetConnectDoneTimeError( + now time.Time, + err error, +) { + lgd.stats.ConnectDoneTime = now + lgd.stats.ConnectDoneError = err + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "TCP Done for %v (with error %v) @ %v\n", + lgd.ClientId(), + lgd.stats.ConnectDoneError, + lgd.stats.ConnectDoneTime, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetGetConnTime(now time.Time) { + lgd.stats.GetConnectionStartTime = now + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Started getting connection for %v @ %v\n", + lgd.ClientId(), + lgd.stats.GetConnectionStartTime, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetGotConnTimeInfo( + now time.Time, + gotConnInfo httptrace.GotConnInfo, +) { + lgd.stats.GetConnectionDoneTime = now + lgd.stats.ConnInfo = gotConnInfo + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Got connection for %v at %v with info %v\n", + lgd.ClientId(), + lgd.stats.GetConnectionDoneTime, + lgd.stats.ConnInfo, + ) + } } -func (lbd *LoadGeneratingConnectionDownload) Transferred() uint64 { - transferred := atomic.LoadUint64(&lbd.downloaded) - if lbd.debug { +func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeStartTime( + now time.Time, +) { + lgd.stats.TLSStartTime = utilities.Some(now) + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Started TLS Handshake for %v @ %v\n", + lgd.ClientId(), + lgd.stats.TLSStartTime, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetTLSHandshakeDoneTimeState( + now time.Time, + connectionState tls.ConnectionState, +) { + lgd.stats.TLSDoneTime = utilities.Some(now) + lgd.stats.TLSConnInfo = connectionState + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Completed TLS handshake for %v at %v with info %v\n", + lgd.ClientId(), + lgd.stats.TLSDoneTime, + lgd.stats.TLSConnInfo, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetHttpWroteRequestTimeInfo( + now time.Time, + info httptrace.WroteRequestInfo, +) { + lgd.stats.HttpWroteRequestTime = now + lgd.stats.HttpInfo = info + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "(lgd) Http finished writing request for %v at %v with info %v\n", + lgd.ClientId(), + lgd.stats.HttpWroteRequestTime, + lgd.stats.HttpInfo, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) SetHttpResponseReadyTime( + now time.Time, +) { + lgd.stats.HttpResponseReadyTime = now + if debug.IsDebug(lgd.debug) { + fmt.Printf( + "Got the first byte of HTTP response headers for %v at %v\n", + lgd.ClientId(), + lgd.stats.HttpResponseReadyTime, + ) + } +} + +func (lgd *LoadGeneratingConnectionDownload) ClientId() uint64 { + return lgd.clientId +} + +func (lgd *LoadGeneratingConnectionDownload) Transferred() uint64 { + transferred := atomic.LoadUint64(&lgd.downloaded) + if debug.IsDebug(lgd.debug) { fmt.Printf("download: Transferred: %v\n", transferred) } return transferred } -func (lbd *LoadGeneratingConnectionDownload) Client() *http.Client { - return lbd.client +func (lgd *LoadGeneratingConnectionDownload) Client() *http.Client { + return lgd.client } type countingReader struct { @@ -193,16 +227,17 @@ func (cr *countingReader) Read(p []byte) (n int, err error) { return } -func (lbd *LoadGeneratingConnectionDownload) Start( +func (lgd *LoadGeneratingConnectionDownload) Start( ctx context.Context, - debug bool, + debugLevel debug.DebugLevel, ) bool { - lbd.downloaded = 0 - lbd.clientId = GenerateConnectionId() + lgd.downloaded = 0 + lgd.clientId = utilities.GenerateConnectionId() transport := http2.Transport{} + transport.TLSClientConfig = &tls.Config{} - if !utilities.IsInterfaceNil(lbd.KeyLogger) { - if debug { + if !utilities.IsInterfaceNil(lgd.KeyLogger) { + if debug.IsDebug(lgd.debug) { fmt.Printf( "Using an SSL Key Logger for this load-generating download.\n", ) @@ -215,24 +250,22 @@ func (lbd *LoadGeneratingConnectionDownload) Start( // depend on whether the url contains // https:// or http://: // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74 - transport.TLSClientConfig = &tls.Config{ - KeyLogWriter: lbd.KeyLogger, - InsecureSkipVerify: true, - } + transport.TLSClientConfig.KeyLogWriter = lgd.KeyLogger } + transport.TLSClientConfig.InsecureSkipVerify = true - lbd.client = &http.Client{Transport: &transport} - lbd.debug = debug - lbd.valid = true - lbd.tracer = generateHttpTimingTracer(lbd) + lgd.client = &http.Client{Transport: &transport} + lgd.debug = debugLevel + lgd.valid = true + lgd.tracer = traceable.GenerateHttpTimingTracer(lgd, lgd.debug) - if debug { + if debug.IsDebug(lgd.debug) { fmt.Printf( "Started a load-generating download (id: %v).\n", - lbd.clientId, + lgd.clientId, ) } - go lbd.doDownload(ctx) + go lgd.doDownload(ctx) return true } func (lbd *LoadGeneratingConnectionDownload) IsValid() bool { @@ -240,22 +273,28 @@ func (lbd *LoadGeneratingConnectionDownload) IsValid() bool { } func (lbd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) { - newRequest, err := http.NewRequestWithContext( + var request *http.Request = nil + var get *http.Response = nil + var err error = nil + + if request, err = http.NewRequestWithContext( httptrace.WithClientTrace(ctx, lbd.tracer), "GET", lbd.Path, nil, - ) + ); err != nil { + lbd.valid = false + return + } - get, err := lbd.client.Do(newRequest) - if err != nil { + if get, err = lbd.client.Do(request); err != nil { lbd.valid = false return } cr := &countingReader{n: &lbd.downloaded, ctx: ctx, readable: get.Body} _, _ = io.Copy(ioutil.Discard, cr) get.Body.Close() - if lbd.debug { + if debug.IsDebug(lbd.debug) { fmt.Printf("Ending a load-generating download.\n") } } @@ -264,30 +303,30 @@ type LoadGeneratingConnectionUpload struct { Path string uploaded uint64 client *http.Client - debug bool + debug debug.DebugLevel valid bool KeyLogger io.Writer clientId uint64 } -func (lbu *LoadGeneratingConnectionUpload) ClientId() uint64 { - return lbu.clientId +func (lgu *LoadGeneratingConnectionUpload) ClientId() uint64 { + return lgu.clientId } -func (lbu *LoadGeneratingConnectionUpload) Transferred() uint64 { - transferred := atomic.LoadUint64(&lbu.uploaded) - if lbu.debug { +func (lgu *LoadGeneratingConnectionUpload) Transferred() uint64 { + transferred := atomic.LoadUint64(&lgu.uploaded) + if debug.IsDebug(lgu.debug) { fmt.Printf("upload: Transferred: %v\n", transferred) } return transferred } -func (lbu *LoadGeneratingConnectionUpload) Client() *http.Client { - return lbu.client +func (lgu *LoadGeneratingConnectionUpload) Client() *http.Client { + return lgu.client } -func (lbu *LoadGeneratingConnectionUpload) IsValid() bool { - return lbu.valid +func (lgu *LoadGeneratingConnectionUpload) IsValid() bool { + return lgu.valid } type syntheticCountingReader struct { @@ -306,52 +345,52 @@ func (s *syntheticCountingReader) Read(p []byte) (n int, err error) { return } -func (lbu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) bool { - lbu.uploaded = 0 - s := &syntheticCountingReader{n: &lbu.uploaded, ctx: ctx} +func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) bool { + lgu.uploaded = 0 + s := &syntheticCountingReader{n: &lgu.uploaded, ctx: ctx} var resp *http.Response = nil var err error - if resp, err = lbu.client.Post(lbu.Path, "application/octet-stream", s); err != nil { - lbu.valid = false + if resp, err = lgu.client.Post(lgu.Path, "application/octet-stream", s); err != nil { + lgu.valid = false + return false } resp.Body.Close() - if lbu.debug { + if debug.IsDebug(lgu.debug) { fmt.Printf("Ending a load-generating upload.\n") } return true } -func (lbu *LoadGeneratingConnectionUpload) Start( +func (lgu *LoadGeneratingConnectionUpload) Start( ctx context.Context, - debug bool, + debugLevel debug.DebugLevel, ) bool { - lbu.uploaded = 0 - lbu.clientId = GenerateConnectionId() + lgu.uploaded = 0 + lgu.clientId = utilities.GenerateConnectionId() + lgu.debug = debugLevel // See above for the rationale of doing http2.Transport{} here // to ensure that we are using h2. transport := http2.Transport{} + transport.TLSClientConfig = &tls.Config{} - if !utilities.IsInterfaceNil(lbu.KeyLogger) { - if debug { + if !utilities.IsInterfaceNil(lgu.KeyLogger) { + if debug.IsDebug(lgu.debug) { fmt.Printf( "Using an SSL Key Logger for this load-generating upload.\n", ) } - transport.TLSClientConfig = &tls.Config{ - KeyLogWriter: lbu.KeyLogger, - InsecureSkipVerify: true, - } + transport.TLSClientConfig.KeyLogWriter = lgu.KeyLogger } + transport.TLSClientConfig.InsecureSkipVerify = true - lbu.client = &http.Client{Transport: &transport} - lbu.debug = debug - lbu.valid = true + lgu.client = &http.Client{Transport: &transport} + lgu.valid = true - if debug { - fmt.Printf("Started a load-generating upload (id: %v).\n", lbu.clientId) + if debug.IsDebug(lgu.debug) { + fmt.Printf("Started a load-generating upload (id: %v).\n", lgu.clientId) } - go lbu.doUpload(ctx) + go lgu.doUpload(ctx) return true } diff --git a/networkQuality.go b/networkQuality.go index d450bf4..b5f9743 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -33,8 +33,10 @@ import ( "github.com/network-quality/goresponsiveness/ccw" "github.com/network-quality/goresponsiveness/constants" + "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/lgc" "github.com/network-quality/goresponsiveness/ma" + "github.com/network-quality/goresponsiveness/rpm" "github.com/network-quality/goresponsiveness/timeoutat" "github.com/network-quality/goresponsiveness/utilities" "golang.org/x/net/http2" @@ -57,7 +59,11 @@ var ( "config", "path on the server to the configuration endpoint.", ) - debug = flag.Bool("debug", constants.DefaultDebug, "Enable debugging.") + debugCliFlag = flag.Bool( + "debug", + constants.DefaultDebug, + "Enable debugging.", + ) timeout = flag.Int( "timeout", constants.DefaultTestTime, @@ -201,7 +207,7 @@ func addFlows( lgcs *[]lgc.LoadGeneratingConnection, lgcsPreviousTransferred *[]uint64, lgcGenerator func() lgc.LoadGeneratingConnection, - debug bool, + debug debug.DebugLevel, ) { for i := uint64(0); i < toAdd; i++ { *lgcs = append(*lgcs, lgcGenerator()) @@ -221,23 +227,11 @@ type SaturationResult struct { lgcs []lgc.LoadGeneratingConnection } -type Debugging struct { - Prefix string -} - -func NewDebugging(prefix string) *Debugging { - return &Debugging{Prefix: prefix} -} - -func (d *Debugging) String() string { - return d.Prefix -} - func saturate( saturationCtx context.Context, operatingCtx context.Context, lgcGenerator func() lgc.LoadGeneratingConnection, - debug *Debugging, + debugging *debug.DebugWithPrefix, ) (saturated chan SaturationResult) { saturated = make(chan SaturationResult) go func() { @@ -251,7 +245,7 @@ func saturate( &lgcs, &lgcsPreviousTransferred, lgcGenerator, - debug != nil, + debugging.Level, ) previousFlowIncreaseIteration := uint64(0) @@ -281,10 +275,10 @@ func saturate( now := time.Now() // At each 1-second interval if nextSampleStartTime.Sub(now) > 0 { - if debug != nil { + if debug.IsDebug(debugging.Level) { fmt.Printf( "%v: Sleeping until %v\n", - debug, + debugging, nextSampleStartTime, ) } @@ -300,10 +294,10 @@ func saturate( allInvalid := true for i := range lgcs { if !lgcs[i].IsValid() { - if debug != nil { + if debug.IsDebug(debugging.Level) { fmt.Printf( "%v: Load-generating connection with id %d is invalid ... skipping.\n", - debug, + debugging, lgcs[i].ClientId(), ) } @@ -319,10 +313,10 @@ func saturate( // For some reason, all the lgcs are invalid. This likely means that // the network/server went away. if allInvalid { - if debug != nil { + if debug.IsDebug(debugging.Level) { fmt.Printf( "%v: All lgcs were invalid. Assuming that network/server went away.\n", - debug, + debugging, ) } break @@ -339,25 +333,25 @@ func saturate( previousMovingAverage, ) - if debug != nil { + if debug.IsDebug(debugging.Level) { fmt.Printf( "%v: Instantaneous goodput: %f MB.\n", - debug, + debugging, utilities.ToMBps(float64(totalTransfer)), ) fmt.Printf( "%v: Previous moving average: %f MB.\n", - debug, + debugging, utilities.ToMBps(previousMovingAverage), ) fmt.Printf( "%v: Current moving average: %f MB.\n", - debug, + debugging, utilities.ToMBps(currentMovingAverage), ) fmt.Printf( "%v: Moving average delta: %f.\n", - debug, + debugging, movingAverageDelta, ) } @@ -377,10 +371,10 @@ func saturate( if (currentIteration - previousFlowIncreaseIteration) > uint64( constants.MovingAverageStabilitySpan, ) { - if debug != nil { + if debug.IsDebug(debugging.Level) { fmt.Printf( "%v: Adding flows because we are unsaturated and waited a while.\n", - debug, + debugging, ) } addFlows( @@ -389,31 +383,31 @@ func saturate( &lgcs, &lgcsPreviousTransferred, lgcGenerator, - debug != nil, + debugging.Level, ) previousFlowIncreaseIteration = currentIteration } else { - if debug != nil { - fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debug) + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debugging) } } } else { // Else, network reached saturation for the current flow count. - if debug != nil { - fmt.Printf("%v: Network reached saturation with current flow count.\n", debug) + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: Network reached saturation with current flow count.\n", debugging) } // If new flows added and for 4 seconds the moving average // throughput did not change: network reached stable saturation if (currentIteration-previousFlowIncreaseIteration) < uint64(constants.MovingAverageStabilitySpan) && movingAverageAverage.AllSequentialIncreasesLessThan(float64(5)) { - if debug != nil { - fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debug) + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging) } break } else { // Else, add four more flows - if debug != nil { - fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debug) + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging) } - addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, &lgcsPreviousTransferred, lgcGenerator, debug != nil) + addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, &lgcsPreviousTransferred, lgcGenerator, debugging.Level) previousFlowIncreaseIteration = currentIteration } } @@ -435,6 +429,11 @@ func main() { context.Background(), ) config := &Config{} + var debugLevel debug.DebugLevel = debug.Error + + if *debugCliFlag { + debugLevel = debug.Debug + } if err := config.Get(configHostPort, *configPath); err != nil { fmt.Fprintf(os.Stderr, "%s\n", err) @@ -449,16 +448,16 @@ func main() { ) return } - if *debug { + if debug.IsDebug(debugLevel) { fmt.Printf("Configuration: %s\n", config) } timeoutChannel := timeoutat.TimeoutAt( operatingCtx, timeoutAbsoluteTime, - *debug, + debugLevel, ) - if *debug { + if debug.IsDebug(debugLevel) { fmt.Printf("Test will end earlier than %v\n", timeoutAbsoluteTime) } @@ -495,7 +494,7 @@ func main() { fmt.Printf("Could not seek to the end of the key file: %v!\n", err) sslKeyFileConcurrentWriter = nil } else { - if *debug { + if debug.IsDebug(debugLevel) { fmt.Printf("Doing SSL key logging through file %v\n", *sslKeyFileName) } sslKeyFileConcurrentWriter = ccw.NewConcurrentFileWriter(sslKeyFileHandle) @@ -517,11 +516,11 @@ func main() { } } - var downloadDebugging *Debugging = nil - var uploadDebugging *Debugging = nil - if *debug { - downloadDebugging = &Debugging{Prefix: "download"} - uploadDebugging = &Debugging{Prefix: "upload"} + var downloadDebugging *debug.DebugWithPrefix = nil + var uploadDebugging *debug.DebugWithPrefix = nil + if debug.IsDebug(debugLevel) { + downloadDebugging = &debug.DebugWithPrefix{Prefix: "download"} + uploadDebugging = &debug.DebugWithPrefix{Prefix: "upload"} } downloadSaturationChannel := saturate( @@ -548,7 +547,7 @@ func main() { case downloadSaturation = <-downloadSaturationChannel: { downloadSaturated = true - if *debug { + if *debugCliFlag { fmt.Printf( "################# download is %s saturated (%fMBps, %d flows)!\n", utilities.Conditional( @@ -564,7 +563,7 @@ func main() { case uploadSaturation = <-uploadSaturationChannel: { uploadSaturated = true - if *debug { + if *debugCliFlag { fmt.Printf( "################# upload is %s saturated (%fMBps, %d flows)!\n", utilities.Conditional( @@ -588,7 +587,7 @@ func main() { "Error: Saturation could not be completed in time and no provisional rates could be accessed. Test failed.\n", ) cancelOperatingCtx() - if *debug { + if *debugCliFlag { time.Sleep(constants.CooldownPeriod) } return @@ -605,9 +604,9 @@ func main() { timeoutChannel = timeoutat.TimeoutAt( operatingCtx, timeoutAbsoluteTime, - *debug, + debugLevel, ) - if *debug { + if *debugCliFlag { fmt.Printf( "################# timeout reaching saturation!\n", ) @@ -625,7 +624,7 @@ func main() { timeoutChannel = timeoutat.TimeoutAt( operatingCtx, timeoutAbsoluteTime, - *debug, + debugLevel, ) } @@ -643,10 +642,10 @@ func main() { downloadSaturation.lgcs, ) if !downloadSaturation.lgcs[randomlgcsIndex].IsValid() { - if *debug { + if *debugCliFlag { fmt.Printf( "%v: The randomly selected download lgc (with id %d) was invalid. Skipping.\n", - debug, + debugCliFlag, downloadSaturation.lgcs[randomlgcsIndex].ClientId(), ) } @@ -655,7 +654,7 @@ func main() { // invalid connections and never // do the select below if time.Since(timeoutAbsoluteTime) > 0 { - if *debug { + if *debugCliFlag { fmt.Printf( "Pathologically could not find valid lgcs to use for measurement.\n", ) @@ -666,20 +665,26 @@ func main() { } newTransport := http2.Transport{} + newTransport.TLSClientConfig = &tls.Config{} if sslKeyFileConcurrentWriter != nil { - newTransport.TLSClientConfig = &tls.Config{ - KeyLogWriter: sslKeyFileConcurrentWriter, - InsecureSkipVerify: true, - } + newTransport.TLSClientConfig.KeyLogWriter = sslKeyFileConcurrentWriter } + newTransport.TLSClientConfig.InsecureSkipVerify = true newClient := http.Client{Transport: &newTransport} + newRTTProbe := rpm.NewProbe(&newClient, debugLevel) + + saturatedRTTProbe := rpm.NewProbe( + downloadSaturation.lgcs[randomlgcsIndex].Client(), + debugLevel, + ) + select { case <-timeoutChannel: { rttTimeout = true } - case sequentialRTTimes := <-utilities.CalculateSequentialRTTsTime(operatingCtx, downloadSaturation.lgcs[randomlgcsIndex].Client(), &newClient, config.Urls.SmallUrl): + case sequentialRTTimes := <-rpm.CalculateSequentialRTTsTime(operatingCtx, saturatedRTTProbe, newRTTProbe, config.Urls.SmallUrl, debugLevel): { if sequentialRTTimes.Err != nil { fmt.Printf( @@ -688,10 +693,14 @@ func main() { ) continue } + + if debug.IsDebug(debugLevel) { + fmt.Printf("rttProbe: %v\n", newRTTProbe) + } // We know that we have a good Sequential RTT. totalRTsCount += uint64(sequentialRTTimes.RoundTripCount) totalRTTimes += sequentialRTTimes.Delay.Seconds() - if *debug { + if debug.IsDebug(debugLevel) { fmt.Printf( "sequentialRTTsTime: %v\n", sequentialRTTimes.Delay.Seconds(), @@ -726,10 +735,6 @@ func main() { // Normalized to 60 seconds: 60 * (1 // / (totalRTTimes / totalRTsCount))) <- semantically the number of // probes per minute. - // I am concerned because the draft seems to conflate the concept of a - // probe - // with a roundtrip. In other words, I think that we are missing a - // multiplication by 5: DNS, TCP, TLS, HTTP GET, HTTP Download. rpm := float64( time.Minute.Seconds(), ) / (totalRTTimes / (float64(totalRTsCount))) @@ -740,7 +745,7 @@ func main() { } cancelOperatingCtx() - if *debug { + if *debugCliFlag { fmt.Printf("In debugging mode, we will cool down.\n") time.Sleep(constants.CooldownPeriod) fmt.Printf("Done cooling down.\n") diff --git a/rpm/rpm.go b/rpm/rpm.go new file mode 100644 index 0000000..e01e2e8 --- /dev/null +++ b/rpm/rpm.go @@ -0,0 +1,342 @@ +package rpm + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "net/http" + "net/http/httptrace" + "time" + + "github.com/network-quality/goresponsiveness/debug" + "github.com/network-quality/goresponsiveness/stats" + "github.com/network-quality/goresponsiveness/traceable" + "github.com/network-quality/goresponsiveness/utilities" +) + +type Probe struct { + client *http.Client + stats *stats.TraceStats + trace *httptrace.ClientTrace + debug debug.DebugLevel + probeid uint64 +} + +func (p *Probe) String() string { + return fmt.Sprintf("(Probe %v): stats: %v\n", p.probeid, p.stats) +} + +func (p *Probe) ProbeId() uint64 { + return p.probeid +} + +func (p *Probe) GetTrace() *httptrace.ClientTrace { + return p.trace +} + +func (p *Probe) GetDnsDelta() time.Duration { + delta := p.stats.DnsDoneTime.Sub(p.stats.DnsStartTime) + if debug.IsDebug(p.debug) { + fmt.Printf("(Probe %v): DNS Time: %v\n", p.probeid, delta) + } + return delta +} + +func (p *Probe) GetTCPDelta() time.Duration { + delta := p.stats.ConnectDoneTime.Sub(p.stats.ConnectStartTime) + if debug.IsDebug(p.debug) { + fmt.Printf("(Probe %v): TCP Connection Time: %v\n", p.probeid, delta) + } + return delta +} + +func (p *Probe) GetTLSDelta() time.Duration { + if utilities.IsSome(p.stats.TLSDoneTime) { + panic("There should not be TLS information, but there is.") + } + delta := time.Duration(0) + if debug.IsDebug(p.debug) { + fmt.Printf("(Probe %v): TLS Time: %v\n", p.probeid, delta) + } + return delta +} + +func (p *Probe) GetTLSAndHttpHeaderDelta() time.Duration { + // Because the TLS handshake occurs *after* the TCP connection (ConnectDoneTime) + // and *before* the HTTP transaction, we know that the delta between the time + // that the first HTTP response byte is available and the time that the TCP + // connection was established includes both the time for the HTTP header RTT + // *and* the TLS handshake RTT, whether we can specifically measure the latter + // or not. Eventually when TLS handshake tracing is fixed, we can break these + // into separate buckets, but for now this workaround is reasonable. + delta := p.stats.HttpResponseReadyTime.Sub(p.stats.ConnectDoneTime) + if debug.IsDebug(p.debug) { + fmt.Printf("(Probe %v): Http TLS and Header Time: %v\n", p.probeid, delta) + } + return delta +} + +func (p *Probe) GetHttpHeaderDelta() time.Duration { + panic("Unusable until TLS tracing support is enabled! Use GetTLSAndHttpHeaderDelta() instead.\n") + delta := p.stats.HttpResponseReadyTime.Sub(utilities.GetSome(p.stats.TLSDoneTime)) + if debug.IsDebug(p.debug) { + fmt.Printf("(Probe %v): Http Header Time: %v\n", p.probeid, delta) + } + return delta +} + +func (p *Probe) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration { + delta := httpDoneTime.Sub(p.stats.HttpResponseReadyTime) + if debug.IsDebug(p.debug) { + fmt.Printf("(Probe %v): Http Download Time: %v\n", p.probeid, delta) + } + return delta +} + +func NewProbe(client *http.Client, debugLevel debug.DebugLevel) *Probe { + probe := &Probe{ + client: client, + stats: &stats.TraceStats{}, + trace: nil, + debug: debugLevel, + probeid: utilities.GenerateConnectionId(), + } + trace := traceable.GenerateHttpTimingTracer(probe, debugLevel) + + probe.trace = trace + return probe +} + +func (probe *Probe) SetDnsStartTimeInfo( + now time.Time, + dnsStartInfo httptrace.DNSStartInfo, +) { + probe.stats.DnsStartTime = now + probe.stats.DnsStart = dnsStartInfo + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(Probe) DNS Start for %v: %v\n", + probe.ProbeId(), + dnsStartInfo, + ) + } +} + +func (probe *Probe) SetDnsDoneTimeInfo( + now time.Time, + dnsDoneInfo httptrace.DNSDoneInfo, +) { + probe.stats.DnsDoneTime = now + probe.stats.DnsDone = dnsDoneInfo + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(Probe) DNS Done for %v: %v\n", + probe.ProbeId(), + probe.stats.DnsDone, + ) + } +} + +func (probe *Probe) SetConnectStartTime( + now time.Time, +) { + probe.stats.ConnectStartTime = now + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(Probe) TCP Start for %v at %v\n", + probe.ProbeId(), + probe.stats.ConnectStartTime, + ) + } +} + +func (probe *Probe) SetConnectDoneTimeError( + now time.Time, + err error, +) { + probe.stats.ConnectDoneTime = now + probe.stats.ConnectDoneError = err + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(Probe) TCP Done for %v (with error %v) @ %v\n", + probe.ProbeId(), + probe.stats.ConnectDoneError, + probe.stats.ConnectDoneTime, + ) + } +} + +func (probe *Probe) SetGetConnTime(now time.Time) { + probe.stats.GetConnectionStartTime = now + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(Probe) Started getting connection for %v @ %v\n", + probe.ProbeId(), + probe.stats.GetConnectionStartTime, + ) + } +} + +func (probe *Probe) SetGotConnTimeInfo( + now time.Time, + gotConnInfo httptrace.GotConnInfo, +) { + probe.stats.GetConnectionDoneTime = now + probe.stats.ConnInfo = gotConnInfo + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(Probe) Got connection for %v at %v with info %v\n", + probe.ProbeId(), + probe.stats.GetConnectionDoneTime, + probe.stats.ConnInfo, + ) + } +} + +func (probe *Probe) SetTLSHandshakeStartTime( + now time.Time, +) { + probe.stats.TLSStartTime = utilities.Some(now) + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(Probe) Started TLS Handshake for %v @ %v\n", + probe.ProbeId(), + probe.stats.TLSStartTime, + ) + } +} + +func (probe *Probe) SetTLSHandshakeDoneTimeState( + now time.Time, + connectionState tls.ConnectionState, +) { + probe.stats.TLSDoneTime = utilities.Some(now) + probe.stats.TLSConnInfo = connectionState + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(Probe) Completed TLS handshake for %v at %v with info %v\n", + probe.ProbeId(), + probe.stats.TLSDoneTime, + probe.stats.TLSConnInfo, + ) + } +} + +func (probe *Probe) SetHttpWroteRequestTimeInfo( + now time.Time, + info httptrace.WroteRequestInfo, +) { + probe.stats.HttpWroteRequestTime = now + probe.stats.HttpInfo = info + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(Probe) Http finished writing request for %v at %v with info %v\n", + probe.ProbeId(), + probe.stats.HttpWroteRequestTime, + probe.stats.HttpInfo, + ) + } +} + +func (probe *Probe) SetHttpResponseReadyTime( + now time.Time, +) { + probe.stats.HttpResponseReadyTime = now + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(Probe) Http response is ready for %v at %v\n", + probe.ProbeId(), + probe.stats.HttpResponseReadyTime, + ) + } +} + +func CalculateSequentialRTTsTime( + ctx context.Context, + saturated_rtt_probe *Probe, + new_rtt_probe *Probe, + url string, + debugLevel debug.DebugLevel, +) chan utilities.GetLatency { + responseChannel := make(chan utilities.GetLatency) + go func() { + before := time.Now() + roundTripCount := uint16(0) + /* + TODO: We are not going to measure round-trip times on the load-generating connection + right now because we are dealing with a massive amount of buffer bloat on the + Apple CDN. + + TODO: When this functionality is enabled, we may need to change the assertion in + the GotConn callback in the Traceable interface in traceable.go because a connection + will be reused in that case. If such a situation does come to pass, we will want to + move that assertion in to the various Traceable interface implementations that continue + to rely on this assertion. + + c_a, err := saturated_client.Get(url) + if err != nil { + responseChannel <- GetLatency{Delay: 0, RTTs: 0, Err: err} + return + } + // TODO: Make this interruptable somehow + // by using _ctx_. + _, err = io.ReadAll(c_a.Body) + if err != nil { + responseChannel <- GetLatency{Delay: 0, RTTs: 0, Err: err} + return + } + roundTripCount += 5 + c_a.Body.Close() + */ + c_b_req, err := http.NewRequestWithContext( + httptrace.WithClientTrace(ctx, new_rtt_probe.GetTrace()), + "GET", + url, + nil, + ) + if err != nil { + responseChannel <- utilities.GetLatency{Delay: 0, RoundTripCount: 0, Err: err} + return + } + + c_b, err := new_rtt_probe.client.Do(c_b_req) + if err != nil { + responseChannel <- utilities.GetLatency{Delay: 0, RoundTripCount: 0, Err: err} + return + } + + // TODO: Make this interruptable somehow by using _ctx_. + _, err = io.ReadAll(c_b.Body) + if err != nil { + responseChannel <- utilities.GetLatency{Delay: 0, Err: err} + return + } + after := time.Now() + + // Depending on whether we think that Close() requires another RTT (via TCP), we + // may need to move this before/after capturing the after time. + c_b.Body.Close() + + sanity := after.Sub(before) + + tlsAndHttpHeaderDelta := new_rtt_probe.GetTLSAndHttpHeaderDelta() // Constitutes 2 RTT, per the Spec. + httpDownloadDelta := new_rtt_probe.GetHttpDownloadDelta(after) // Constitutes 1 RTT, per the Spec. + dnsDelta := new_rtt_probe.GetDnsDelta() // Constitutes 1 RTT, per the Spec. + tcpDelta := new_rtt_probe.GetTCPDelta() // Constitutes 1 RTT, per the Spec. + totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + dnsDelta + tcpDelta + + if debug.IsDebug(debugLevel) { + fmt.Printf( + "(Probe %v) sanity vs total: %v vs %v\n", + new_rtt_probe.ProbeId(), + sanity, + totalDelay, + ) + } + + roundTripCount += 5 // According to addition, there are 5 RTTs that we measured. + responseChannel <- utilities.GetLatency{Delay: totalDelay, RoundTripCount: roundTripCount, Err: nil} + }() + return responseChannel +} diff --git a/stats/stats.go b/stats/stats.go new file mode 100644 index 0000000..a636326 --- /dev/null +++ b/stats/stats.go @@ -0,0 +1,50 @@ +package stats + +import ( + "crypto/tls" + "fmt" + "net/http/httptrace" + "time" + + "github.com/network-quality/goresponsiveness/utilities" +) + +type TraceStats struct { + DnsStart httptrace.DNSStartInfo + DnsDone httptrace.DNSDoneInfo + ConnInfo httptrace.GotConnInfo + HttpInfo httptrace.WroteRequestInfo + TLSConnInfo tls.ConnectionState + ConnectDoneError error + DnsStartTime time.Time + DnsDoneTime time.Time + TLSStartTime utilities.Optional[time.Time] + TLSDoneTime utilities.Optional[time.Time] + ConnectStartTime time.Time + ConnectDoneTime time.Time + GetConnectionStartTime time.Time + GetConnectionDoneTime time.Time + HttpWroteRequestTime time.Time + HttpResponseReadyTime time.Time +} + +func NewStats() *TraceStats { + return &TraceStats{} +} + +func (s *TraceStats) String() string { + return fmt.Sprintf("DnsStart: %v\n", s.DnsStart) + + fmt.Sprintf("DnsDone: %v\n", s.DnsDone) + + fmt.Sprintf("ConnInfo: %v\n", s.ConnInfo) + + fmt.Sprintf("HttpInfo: %v\n", s.HttpInfo) + + fmt.Sprintf("TLSConnInfo: %v\n", s.TLSConnInfo) + + fmt.Sprintf("ConnectDoneError: %v\n", s.ConnectDoneError) + + fmt.Sprintf("DnsStartTime: %v\n", s.DnsStartTime) + + fmt.Sprintf("DnsDoneTime: %v\n", s.DnsDoneTime) + + fmt.Sprintf("TLSDoneTime: %v\n", s.TLSDoneTime) + + fmt.Sprintf("ConnectStartTime: %v\n", s.ConnectStartTime) + + fmt.Sprintf("ConnectDoneTime: %v\n", s.ConnectDoneTime) + + fmt.Sprintf("GetConnectionStartTime: %v\n", s.GetConnectionStartTime) + + fmt.Sprintf("GetConnectionDoneTime: %v\n", s.GetConnectionDoneTime) + + fmt.Sprintf("HttpResponseReadyTime: %v\n", s.HttpResponseReadyTime) +} diff --git a/timeoutat/timeoutat.go b/timeoutat/timeoutat.go index 0e13a9f..673ca38 100644 --- a/timeoutat/timeoutat.go +++ b/timeoutat/timeoutat.go @@ -18,17 +18,19 @@ import ( "context" "fmt" "time" + + "github.com/network-quality/goresponsiveness/debug" ) func TimeoutAt( ctx context.Context, when time.Time, - debug bool, + debugLevel debug.DebugLevel, ) (response chan interface{}) { response = make(chan interface{}) go func(ctx context.Context) { go func() { - if debug { + if debug.IsDebug(debugLevel) { fmt.Printf("Timeout expected to end at %v\n", when) } select { @@ -36,7 +38,7 @@ func TimeoutAt( case <-ctx.Done(): } response <- struct{}{} - if debug { + if debug.IsDebug(debugLevel) { fmt.Printf("Timeout ended at %v\n", time.Now()) } }() diff --git a/traceable/traceable.go b/traceable/traceable.go new file mode 100644 index 0000000..6efc7f7 --- /dev/null +++ b/traceable/traceable.go @@ -0,0 +1,64 @@ +package traceable + +import ( + "crypto/tls" + "net/http/httptrace" + "time" + + "github.com/network-quality/goresponsiveness/debug" +) + +type Traceable interface { + SetDnsStartTimeInfo(time.Time, httptrace.DNSStartInfo) + SetDnsDoneTimeInfo(time.Time, httptrace.DNSDoneInfo) + SetConnectStartTime(time.Time) + SetConnectDoneTimeError(time.Time, error) + SetGetConnTime(time.Time) + SetGotConnTimeInfo(time.Time, httptrace.GotConnInfo) + SetTLSHandshakeStartTime(time.Time) + SetTLSHandshakeDoneTimeState(time.Time, tls.ConnectionState) + SetHttpWroteRequestTimeInfo(time.Time, httptrace.WroteRequestInfo) + SetHttpResponseReadyTime(time.Time) +} + +func GenerateHttpTimingTracer( + traceable Traceable, + debug debug.DebugLevel, +) *httptrace.ClientTrace { + tracer := httptrace.ClientTrace{ + DNSStart: func(dnsStartInfo httptrace.DNSStartInfo) { + traceable.SetDnsStartTimeInfo(time.Now(), dnsStartInfo) + }, + DNSDone: func(dnsDoneInfo httptrace.DNSDoneInfo) { + traceable.SetDnsDoneTimeInfo(time.Now(), dnsDoneInfo) + }, + ConnectStart: func(network, address string) { + traceable.SetConnectStartTime(time.Now()) + }, + ConnectDone: func(network, address string, err error) { + traceable.SetConnectDoneTimeError(time.Now(), err) + }, + GetConn: func(hostPort string) { + traceable.SetGetConnTime(time.Now()) + }, + GotConn: func(connInfo httptrace.GotConnInfo) { + if connInfo.Reused { + panic(!connInfo.Reused) + } + traceable.SetGotConnTimeInfo(time.Now(), connInfo) + }, + TLSHandshakeStart: func() { + traceable.SetTLSHandshakeStartTime(time.Now()) + }, + TLSHandshakeDone: func(tlsConnState tls.ConnectionState, err error) { + traceable.SetTLSHandshakeDoneTimeState(time.Now(), tlsConnState) + }, + WroteRequest: func(wroteRequest httptrace.WroteRequestInfo) { + traceable.SetHttpWroteRequestTimeInfo(time.Now(), wroteRequest) + }, + GotFirstResponseByte: func() { + traceable.SetHttpResponseReadyTime(time.Now()) + }, + } + return &tracer +} diff --git a/utilities/utilities.go b/utilities/utilities.go index 46d5766..160368b 100644 --- a/utilities/utilities.go +++ b/utilities/utilities.go @@ -15,12 +15,11 @@ package utilities import ( - "context" - "io" + "fmt" "math" - "net/http" "os" "reflect" + "sync/atomic" "time" ) @@ -68,56 +67,50 @@ type GetLatency struct { Err error } -func CalculateSequentialRTTsTime( - ctx context.Context, - saturated_client *http.Client, - new_client *http.Client, - url string, -) chan GetLatency { - responseChannel := make(chan GetLatency) - go func() { - roundTripCount := uint16(0) - before := time.Now() - /* - TODO: We are not going to measure round-trip times on the load-generating connection - right now because we are dealing with a massive amount of buffer bloat on the - Apple CDN. - - c_a, err := saturated_client.Get(url) - if err != nil { - responseChannel <- GetLatency{Delay: 0, RTTs: 0, Err: err} - return - } - // TODO: Make this interruptable somehow - // by using _ctx_. - _, err = io.ReadAll(c_a.Body) - if err != nil { - responseChannel <- GetLatency{Delay: 0, RTTs: 0, Err: err} - return - } - roundTripCount += 5 - c_a.Body.Close() - */ - c_b, err := new_client.Get(url) - if err != nil { - responseChannel <- GetLatency{Delay: 0, RoundTripCount: 0, Err: err} - return - } - // TODO: Make this interruptable somehow by using _ctx_. - _, err = io.ReadAll(c_b.Body) - if err != nil { - responseChannel <- GetLatency{Delay: 0, Err: err} - return - } - c_b.Body.Close() - // We use 1 here according to the wording in 4.2.1. - roundTripCount += 1 - responseChannel <- GetLatency{Delay: time.Since(before), RoundTripCount: roundTripCount, Err: nil} - }() - return responseChannel -} - func SeekForAppend(file *os.File) (err error) { _, err = file.Seek(0, 2) return } + +var GenerateConnectionId func() uint64 = func() func() uint64 { + var nextConnectionId uint64 = 0 + return func() uint64 { + return atomic.AddUint64(&nextConnectionId, 1) + } +}() + +type Optional[S any] struct { + value S + some bool +} + +func Some[S any](value S) Optional[S] { + return Optional[S]{value: value, some: true} +} + +func None[S any]() Optional[S] { + return Optional[S]{some: false} +} + +func IsNone[S any](optional Optional[S]) bool { + return !optional.some +} + +func IsSome[S any](optional Optional[S]) bool { + return optional.some +} + +func GetSome[S any](optional Optional[S]) S { + if !optional.some { + panic("Attempting to access Some of a None.") + } + return optional.value +} + +func (optional Optional[S]) String() string { + if IsSome(optional) { + return fmt.Sprintf("Some: %v", optional.some) + } else { + return "None" + } +} |
