diff options
| -rw-r--r-- | lgc/lgc.go | 4 | ||||
| -rw-r--r-- | networkQuality.go | 71 | ||||
| -rw-r--r-- | rpm/rpm.go | 290 | ||||
| -rw-r--r-- | utilities/utilities.go | 2 | ||||
| -rw-r--r-- | utilities/utilities_test.go | 39 |
5 files changed, 350 insertions, 56 deletions
@@ -245,7 +245,7 @@ func (lgd *LoadGeneratingConnectionDownload) Start( debugLevel debug.DebugLevel, ) bool { lgd.downloaded = 0 - lgd.clientId = utilities.GenerateConnectionId() + lgd.clientId = utilities.GenerateUniqueId() transport := http2.Transport{} transport.TLSClientConfig = &tls.Config{} @@ -400,7 +400,7 @@ func (lgu *LoadGeneratingConnectionUpload) Start( debugLevel debug.DebugLevel, ) bool { lgu.uploaded = 0 - lgu.clientId = utilities.GenerateConnectionId() + lgu.clientId = utilities.GenerateUniqueId() lgu.debug = debugLevel // See above for the rationale of doing http2.Transport{} here diff --git a/networkQuality.go b/networkQuality.go index d5c3523..ca51e51 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -98,6 +98,9 @@ func main() { lgDataCollectionCtx, cancelLGDataCollectionCtx := context.WithCancel( context.Background(), ) + newConnectionProberCtx, newConnectionProberCtxCancel := context.WithCancel( + context.Background(), + ) config := &config.Config{} var debugLevel debug.DebugLevel = debug.Error @@ -198,11 +201,20 @@ func main() { } generate_lg_probe_configuration := func() rpm.ProbeConfiguration { - return rpm.ProbeConfiguration{URL: config.Urls.SmallUrl} + return rpm.ProbeConfiguration{URL: config.Urls.SmallUrl, Interval: 100 * time.Millisecond} + } + + generate_nc_probe_configuration := func() rpm.ProbeConfiguration { + return rpm.ProbeConfiguration{URL: config.Urls.SmallUrl, Interval: 100 * time.Millisecond} } var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download") var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload") + var newConnectionDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "new connection probe") + + // TODO: Separate contexts for load generation and data collection. If we do that, if either of the two + // data collection go routines stops well before the other, they will continue to send probes and we can + // generate additional information! downloadDataCollectionChannel := rpm.LGCollectData( lgDataCollectionCtx, @@ -219,6 +231,13 @@ func main() { uploadDebugging, ) + newConnectionProbeDataPoints := rpm.Prober( + newConnectionProberCtx, + generate_nc_probe_configuration, + sslKeyFileConcurrentWriter, + newConnectionDebugging, + ) + dataCollectionTimeout := false uploadDataCollectionComplete := false downloadDataCollectionComple := false @@ -298,6 +317,9 @@ func main() { } } + // Shutdown the new-connection prober! + newConnectionProberCtxCancel() + // In the new version we are no longer going to wait to send probes until after // saturation. When we get here we are now only going to compute the results // and/or extended statistics! @@ -333,6 +355,53 @@ func main() { len(uploadDataCollectionResult.LGCs), ) + totalNewConnectionRoundTripTime := float64(0) + totalNewConnectionRoundTrips := uint64(0) + for ncDp := range newConnectionProbeDataPoints { + totalNewConnectionRoundTripTime += ncDp.Duration.Seconds() + totalNewConnectionRoundTrips += uint64(ncDp.RoundTripCount) + } + averageNewConnectionRoundTripTime := totalNewConnectionRoundTripTime / float64( + totalNewConnectionRoundTrips, + ) + newConnectionRpm := (1.0 / averageNewConnectionRoundTripTime) * 60.0 + if *debugCliFlag { + fmt.Printf( + "Total New-Connection Round Trips: %d, Total New-Connection Round Trip Time: %f, Average New-Connection Round Trip Time (in seconds): %f\n", + totalNewConnectionRoundTrips, + totalNewConnectionRoundTripTime, + averageNewConnectionRoundTripTime, + ) + fmt.Printf("(New-Connection) RPM: %f\n", newConnectionRpm) + } + + totalLoadGeneratingRoundTripTime := float64(0) + totalLoadGeneratingRoundTrips := uint64(0) + for _, dp := range downloadDataCollectionResult.DataPoints { + totalLoadGeneratingRoundTripTime += dp.Duration.Seconds() + totalLoadGeneratingRoundTrips += uint64(dp.RoundTripCount) + } + for _, dp := range uploadDataCollectionResult.DataPoints { + totalLoadGeneratingRoundTripTime += dp.Duration.Seconds() + totalLoadGeneratingRoundTrips += uint64(dp.RoundTripCount) + } + averageLoadGeneratingRoundTripTime := totalLoadGeneratingRoundTripTime / float64( + totalLoadGeneratingRoundTrips, + ) + loadGeneratingRPM := (1.0 / averageLoadGeneratingRoundTripTime) * 60.0 + if *debugCliFlag { + fmt.Printf( + "Total Load-Generating Round Trips: %d, Total New-Connection Round Trip Time: %f, Average New-Connection Round Trip Time (in seconds): %f\n", + totalLoadGeneratingRoundTrips, + totalLoadGeneratingRoundTripTime, + averageLoadGeneratingRoundTripTime, + ) + fmt.Printf("(Load-Generating) RPM: %f\n", loadGeneratingRPM) + } + + rpm := (newConnectionRpm + loadGeneratingRPM) / 2.0 + fmt.Printf("RPM: %5.0f\n", rpm) + if *calculateExtendedStats { fmt.Println(extendedStats.Repr()) } @@ -17,6 +17,7 @@ import ( "github.com/network-quality/goresponsiveness/stats" "github.com/network-quality/goresponsiveness/traceable" "github.com/network-quality/goresponsiveness/utilities" + "golang.org/x/net/http2" ) func addFlows( @@ -39,7 +40,8 @@ func addFlows( } type ProbeConfiguration struct { - URL string + URL string + Interval time.Duration } type DataPoint struct { @@ -53,26 +55,38 @@ type LGDataCollectionResult struct { DataPoints []DataPoint } -func LGProbe( +func Probe( parentProbeCtx context.Context, - connection lgc.LoadGeneratingConnection, - lgProbeUrl string, + client *http.Client, + probeUrl string, + isLGProbe bool, result *chan DataPoint, debugging *debug.DebugWithPrefix, ) error { - probeTracer := NewProbeTracer(connection.Client(), true, debugging) + + probeTypeLabel := "New-Connection" + if isLGProbe { + probeTypeLabel = "Load-Generating" + } + + if client == nil { + return fmt.Errorf("Cannot start a probe with a nil client") + } + + probeId := utilities.GenerateUniqueId() + probeTracer := NewProbeTracer(client, isLGProbe, probeId, debugging) time_before_probe := time.Now() probe_req, err := http.NewRequestWithContext( httptrace.WithClientTrace(parentProbeCtx, probeTracer.trace), "GET", - lgProbeUrl, + probeUrl, nil, ) if err != nil { return err } - probe_resp, err := connection.Client().Do(probe_req) + probe_resp, err := client.Do(probe_req) if err != nil { return err } @@ -90,52 +104,161 @@ func LGProbe( sanity := time_after_probe.Sub(time_before_probe) - tlsAndHttpHeaderDelta := probeTracer.GetTLSAndHttpHeaderDelta() - httpDownloadDelta := probeTracer.GetHttpDownloadDelta( + // When the probe is run on a load-generating connection there should + // only be a single round trip that is measured. We will take the accumulation of all these + // values just to be sure, though. Because of how this traced connection was launched, most + // of the values will be 0 (or very small where the time that go takes for delivering callbacks + // and doing context switches pokes through). When it is !isLGProbe then the values will + // be significant and we want to add them regardless! + totalDelay := probeTracer.GetTLSAndHttpHeaderDelta() + probeTracer.GetHttpDownloadDelta( time_after_probe, - ) // Combined with above, constitutes 2 time measurements, per the Spec. - tcpDelta := probeTracer.GetTCPDelta() // Constitutes 1 time measurement, per the Spec. - totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + tcpDelta + ) + probeTracer.GetTCPDelta() - // We must have reused the connection! - if !probeTracer.stats.ConnectionReused { + // We must have reused the connection if we are a load-generating probe! + if isLGProbe && !probeTracer.stats.ConnectionReused { panic(!probeTracer.stats.ConnectionReused) } if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) (Probe %v) sanity vs total: %v vs %v\n", + "(%s) (%s Probe %v) sanity vs total: %v vs %v\n", debugging.Prefix, - probeTracer.ProbeId(), + probeTypeLabel, + probeId, sanity, totalDelay, ) } - *result <- DataPoint{RoundTripCount: 1, Duration: totalDelay} + roundTripCount := uint64(1) + if !isLGProbe { + roundTripCount = 3 + } + // TODO: Careful!!! It's possible that this channel has been closed because the Prober that + // started it has been stopped. Writing to a closed channel will cause a panic. It might not + // matter because a panic just stops the go thread containing the paniced code and we are in + // a go thread that executes only this function. + defer func() { + _ = recover() + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) (%s Probe %v) Probe attempted to write to the result channel after its invoker ended.\n", + debugging.Prefix, + probeTypeLabel, + probeId, + ) + } + }() + *result <- DataPoint{RoundTripCount: roundTripCount, Duration: totalDelay} return nil } +func Prober( + proberCtx context.Context, + ncProbeConfigurationGenerator func() ProbeConfiguration, + keyLogger io.Writer, + debugging *debug.DebugWithPrefix, +) (points chan DataPoint) { + points = make(chan DataPoint) + + ncProbeConfiguration := ncProbeConfigurationGenerator() + + go func() { + probeCount := 0 + + for proberCtx.Err() == nil { + time.Sleep(ncProbeConfiguration.Interval) + + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) About to start new-connection probe number %d!\n", + debugging.Prefix, + probeCount, + ) + } + transport := http2.Transport{} + transport.TLSClientConfig = &tls.Config{} + + if !utilities.IsInterfaceNil(keyLogger) { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "Using an SSL Key Logger for this new-connection probe.\n", + ) + } + + // The presence of a custom TLSClientConfig in a *generic* `transport` + // means that go will default to HTTP/1.1 and cowardly avoid HTTP/2: + // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278 + // Also, it would appear that the API's choice of HTTP vs HTTP2 can + // depend on whether the url contains + // https:// or http://: + // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74 + transport.TLSClientConfig.KeyLogWriter = keyLogger + } + transport.TLSClientConfig.InsecureSkipVerify = true + + client := &http.Client{Transport: &transport} + + probeCount++ + go Probe(proberCtx, client, ncProbeConfiguration.URL, false, &points, debugging) + } + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) New-connection probing driver is stopping after sending %d probes.\n", + debugging.Prefix, + probeCount, + ) + } + close(points) + }() + return +} + func LGProber( proberCtx context.Context, defaultConnection lgc.LoadGeneratingConnection, altConnections *[]lgc.LoadGeneratingConnection, - url string, - interval time.Duration, + lgProbeConfiguration ProbeConfiguration, debugging *debug.DebugWithPrefix, ) (points chan DataPoint) { points = make(chan DataPoint) + debugging = debug.NewDebugWithPrefix(debugging.Level, debugging.Prefix+" load-generating probe") + go func() { + probeCount := 0 for proberCtx.Err() == nil { - time.Sleep(interval) + time.Sleep(lgProbeConfiguration.Interval) if debug.IsDebug(debugging.Level) { - fmt.Printf("(%s) About to probe!\n", debugging.Prefix) + fmt.Printf( + "(%s) About to start load-generating probe number %d!\n", + debugging.Prefix, + probeCount, + ) } - go LGProbe(proberCtx, defaultConnection, url, &points, debugging) + probeCount++ + // TODO: We do not yet take in to account that the load-generating connection that we were given + // on which to perform measurements might go away during testing. We have access to all the open + // load-generating connections (altConnections) to handle this case, but we just aren't using them + // yet. + go Probe( + proberCtx, + defaultConnection.Client(), + lgProbeConfiguration.URL, + true, + &points, + debugging, + ) + } + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) Load-generating probing driver is stopping after sending %d probes.\n", + debugging.Prefix, + probeCount, + ) } + close(points) }() - return } @@ -159,14 +282,11 @@ func LGCollectData( debugging.Level, ) - lgProbeConfiguration := lgProbeConfigurationGenerator() - - LGProber( - lgDataCollectionCtx, + lgProbeCtx, lgProbeCtxCancel := context.WithCancel(lgDataCollectionCtx) + probeDataPointsChannel := LGProber(lgProbeCtx, lgcs[0], &lgcs, - lgProbeConfiguration.URL, - time.Duration(100*time.Millisecond), + lgProbeConfigurationGenerator(), debugging, ) @@ -191,17 +311,13 @@ func LGCollectData( for currentInterval := uint64(0); true; currentInterval++ { - // When the program stops operating, then stop. - if lgDataCollectionCtx.Err() != nil { + // When the program stops operating, then stop. When our invoker tells + // us to stop, then stop. + if operatingCtx.Err() != nil || lgDataCollectionCtx.Err() != nil { + lgProbeCtxCancel() return } - // We may be asked to stop trying to saturate the - // network and return our current status. - if lgDataCollectionCtx.Err() != nil { - //break - } - now := time.Now() // At each 1-second interval if nextSampleStartTime.Sub(now) > 0 { @@ -346,7 +462,19 @@ func LGCollectData( } } - resulted <- LGDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs} + lgProbeCtxCancel() + probeDataPoints := make([]DataPoint, 0) + for dataPoint := range probeDataPointsChannel { + probeDataPoints = append(probeDataPoints, dataPoint) + } + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) Collected %d load-generating probe data points\n", + debugging.Prefix, + len(probeDataPoints), + ) + } + resulted <- LGDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, DataPoints: probeDataPoints} }() return } @@ -446,13 +574,18 @@ func (p *ProbeTracer) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration return delta } -func NewProbeTracer(client *http.Client, isLG bool, debugging *debug.DebugWithPrefix) *ProbeTracer { +func NewProbeTracer( + client *http.Client, + isLG bool, + probeId uint64, + debugging *debug.DebugWithPrefix, +) *ProbeTracer { probe := &ProbeTracer{ client: client, stats: &stats.TraceStats{}, trace: nil, debug: debugging.Level, - probeid: utilities.GenerateConnectionId(), + probeid: probeId, isLG: isLG, } trace := traceable.GenerateHttpTimingTracer(probe, debugging.Level) @@ -468,8 +601,13 @@ func (probe *ProbeTracer) SetDnsStartTimeInfo( probe.stats.DnsStartTime = now probe.stats.DnsStart = dnsStartInfo if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) DNS Start for %v: %v\n", + "(%s Probe) DNS Start for Probe %v: %v\n", + probeTypeLabel, probe.ProbeId(), dnsStartInfo, ) @@ -483,8 +621,13 @@ func (probe *ProbeTracer) SetDnsDoneTimeInfo( probe.stats.DnsDoneTime = now probe.stats.DnsDone = dnsDoneInfo if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) DNS Done for %v: %v\n", + "(%s Probe) DNS Done for Probe %v: %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.DnsDone, ) @@ -496,8 +639,13 @@ func (probe *ProbeTracer) SetConnectStartTime( ) { probe.stats.ConnectStartTime = now if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) TCP Start for %v at %v\n", + "(%s Probe) TCP Start for Probe %v at %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.ConnectStartTime, ) @@ -511,8 +659,13 @@ func (probe *ProbeTracer) SetConnectDoneTimeError( probe.stats.ConnectDoneTime = now probe.stats.ConnectDoneError = err if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) TCP Done for %v (with error %v) @ %v\n", + "(%s Probe) TCP Done for Probe %v (with error %v) @ %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.ConnectDoneError, probe.stats.ConnectDoneTime, @@ -523,8 +676,13 @@ func (probe *ProbeTracer) SetConnectDoneTimeError( func (probe *ProbeTracer) SetGetConnTime(now time.Time) { probe.stats.GetConnectionStartTime = now if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) Started getting connection for %v @ %v\n", + "(%s Probe) Started getting connection for Probe %v @ %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.GetConnectionStartTime, ) @@ -539,13 +697,21 @@ func (probe *ProbeTracer) SetGotConnTimeInfo( probe.stats.ConnInfo = gotConnInfo probe.stats.ConnectionReused = gotConnInfo.Reused if probe.isLG && !gotConnInfo.Reused { - fmt.Fprintf(os.Stderr, "A probe sent on an LG Connection used a new connection!\n") + fmt.Fprintf( + os.Stderr, + "A probe sent on an load-generating connection used a new connection!\n", + ) } else if debug.IsDebug(probe.debug) { - fmt.Printf("Properly reused a connection when probing on an LG Connection!\n") + fmt.Printf("Properly reused a connection when probing on a load-generating connection!\n") } if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) Got a reused connection for %v at %v with info %v\n", + "(%s Probe) Got a reused connection for Probe %v at %v with info %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.GetConnectionDoneTime, probe.stats.ConnInfo, @@ -558,8 +724,13 @@ func (probe *ProbeTracer) SetTLSHandshakeStartTime( ) { probe.stats.TLSStartTime = utilities.Some(now) if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) Started TLS Handshake for %v @ %v\n", + "(%s Probe) Started TLS Handshake for Probe %v @ %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.TLSStartTime, ) @@ -573,8 +744,13 @@ func (probe *ProbeTracer) SetTLSHandshakeDoneTimeState( probe.stats.TLSDoneTime = utilities.Some(now) probe.stats.TLSConnInfo = connectionState if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) Completed TLS handshake for %v at %v with info %v\n", + "(%s Probe) Completed TLS handshake for Probe %v at %v with info %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.TLSDoneTime, probe.stats.TLSConnInfo, @@ -589,8 +765,13 @@ func (probe *ProbeTracer) SetHttpWroteRequestTimeInfo( probe.stats.HttpWroteRequestTime = now probe.stats.HttpInfo = info if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) Http finished writing request for %v at %v with info %v\n", + "(%s Probe) Http finished writing request for Probe %v at %v with info %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.HttpWroteRequestTime, probe.stats.HttpInfo, @@ -603,8 +784,13 @@ func (probe *ProbeTracer) SetHttpResponseReadyTime( ) { probe.stats.HttpResponseReadyTime = now if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) Http response is ready for %v at %v\n", + "(%s Probe) Http response is ready for Probe %v at %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.HttpResponseReadyTime, ) diff --git a/utilities/utilities.go b/utilities/utilities.go index 76acbd2..7b96bef 100644 --- a/utilities/utilities.go +++ b/utilities/utilities.go @@ -76,7 +76,7 @@ func SeekForAppend(file *os.File) (err error) { return } -var GenerateConnectionId func() uint64 = func() func() uint64 { +var GenerateUniqueId func() uint64 = func() func() uint64 { var nextConnectionId uint64 = 0 return func() uint64 { return atomic.AddUint64(&nextConnectionId, 1) diff --git a/utilities/utilities_test.go b/utilities/utilities_test.go new file mode 100644 index 0000000..72a11fb --- /dev/null +++ b/utilities/utilities_test.go @@ -0,0 +1,39 @@ +package utilities + +import ( + "sync" + "testing" + "time" +) + +func TestReadAfterCloseOnBufferedChannel(t *testing.T) { + communication := make(chan int, 100) + + maxC := 0 + + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + counter := 0 + for range make([]int, 50) { + communication <- counter + counter++ + } + close(communication) + wg.Done() + }() + + go func() { + time.Sleep(2 * time.Second) + for c := range communication { + maxC = c + } + wg.Done() + }() + + wg.Wait() + if maxC != 49 { + t.Fatalf("Did not read all sent items from a buffered channel after channel.") + } +} |
