diff options
Diffstat (limited to 'rpm/rpm.go')
| -rw-r--r-- | rpm/rpm.go | 290 |
1 files changed, 238 insertions, 52 deletions
@@ -17,6 +17,7 @@ import ( "github.com/network-quality/goresponsiveness/stats" "github.com/network-quality/goresponsiveness/traceable" "github.com/network-quality/goresponsiveness/utilities" + "golang.org/x/net/http2" ) func addFlows( @@ -39,7 +40,8 @@ func addFlows( } type ProbeConfiguration struct { - URL string + URL string + Interval time.Duration } type DataPoint struct { @@ -53,26 +55,38 @@ type LGDataCollectionResult struct { DataPoints []DataPoint } -func LGProbe( +func Probe( parentProbeCtx context.Context, - connection lgc.LoadGeneratingConnection, - lgProbeUrl string, + client *http.Client, + probeUrl string, + isLGProbe bool, result *chan DataPoint, debugging *debug.DebugWithPrefix, ) error { - probeTracer := NewProbeTracer(connection.Client(), true, debugging) + + probeTypeLabel := "New-Connection" + if isLGProbe { + probeTypeLabel = "Load-Generating" + } + + if client == nil { + return fmt.Errorf("Cannot start a probe with a nil client") + } + + probeId := utilities.GenerateUniqueId() + probeTracer := NewProbeTracer(client, isLGProbe, probeId, debugging) time_before_probe := time.Now() probe_req, err := http.NewRequestWithContext( httptrace.WithClientTrace(parentProbeCtx, probeTracer.trace), "GET", - lgProbeUrl, + probeUrl, nil, ) if err != nil { return err } - probe_resp, err := connection.Client().Do(probe_req) + probe_resp, err := client.Do(probe_req) if err != nil { return err } @@ -90,52 +104,161 @@ func LGProbe( sanity := time_after_probe.Sub(time_before_probe) - tlsAndHttpHeaderDelta := probeTracer.GetTLSAndHttpHeaderDelta() - httpDownloadDelta := probeTracer.GetHttpDownloadDelta( + // When the probe is run on a load-generating connection there should + // only be a single round trip that is measured. We will take the accumulation of all these + // values just to be sure, though. Because of how this traced connection was launched, most + // of the values will be 0 (or very small where the time that go takes for delivering callbacks + // and doing context switches pokes through). When it is !isLGProbe then the values will + // be significant and we want to add them regardless! + totalDelay := probeTracer.GetTLSAndHttpHeaderDelta() + probeTracer.GetHttpDownloadDelta( time_after_probe, - ) // Combined with above, constitutes 2 time measurements, per the Spec. - tcpDelta := probeTracer.GetTCPDelta() // Constitutes 1 time measurement, per the Spec. - totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + tcpDelta + ) + probeTracer.GetTCPDelta() - // We must have reused the connection! - if !probeTracer.stats.ConnectionReused { + // We must have reused the connection if we are a load-generating probe! + if isLGProbe && !probeTracer.stats.ConnectionReused { panic(!probeTracer.stats.ConnectionReused) } if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) (Probe %v) sanity vs total: %v vs %v\n", + "(%s) (%s Probe %v) sanity vs total: %v vs %v\n", debugging.Prefix, - probeTracer.ProbeId(), + probeTypeLabel, + probeId, sanity, totalDelay, ) } - *result <- DataPoint{RoundTripCount: 1, Duration: totalDelay} + roundTripCount := uint64(1) + if !isLGProbe { + roundTripCount = 3 + } + // TODO: Careful!!! It's possible that this channel has been closed because the Prober that + // started it has been stopped. Writing to a closed channel will cause a panic. It might not + // matter because a panic just stops the go thread containing the paniced code and we are in + // a go thread that executes only this function. + defer func() { + _ = recover() + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) (%s Probe %v) Probe attempted to write to the result channel after its invoker ended.\n", + debugging.Prefix, + probeTypeLabel, + probeId, + ) + } + }() + *result <- DataPoint{RoundTripCount: roundTripCount, Duration: totalDelay} return nil } +func Prober( + proberCtx context.Context, + ncProbeConfigurationGenerator func() ProbeConfiguration, + keyLogger io.Writer, + debugging *debug.DebugWithPrefix, +) (points chan DataPoint) { + points = make(chan DataPoint) + + ncProbeConfiguration := ncProbeConfigurationGenerator() + + go func() { + probeCount := 0 + + for proberCtx.Err() == nil { + time.Sleep(ncProbeConfiguration.Interval) + + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) About to start new-connection probe number %d!\n", + debugging.Prefix, + probeCount, + ) + } + transport := http2.Transport{} + transport.TLSClientConfig = &tls.Config{} + + if !utilities.IsInterfaceNil(keyLogger) { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "Using an SSL Key Logger for this new-connection probe.\n", + ) + } + + // The presence of a custom TLSClientConfig in a *generic* `transport` + // means that go will default to HTTP/1.1 and cowardly avoid HTTP/2: + // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278 + // Also, it would appear that the API's choice of HTTP vs HTTP2 can + // depend on whether the url contains + // https:// or http://: + // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74 + transport.TLSClientConfig.KeyLogWriter = keyLogger + } + transport.TLSClientConfig.InsecureSkipVerify = true + + client := &http.Client{Transport: &transport} + + probeCount++ + go Probe(proberCtx, client, ncProbeConfiguration.URL, false, &points, debugging) + } + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) New-connection probing driver is stopping after sending %d probes.\n", + debugging.Prefix, + probeCount, + ) + } + close(points) + }() + return +} + func LGProber( proberCtx context.Context, defaultConnection lgc.LoadGeneratingConnection, altConnections *[]lgc.LoadGeneratingConnection, - url string, - interval time.Duration, + lgProbeConfiguration ProbeConfiguration, debugging *debug.DebugWithPrefix, ) (points chan DataPoint) { points = make(chan DataPoint) + debugging = debug.NewDebugWithPrefix(debugging.Level, debugging.Prefix+" load-generating probe") + go func() { + probeCount := 0 for proberCtx.Err() == nil { - time.Sleep(interval) + time.Sleep(lgProbeConfiguration.Interval) if debug.IsDebug(debugging.Level) { - fmt.Printf("(%s) About to probe!\n", debugging.Prefix) + fmt.Printf( + "(%s) About to start load-generating probe number %d!\n", + debugging.Prefix, + probeCount, + ) } - go LGProbe(proberCtx, defaultConnection, url, &points, debugging) + probeCount++ + // TODO: We do not yet take in to account that the load-generating connection that we were given + // on which to perform measurements might go away during testing. We have access to all the open + // load-generating connections (altConnections) to handle this case, but we just aren't using them + // yet. + go Probe( + proberCtx, + defaultConnection.Client(), + lgProbeConfiguration.URL, + true, + &points, + debugging, + ) + } + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) Load-generating probing driver is stopping after sending %d probes.\n", + debugging.Prefix, + probeCount, + ) } + close(points) }() - return } @@ -159,14 +282,11 @@ func LGCollectData( debugging.Level, ) - lgProbeConfiguration := lgProbeConfigurationGenerator() - - LGProber( - lgDataCollectionCtx, + lgProbeCtx, lgProbeCtxCancel := context.WithCancel(lgDataCollectionCtx) + probeDataPointsChannel := LGProber(lgProbeCtx, lgcs[0], &lgcs, - lgProbeConfiguration.URL, - time.Duration(100*time.Millisecond), + lgProbeConfigurationGenerator(), debugging, ) @@ -191,17 +311,13 @@ func LGCollectData( for currentInterval := uint64(0); true; currentInterval++ { - // When the program stops operating, then stop. - if lgDataCollectionCtx.Err() != nil { + // When the program stops operating, then stop. When our invoker tells + // us to stop, then stop. + if operatingCtx.Err() != nil || lgDataCollectionCtx.Err() != nil { + lgProbeCtxCancel() return } - // We may be asked to stop trying to saturate the - // network and return our current status. - if lgDataCollectionCtx.Err() != nil { - //break - } - now := time.Now() // At each 1-second interval if nextSampleStartTime.Sub(now) > 0 { @@ -346,7 +462,19 @@ func LGCollectData( } } - resulted <- LGDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs} + lgProbeCtxCancel() + probeDataPoints := make([]DataPoint, 0) + for dataPoint := range probeDataPointsChannel { + probeDataPoints = append(probeDataPoints, dataPoint) + } + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) Collected %d load-generating probe data points\n", + debugging.Prefix, + len(probeDataPoints), + ) + } + resulted <- LGDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, DataPoints: probeDataPoints} }() return } @@ -446,13 +574,18 @@ func (p *ProbeTracer) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration return delta } -func NewProbeTracer(client *http.Client, isLG bool, debugging *debug.DebugWithPrefix) *ProbeTracer { +func NewProbeTracer( + client *http.Client, + isLG bool, + probeId uint64, + debugging *debug.DebugWithPrefix, +) *ProbeTracer { probe := &ProbeTracer{ client: client, stats: &stats.TraceStats{}, trace: nil, debug: debugging.Level, - probeid: utilities.GenerateConnectionId(), + probeid: probeId, isLG: isLG, } trace := traceable.GenerateHttpTimingTracer(probe, debugging.Level) @@ -468,8 +601,13 @@ func (probe *ProbeTracer) SetDnsStartTimeInfo( probe.stats.DnsStartTime = now probe.stats.DnsStart = dnsStartInfo if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) DNS Start for %v: %v\n", + "(%s Probe) DNS Start for Probe %v: %v\n", + probeTypeLabel, probe.ProbeId(), dnsStartInfo, ) @@ -483,8 +621,13 @@ func (probe *ProbeTracer) SetDnsDoneTimeInfo( probe.stats.DnsDoneTime = now probe.stats.DnsDone = dnsDoneInfo if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) DNS Done for %v: %v\n", + "(%s Probe) DNS Done for Probe %v: %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.DnsDone, ) @@ -496,8 +639,13 @@ func (probe *ProbeTracer) SetConnectStartTime( ) { probe.stats.ConnectStartTime = now if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) TCP Start for %v at %v\n", + "(%s Probe) TCP Start for Probe %v at %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.ConnectStartTime, ) @@ -511,8 +659,13 @@ func (probe *ProbeTracer) SetConnectDoneTimeError( probe.stats.ConnectDoneTime = now probe.stats.ConnectDoneError = err if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) TCP Done for %v (with error %v) @ %v\n", + "(%s Probe) TCP Done for Probe %v (with error %v) @ %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.ConnectDoneError, probe.stats.ConnectDoneTime, @@ -523,8 +676,13 @@ func (probe *ProbeTracer) SetConnectDoneTimeError( func (probe *ProbeTracer) SetGetConnTime(now time.Time) { probe.stats.GetConnectionStartTime = now if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) Started getting connection for %v @ %v\n", + "(%s Probe) Started getting connection for Probe %v @ %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.GetConnectionStartTime, ) @@ -539,13 +697,21 @@ func (probe *ProbeTracer) SetGotConnTimeInfo( probe.stats.ConnInfo = gotConnInfo probe.stats.ConnectionReused = gotConnInfo.Reused if probe.isLG && !gotConnInfo.Reused { - fmt.Fprintf(os.Stderr, "A probe sent on an LG Connection used a new connection!\n") + fmt.Fprintf( + os.Stderr, + "A probe sent on an load-generating connection used a new connection!\n", + ) } else if debug.IsDebug(probe.debug) { - fmt.Printf("Properly reused a connection when probing on an LG Connection!\n") + fmt.Printf("Properly reused a connection when probing on a load-generating connection!\n") } if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) Got a reused connection for %v at %v with info %v\n", + "(%s Probe) Got a reused connection for Probe %v at %v with info %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.GetConnectionDoneTime, probe.stats.ConnInfo, @@ -558,8 +724,13 @@ func (probe *ProbeTracer) SetTLSHandshakeStartTime( ) { probe.stats.TLSStartTime = utilities.Some(now) if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) Started TLS Handshake for %v @ %v\n", + "(%s Probe) Started TLS Handshake for Probe %v @ %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.TLSStartTime, ) @@ -573,8 +744,13 @@ func (probe *ProbeTracer) SetTLSHandshakeDoneTimeState( probe.stats.TLSDoneTime = utilities.Some(now) probe.stats.TLSConnInfo = connectionState if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) Completed TLS handshake for %v at %v with info %v\n", + "(%s Probe) Completed TLS handshake for Probe %v at %v with info %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.TLSDoneTime, probe.stats.TLSConnInfo, @@ -589,8 +765,13 @@ func (probe *ProbeTracer) SetHttpWroteRequestTimeInfo( probe.stats.HttpWroteRequestTime = now probe.stats.HttpInfo = info if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) Http finished writing request for %v at %v with info %v\n", + "(%s Probe) Http finished writing request for Probe %v at %v with info %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.HttpWroteRequestTime, probe.stats.HttpInfo, @@ -603,8 +784,13 @@ func (probe *ProbeTracer) SetHttpResponseReadyTime( ) { probe.stats.HttpResponseReadyTime = now if debug.IsDebug(probe.debug) { + probeTypeLabel := "New-Connection" + if probe.isLG { + probeTypeLabel = "Load-Generating" + } fmt.Printf( - "(Probe) Http response is ready for %v at %v\n", + "(%s Probe) Http response is ready for Probe %v at %v\n", + probeTypeLabel, probe.ProbeId(), probe.stats.HttpResponseReadyTime, ) |
