-rw-r--r--   lgc/lgc.go         |  25
-rw-r--r--   networkQuality.go  | 214
-rw-r--r--   rpm/rpm.go         | 296
3 files changed, 209 insertions, 326 deletions
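
The central change in lgc/lgc.go below is that each load-generating transfer now derives its own child context before attaching an httptrace tracer, so that there is a one-to-one correspondence between tracers and requests. As a standalone illustration of that pattern (not part of the patch; the function name fetchWithOwnTracer and the example URL are invented), a minimal sketch using only the Go standard library:

package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptrace"
)

// fetchWithOwnTracer is a hypothetical helper: it derives a child context from
// parentCtx for this request only, attaches a ClientTrace to that child context,
// and issues a single GET, so the trace callbacks fire for exactly one request.
func fetchWithOwnTracer(parentCtx context.Context, client *http.Client, url string) error {
	reqCtx, cancel := context.WithCancel(parentCtx)
	defer cancel()

	trace := &httptrace.ClientTrace{
		GotConn: func(info httptrace.GotConnInfo) {
			fmt.Printf("got connection (reused=%v)\n", info.Reused)
		},
	}
	req, err := http.NewRequestWithContext(httptrace.WithClientTrace(reqCtx, trace), "GET", url, nil)
	if err != nil {
		return err
	}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	_, err = io.Copy(io.Discard, resp.Body)
	return err
}

func main() {
	if err := fetchWithOwnTracer(context.Background(), http.DefaultClient, "https://example.com/"); err != nil {
		fmt.Println("probe failed:", err)
	}
}

Because doDownload and doUpload in the patch receive downloadCtx and uploadCtx rather than the shared parent context, cancelling the parent still stops every transfer, while each request's tracer observes only its own events.
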
diff --git a/lgc/lgc.go b/lgc/lgc.go
--- a/lgc/lgc.go
+++ b/lgc/lgc.go
@@ -241,7 +241,7 @@ func (cr *countingReader) Read(p []byte) (n int, err error) {
 }
 
 func (lgd *LoadGeneratingConnectionDownload) Start(
-	ctx context.Context,
+	parentCtx context.Context,
 	debugLevel debug.DebugLevel,
 ) bool {
 	lgd.downloaded = 0
@@ -278,9 +278,18 @@ func (lgd *LoadGeneratingConnectionDownload) Start(
 			lgd.clientId,
 		)
 	}
-	go lgd.doDownload(ctx)
+
+	// Later, when the doDownload function attempts to add a tracer to the http request,
+	// it will be associated with the context. Multiple tracers associated with the same
+	// context will make it impossible to disambiguate the events. In other words, if there
+	// are multiple tracers associated with the same context, *all* the tracers get invoked
+	// every time that an event happens on a request with any of them! So, we will make a
+	// unique context so that there is a one-to-one correspondence between tracers and requests.
+	downloadCtx, _ := context.WithCancel(parentCtx)
+	go lgd.doDownload(downloadCtx)
 	return true
 }
+
 func (lgd *LoadGeneratingConnectionDownload) IsValid() bool {
 	return lgd.valid
 }
@@ -394,7 +403,7 @@ func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) bool {
 }
 
 func (lgu *LoadGeneratingConnectionUpload) Start(
-	ctx context.Context,
+	parentCtx context.Context,
 	debugLevel debug.DebugLevel,
 ) bool {
 	lgu.uploaded = 0
@@ -422,7 +431,15 @@ func (lgu *LoadGeneratingConnectionUpload) Start(
 	if debug.IsDebug(lgu.debug) {
 		fmt.Printf("Started a load-generating upload (id: %v).\n", lgu.clientId)
 	}
-	go lgu.doUpload(ctx)
+
+	// Later, when the doUpload function attempts to add a tracer to the http request,
+	// it will be associated with the context. Multiple tracers associated with the same
+	// context will make it impossible to disambiguate the events. In other words, if there
+	// are multiple tracers associated with the same context, *all* the tracers get invoked
+	// every time that an event happens on a request with any of them! So, we will make a
+	// unique context so that there is a one-to-one correspondence between tracers and requests.
+	uploadCtx, _ := context.WithCancel(parentCtx)
+	go lgu.doUpload(uploadCtx)
 	return true
 }
diff --git a/networkQuality.go b/networkQuality.go
index 93c42e0..d5c3523 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -16,10 +16,8 @@ package main
 
 import (
 	"context"
-	"crypto/tls"
 	"flag"
 	"fmt"
-	"net/http"
 	"os"
 	"runtime/pprof"
 	"time"
@@ -33,7 +31,6 @@ import (
 	"github.com/network-quality/goresponsiveness/rpm"
 	"github.com/network-quality/goresponsiveness/timeoutat"
 	"github.com/network-quality/goresponsiveness/utilities"
-	"golang.org/x/net/http2"
 )
 
 var (
@@ -98,7 +95,7 @@ func main() {
 	timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
 	configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort)
 	operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background())
-	saturationCtx, cancelSaturationCtx := context.WithCancel(
+	lgDataCollectionCtx, cancelLGDataCollectionCtx := context.WithCancel(
 		context.Background(),
 	)
 	config := &config.Config{}
@@ -187,84 +184,90 @@ func main() {
 	 * Create (and then, ironically, name) two anonymous functions that, when invoked,
 	 * will create load-generating connections for upload/download/
 	 */
-	generate_lbd := func() lgc.LoadGeneratingConnection {
+	generate_lgd := func() lgc.LoadGeneratingConnection {
 		return &lgc.LoadGeneratingConnectionDownload{
 			Path:      config.Urls.LargeUrl,
 			KeyLogger: sslKeyFileConcurrentWriter,
 		}
 	}
-	generate_lbu := func() lgc.LoadGeneratingConnection {
+	generate_lgu := func() lgc.LoadGeneratingConnection {
 		return &lgc.LoadGeneratingConnectionUpload{
 			Path:      config.Urls.UploadUrl,
 			KeyLogger: sslKeyFileConcurrentWriter,
 		}
 	}
+	generate_lg_probe_configuration := func() rpm.ProbeConfiguration {
+		return rpm.ProbeConfiguration{URL: config.Urls.SmallUrl}
+	}
+
 	var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download")
 	var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload")
-	downloadSaturationChannel := rpm.Saturate(
-		saturationCtx,
+	downloadDataCollectionChannel := rpm.LGCollectData(
+		lgDataCollectionCtx,
 		operatingCtx,
-		generate_lbd,
+		generate_lgd,
+		generate_lg_probe_configuration,
 		downloadDebugging,
 	)
-	uploadSaturationChannel := rpm.Saturate(
-		saturationCtx,
+	uploadDataCollectionChannel := rpm.LGCollectData(
+		lgDataCollectionCtx,
 		operatingCtx,
-		generate_lbu,
+		generate_lgu,
+		generate_lg_probe_configuration,
 		uploadDebugging,
 	)
-	saturationTimeout := false
-	uploadSaturated := false
-	downloadSaturated := false
-	downloadSaturation := rpm.SaturationResult{}
-	uploadSaturation := rpm.SaturationResult{}
+	dataCollectionTimeout := false
+	uploadDataCollectionComplete := false
+	downloadDataCollectionComple := false
+	downloadDataCollectionResult := rpm.LGDataCollectionResult{}
+	uploadDataCollectionResult := rpm.LGDataCollectionResult{}
-	for !(uploadSaturated && downloadSaturated) {
+	for !(uploadDataCollectionComplete && downloadDataCollectionComple) {
 		select {
-		case downloadSaturation = <-downloadSaturationChannel:
+		case downloadDataCollectionResult = <-downloadDataCollectionChannel:
 			{
-				downloadSaturated = true
+				downloadDataCollectionComple = true
 				if *debugCliFlag {
 					fmt.Printf(
-						"################# download is %s saturated (%fMBps, %d flows)!\n",
+						"################# download load-generating data collection is %s complete (%fMBps, %d flows)!\n",
 						utilities.Conditional(
-							saturationTimeout,
+							dataCollectionTimeout,
 							"(provisionally)",
 							"",
 						),
-						utilities.ToMBps(downloadSaturation.RateBps),
-						len(downloadSaturation.LGCs),
+						utilities.ToMBps(downloadDataCollectionResult.RateBps),
+						len(downloadDataCollectionResult.LGCs),
 					)
 				}
 			}
-		case uploadSaturation = <-uploadSaturationChannel:
+		case uploadDataCollectionResult = <-uploadDataCollectionChannel:
 			{
-				uploadSaturated = true
+				uploadDataCollectionComplete = true
 				if *debugCliFlag {
 					fmt.Printf(
-						"################# upload is %s saturated (%fMBps, %d flows)!\n",
+						"################# upload load-generating data collection is %s complete (%fMBps, %d flows)!\n",
 						utilities.Conditional(
-							saturationTimeout,
+							dataCollectionTimeout,
 							"(provisionally)",
 							"",
 						),
-						utilities.ToMBps(uploadSaturation.RateBps),
-						len(uploadSaturation.LGCs),
+						utilities.ToMBps(uploadDataCollectionResult.RateBps),
+						len(uploadDataCollectionResult.LGCs),
 					)
 				}
 			}
 		case <-timeoutChannel:
 			{
-				if saturationTimeout {
-					// We already timedout on saturation. This signal means that
-					// we are timedout on getting the provisional saturation. We
+				if dataCollectionTimeout {
+					// We already timedout on data collection. This signal means that
+					// we are timedout on getting the provisional data collection. We
 					// will exit!
 					fmt.Fprint(
 						os.Stderr,
-						"Error: Saturation could not be completed in time and no provisional rates could be assessed. Test failed.\n",
+						"Error: Load-Generating data collection could not be completed in time and no provisional data could be gathered. Test failed.\n",
 					)
 					cancelOperatingCtx()
 					if *debugCliFlag {
@@ -272,13 +275,13 @@ func main() {
 					}
 					return
 				}
-				saturationTimeout = true
+				dataCollectionTimeout = true
-				// We timed out attempting to saturate the link. So, we will
-				// shut down all the saturation xfers
-				cancelSaturationCtx()
+				// We timed out attempting to collect data about the link. So, we will
+				// shut down all the collection xfers
+				cancelLGDataCollectionCtx()
 				// and then we will give ourselves some additional time in order
-				// to calculate a provisional saturation.
+				// to complete provisional data collection.
 				timeoutAbsoluteTime = time.Now().
 					Add(time.Second * time.Duration(*rpmtimeout))
 				timeoutChannel = timeoutat.TimeoutAt(
@@ -288,39 +291,27 @@ func main() {
 				)
 				if *debugCliFlag {
 					fmt.Printf(
-						"################# timeout reaching saturation!\n",
+						"################# timeout collecting load-generating data!\n",
 					)
 				}
 			}
 		}
 	}
-	// Give ourselves no more than 15 seconds to complete the RPM calculation.
-	// This is conditional because (above) we may have already added the time.
-	// We did it up there so that we could also limit the amount of time waiting
-	// for a conditional saturation calculation.
-	if !saturationTimeout {
-		timeoutAbsoluteTime = time.Now().Add(time.Second * time.Duration(*rpmtimeout))
-		timeoutChannel = timeoutat.TimeoutAt(
-			operatingCtx,
-			timeoutAbsoluteTime,
-			debugLevel,
-		)
-	}
+	// In the new version we are no longer going to wait to send probes until after
+	// saturation. When we get here we are now only going to compute the results
+	// and/or extended statistics!
-	totalMeasurements := uint64(0)
-	totalMeasurementTimes := float64(0)
-	measurementTimeout := false
 	extendedStats := extendedstats.ExtendedStats{}
-	for i := 0; i < len(downloadSaturation.LGCs); i++ {
+	for i := 0; i < len(downloadDataCollectionResult.LGCs); i++ {
 		// Assume that extended statistics are available -- the check was done explicitly at
 		// program startup if the calculateExtendedStats flag was set by the user on the command line.
 		if *calculateExtendedStats {
 			if !extendedstats.ExtendedStatsAvailable() {
 				panic("Extended stats are not available but the user requested their calculation.")
 			}
-			if err := extendedStats.IncorporateConnectionStats(downloadSaturation.LGCs[i].Stats().ConnInfo.Conn); err != nil {
+			if err := extendedStats.IncorporateConnectionStats(downloadDataCollectionResult.LGCs[i].Stats().ConnInfo.Conn); err != nil {
 				fmt.Fprintf(
 					os.Stderr,
 					"Warning: Could not add extended stats for the connection: %v",
@@ -329,116 +320,19 @@ func main() {
 			}
 		}
 	}
-
-	for i := 0; i < constants.MeasurementProbeCount && !measurementTimeout; i++ {
-		if len(downloadSaturation.LGCs) == 0 {
-			continue
-		}
-		randomLGCsIndex := utilities.RandBetween(len(downloadSaturation.LGCs))
-		if !downloadSaturation.LGCs[randomLGCsIndex].IsValid() {
-			if *debugCliFlag {
-				fmt.Printf(
-					"%v: The randomly selected saturated connection (with id %d) was invalid. Skipping.\n",
-					debugCliFlag,
-					downloadSaturation.LGCs[randomLGCsIndex].ClientId(),
-				)
-			}
-
-			// Protect against pathological cases where we continuously select
-			// invalid connections and never
-			// do the select below
-			if time.Since(timeoutAbsoluteTime) > 0 {
-				if *debugCliFlag {
-					fmt.Printf(
-						"Pathologically could not find valid saturated connections use for measurement.\n",
-					)
-				}
-				break
-			}
-			continue
-		}
-
-		unsaturatedMeasurementTransport := http2.Transport{}
-		unsaturatedMeasurementTransport.TLSClientConfig = &tls.Config{}
-		if sslKeyFileConcurrentWriter != nil {
-			unsaturatedMeasurementTransport.TLSClientConfig.KeyLogWriter = sslKeyFileConcurrentWriter
-		}
-		unsaturatedMeasurementTransport.TLSClientConfig.InsecureSkipVerify = true
-		newClient := http.Client{Transport: &unsaturatedMeasurementTransport}
-
-		unsaturatedMeasurementProbe := rpm.NewProbe(&newClient, debugLevel)
-
-		saturatedMeasurementProbe := rpm.NewProbe(
-			downloadSaturation.LGCs[randomLGCsIndex].Client(),
-			debugLevel,
-		)
-
-		select {
-		case <-timeoutChannel:
-			{
-				measurementTimeout = true
-			}
-		case sequentialMeasurementTimes := <-rpm.CalculateProbeMeasurements(operatingCtx, *strictFlag, saturatedMeasurementProbe, unsaturatedMeasurementProbe, config.Urls.SmallUrl, debugLevel):
-			{
-				if sequentialMeasurementTimes.Err != nil {
-					fmt.Printf(
-						"Failed to calculate a time for sequential measurements: %v\n",
-						sequentialMeasurementTimes.Err,
-					)
-					continue
-				}
-
-				if debug.IsDebug(debugLevel) {
-					fmt.Printf("unsaturatedMeasurementProbe: %v\n", unsaturatedMeasurementProbe)
-				}
-				// We know that we have a good Sequential measurement.
-				totalMeasurements += uint64(sequentialMeasurementTimes.MeasurementCount)
-				totalMeasurementTimes += sequentialMeasurementTimes.Delay.Seconds()
-				if debug.IsDebug(debugLevel) {
-					fmt.Printf(
-						"most-recent sequential measurement time: %v; most-recent sequential measurement count: %v\n",
-						sequentialMeasurementTimes.Delay.Seconds(),
-						sequentialMeasurementTimes.MeasurementCount,
-					)
-				}
-			}
-		}
-	}
-
 	fmt.Printf(
 		"Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
-		utilities.ToMbps(downloadSaturation.RateBps),
-		utilities.ToMBps(downloadSaturation.RateBps),
-		len(downloadSaturation.LGCs),
+		utilities.ToMbps(downloadDataCollectionResult.RateBps),
+		utilities.ToMBps(downloadDataCollectionResult.RateBps),
+		len(downloadDataCollectionResult.LGCs),
 	)
 	fmt.Printf(
 		"Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
-		utilities.ToMbps(uploadSaturation.RateBps),
-		utilities.ToMBps(uploadSaturation.RateBps),
-		len(uploadSaturation.LGCs),
+		utilities.ToMbps(uploadDataCollectionResult.RateBps),
+		utilities.ToMBps(uploadDataCollectionResult.RateBps),
+		len(uploadDataCollectionResult.LGCs),
 	)
-	if totalMeasurements != 0 {
-		// "... it sums the five time values for each probe, and divides by the
-		// total
-		// number of probes to compute an average probe duration. The
-		// reciprocal of this, normalized to 60 seconds, gives the Round-trips
-		// Per Minute (RPM)."
-		// "average probe duration" = totalMeasurementTimes / totalMeasurements.
-		// The reciprocol of this = 1 / (totalMeasurementTimes / totalMeasurements) <-
-		// semantically the probes-per-second.
-		// Normalized to 60 seconds: 60 * (1
-		// / ((totalMeasurementTimes / totalMeasurements)))) <- semantically the number of
-		// probes per minute.
-		rpm := float64(
-			time.Minute.Seconds(),
-		) / (totalMeasurementTimes / (float64(totalMeasurements)))
-		fmt.Printf("Total measurements: %d\n", totalMeasurements)
-		fmt.Printf("RPM: %5.0f\n", rpm)
-	} else {
-		fmt.Printf("Error occurred calculating RPM -- no probe measurements received.\n")
-	}
-
 	if *calculateExtendedStats {
 		fmt.Println(extendedStats.Repr())
 	}
diff --git a/rpm/rpm.go b/rpm/rpm.go
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -38,30 +38,119 @@ func addFlows(
 	}
 }
 
-type SaturationResult struct {
-	RateBps float64
-	LGCs    []lgc.LoadGeneratingConnection
+type ProbeConfiguration struct {
+	URL string
 }
 
-func Saturate(
-	saturationCtx context.Context,
+type DataPoint struct {
+	RoundTripCount uint64
+	Duration       time.Duration
+}
+
+type LGDataCollectionResult struct {
+	RateBps    float64
+	LGCs       []lgc.LoadGeneratingConnection
+	DataPoints []DataPoint
+}
+
+func LGProbe(parentProbeCtx context.Context, connection lgc.LoadGeneratingConnection, lgProbeUrl string, result *chan DataPoint, debugging *debug.DebugWithPrefix) error {
+	probeCtx, _ := context.WithCancel(parentProbeCtx)
+	probeTracer := NewProbeTracer(connection.Client(), true, debugging)
+	time_before_probe := time.Now()
+	probe_req, err := http.NewRequestWithContext(
+		httptrace.WithClientTrace(probeCtx, probeTracer.trace),
+		"GET",
+		lgProbeUrl,
+		nil,
+	)
+	if err != nil {
+		return err
+	}
+
+	probe_resp, err := connection.Client().Do(probe_req)
+	if err != nil {
+		return err
+	}
+
+	// TODO: Make this interruptable somehow by using _ctx_.
+	_, err = io.ReadAll(probe_resp.Body)
+	if err != nil {
+		return err
+	}
+	time_after_probe := time.Now()
+
+	// Depending on whether we think that Close() requires another RTT (via TCP), we
+	// may need to move this before/after capturing the after time.
+	probe_resp.Body.Close()
+
+	sanity := time_after_probe.Sub(time_before_probe)
+
+	tlsAndHttpHeaderDelta := probeTracer.GetTLSAndHttpHeaderDelta()
+	httpDownloadDelta := probeTracer.GetHttpDownloadDelta(
+		time_after_probe,
+	) // Combined with above, constitutes 2 time measurements, per the Spec.
+	tcpDelta := probeTracer.GetTCPDelta() // Constitutes 1 time measurement, per the Spec.
+	totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + tcpDelta
+
+	// We must have reused the connection!
+	if !probeTracer.stats.ConnectionReused {
+		panic(!probeTracer.stats.ConnectionReused)
+	}
+
+	if debug.IsDebug(debugging.Level) {
+		fmt.Printf(
+			"(%s) (Probe %v) sanity vs total: %v vs %v\n",
+			debugging.Prefix,
+			probeTracer.ProbeId(),
+			sanity,
+			totalDelay,
+		)
+	}
+	*result <- DataPoint{RoundTripCount: 1, Duration: totalDelay}
+
+	return nil
+	/////////////
+}
+func LGProber(proberCtx context.Context, defaultConnection lgc.LoadGeneratingConnection, altConnections *[]lgc.LoadGeneratingConnection, url string, interval time.Duration, debugging *debug.DebugWithPrefix) (points chan DataPoint) {
+	points = make(chan DataPoint)
+
+	go func() {
+		for proberCtx.Err() == nil {
+			time.Sleep(interval)
+			if debug.IsDebug(debugging.Level) {
+				fmt.Printf("(%s) About to probe!\n", debugging.Prefix)
+			}
+			go LGProbe(proberCtx, defaultConnection, url, &points, debugging)
+		}
+	}()
+
+	return
+}
+
+func LGCollectData(
+	lgDataCollectionCtx context.Context,
 	operatingCtx context.Context,
 	lgcGenerator func() lgc.LoadGeneratingConnection,
+	lgProbeConfigurationGenerator func() ProbeConfiguration,
 	debugging *debug.DebugWithPrefix,
-) (saturated chan SaturationResult) {
-	saturated = make(chan SaturationResult)
+) (resulted chan LGDataCollectionResult) {
+	resulted = make(chan LGDataCollectionResult)
 
 	go func() {
 		lgcs := make([]lgc.LoadGeneratingConnection, 0)
 
 		addFlows(
-			saturationCtx,
+			lgDataCollectionCtx,
 			constants.StartingNumberOfLoadGeneratingConnections,
 			&lgcs,
 			lgcGenerator,
 			debugging.Level,
 		)
+		lgProbeConfiguration := lgProbeConfigurationGenerator()
+
+		LGProber(lgDataCollectionCtx, lgcs[0], &lgcs, lgProbeConfiguration.URL, time.Duration(100*time.Millisecond), debugging)
+
 		previousFlowIncreaseInterval := uint64(0)
 		previousMovingAverage := float64(0)
@@ -84,13 +173,13 @@ func Saturate(
 		for currentInterval := uint64(0); true; currentInterval++ {
 
 			// When the program stops operating, then stop.
-			if saturationCtx.Err() != nil {
+			if lgDataCollectionCtx.Err() != nil {
 				return
 			}
 
 			// We may be asked to stop trying to saturate the
 			// network and return our current status.
-			if saturationCtx.Err() != nil {
+			if lgDataCollectionCtx.Err() != nil {
 				//break
 			}
@@ -204,7 +293,7 @@ func Saturate(
 				)
 			}
 			addFlows(
-				saturationCtx,
+				lgDataCollectionCtx,
 				constants.AdditiveNumberOfLoadGeneratingConnections,
 				&lgcs,
 				lgcGenerator,
@@ -232,38 +321,39 @@ func Saturate(
 				if debug.IsDebug(debugging.Level) {
 					fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging)
 				}
-				addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level)
+				addFlows(lgDataCollectionCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level)
 				previousFlowIncreaseInterval = currentInterval
 			}
 		}
 		}
-		saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs}
+		resulted <- LGDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs}
 	}()
 
 	return
 }
 
-type Probe struct {
+type ProbeTracer struct {
 	client  *http.Client
 	stats   *stats.TraceStats
 	trace   *httptrace.ClientTrace
 	debug   debug.DebugLevel
 	probeid uint64
+	isLG    bool
 }
 
-func (p *Probe) String() string {
+func (p *ProbeTracer) String() string {
 	return fmt.Sprintf("(Probe %v): stats: %v\n", p.probeid, p.stats)
 }
 
-func (p *Probe) ProbeId() uint64 {
+func (p *ProbeTracer) ProbeId() uint64 {
 	return p.probeid
 }
 
-func (p *Probe) GetTrace() *httptrace.ClientTrace {
+func (p *ProbeTracer) GetTrace() *httptrace.ClientTrace {
 	return p.trace
 }
 
-func (p *Probe) GetDnsDelta() time.Duration {
+func (p *ProbeTracer) GetDnsDelta() time.Duration {
 	if p.stats.ConnectionReused {
 		return time.Duration(0)
 	}
@@ -274,7 +364,7 @@ func (p *Probe) GetDnsDelta() time.Duration {
 	return delta
 }
 
-func (p *Probe) GetTCPDelta() time.Duration {
+func (p *ProbeTracer) GetTCPDelta() time.Duration {
 	if p.stats.ConnectionReused {
 		return time.Duration(0)
 	}
@@ -285,7 +375,7 @@ func (p *Probe) GetTCPDelta() time.Duration {
 	return delta
 }
 
-func (p *Probe) GetTLSDelta() time.Duration {
+func (p *ProbeTracer) GetTLSDelta() time.Duration {
 	if utilities.IsSome(p.stats.TLSDoneTime) {
 		panic("There should not be TLS information, but there is.")
 	}
@@ -296,7 +386,7 @@ func (p *Probe) GetTLSDelta() time.Duration {
 	return delta
 }
 
-func (p *Probe) GetTLSAndHttpHeaderDelta() time.Duration {
+func (p *ProbeTracer) GetTLSAndHttpHeaderDelta() time.Duration {
 	// Because the TLS handshake occurs *after* the TCP connection (ConnectDoneTime)
 	// and *before* the HTTP transaction, we know that the delta between the time
 	// that the first HTTP response byte is available and the time that the TCP
@@ -318,7 +408,7 @@ func (p *Probe) GetTLSAndHttpHeaderDelta() time.Duration {
 	return delta
 }
 
-func (p *Probe) GetHttpHeaderDelta() time.Duration {
+func (p *ProbeTracer) GetHttpHeaderDelta() time.Duration {
 	panic(
 		"Unusable until TLS tracing support is enabled! Use GetTLSAndHttpHeaderDelta() instead.\n",
 	)
@@ -329,7 +419,7 @@ func (p *Probe) GetHttpHeaderDelta() time.Duration {
 	return delta
 }
 
-func (p *Probe) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration {
+func (p *ProbeTracer) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration {
 	delta := httpDoneTime.Sub(p.stats.HttpResponseReadyTime)
 	if debug.IsDebug(p.debug) {
 		fmt.Printf("(Probe %v): Http Download Time: %v\n", p.probeid, delta)
@@ -337,21 +427,22 @@ func (p *Probe) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration {
 	return delta
 }
 
-func NewProbe(client *http.Client, debugLevel debug.DebugLevel) *Probe {
-	probe := &Probe{
+func NewProbeTracer(client *http.Client, isLG bool, debugging *debug.DebugWithPrefix) *ProbeTracer {
+	probe := &ProbeTracer{
 		client:  client,
 		stats:   &stats.TraceStats{},
 		trace:   nil,
-		debug:   debugLevel,
+		debug:   debugging.Level,
 		probeid: utilities.GenerateConnectionId(),
+		isLG:    isLG,
 	}
-	trace := traceable.GenerateHttpTimingTracer(probe, debugLevel)
+	trace := traceable.GenerateHttpTimingTracer(probe, debugging.Level)
 	probe.trace = trace
 	return probe
 }
 
-func (probe *Probe) SetDnsStartTimeInfo(
+func (probe *ProbeTracer) SetDnsStartTimeInfo(
 	now time.Time,
 	dnsStartInfo httptrace.DNSStartInfo,
 ) {
@@ -366,7 +457,7 @@ func (probe *Probe) SetDnsStartTimeInfo(
 	}
 }
 
-func (probe *Probe) SetDnsDoneTimeInfo(
+func (probe *ProbeTracer) SetDnsDoneTimeInfo(
 	now time.Time,
 	dnsDoneInfo httptrace.DNSDoneInfo,
 ) {
@@ -381,7 +472,7 @@ func (probe *Probe) SetDnsDoneTimeInfo(
 	}
 }
 
-func (probe *Probe) SetConnectStartTime(
+func (probe *ProbeTracer) SetConnectStartTime(
 	now time.Time,
 ) {
 	probe.stats.ConnectStartTime = now
@@ -394,7 +485,7 @@ func (probe *Probe) SetConnectStartTime(
 	}
 }
 
-func (probe *Probe) SetConnectDoneTimeError(
+func (probe *ProbeTracer) SetConnectDoneTimeError(
 	now time.Time,
 	err error,
 ) {
@@ -410,7 +501,7 @@ func (probe *Probe) SetConnectDoneTimeError(
 	}
 }
 
-func (probe *Probe) SetGetConnTime(now time.Time) {
+func (probe *ProbeTracer) SetGetConnTime(now time.Time) {
 	probe.stats.GetConnectionStartTime = now
 	if debug.IsDebug(probe.debug) {
 		fmt.Printf(
@@ -421,21 +512,21 @@ func (probe *Probe) SetGetConnTime(now time.Time) {
 	}
 }
 
-func (probe *Probe) SetGotConnTimeInfo(
+func (probe *ProbeTracer) SetGotConnTimeInfo(
 	now time.Time,
 	gotConnInfo httptrace.GotConnInfo,
 ) {
 	probe.stats.GetConnectionDoneTime = now
 	probe.stats.ConnInfo = gotConnInfo
 	probe.stats.ConnectionReused = gotConnInfo.Reused
+	if probe.isLG && !gotConnInfo.Reused {
+		fmt.Fprintf(os.Stderr, "A probe sent on an LG Connection used a new connection!\n")
+	} else if debug.IsDebug(probe.debug) {
+		fmt.Printf("Properly reused a connection when probing on an LG Connection!\n")
+	}
 	if debug.IsDebug(probe.debug) {
-		reusedString := "(new)"
-		if probe.stats.ConnectionReused {
-			reusedString = "(reused)"
-		}
 		fmt.Printf(
-			"(Probe) Got %v connection for %v at %v with info %v\n",
-			reusedString,
+			"(Probe) Got a reused connection for %v at %v with info %v\n",
 			probe.ProbeId(),
 			probe.stats.GetConnectionDoneTime,
 			probe.stats.ConnInfo,
@@ -443,7 +534,7 @@ func (probe *Probe) SetGotConnTimeInfo(
 		)
 	}
 }
 
-func (probe *Probe) SetTLSHandshakeStartTime(
+func (probe *ProbeTracer) SetTLSHandshakeStartTime(
 	now time.Time,
 ) {
 	probe.stats.TLSStartTime = utilities.Some(now)
@@ -456,7 +547,7 @@ func (probe *Probe) SetTLSHandshakeStartTime(
 	}
 }
 
-func (probe *Probe) SetTLSHandshakeDoneTimeState(
+func (probe *ProbeTracer) SetTLSHandshakeDoneTimeState(
 	now time.Time,
 	connectionState tls.ConnectionState,
 ) {
@@ -472,7 +563,7 @@ func (probe *Probe) SetTLSHandshakeDoneTimeState(
 	}
 }
 
-func (probe *Probe) SetHttpWroteRequestTimeInfo(
+func (probe *ProbeTracer) SetHttpWroteRequestTimeInfo(
 	now time.Time,
 	info httptrace.WroteRequestInfo,
 ) {
@@ -488,7 +579,7 @@ func (probe *Probe) SetHttpWroteRequestTimeInfo(
 	}
 }
 
-func (probe *Probe) SetHttpResponseReadyTime(
+func (probe *ProbeTracer) SetHttpResponseReadyTime(
 	now time.Time,
 ) {
 	probe.stats.HttpResponseReadyTime = now
@@ -500,122 +591,3 @@ func (probe *Probe) SetHttpResponseReadyTime(
 		)
 	}
 }
-
-func getLatency(
-	ctx context.Context,
-	probe *Probe,
-	url string,
-	debugLevel debug.DebugLevel,
-) utilities.MeasurementResult {
-	time_before_probe := time.Now()
-	probe_req, err := http.NewRequestWithContext(
-		httptrace.WithClientTrace(ctx, probe.GetTrace()),
-		"GET",
-		url,
-		nil,
-	)
-	if err != nil {
-		return utilities.MeasurementResult{Delay: 0, MeasurementCount: 0, Err: err}
-	}
-
-	probe_resp, err := probe.client.Do(probe_req)
-	if err != nil {
-		return utilities.MeasurementResult{Delay: 0, MeasurementCount: 0, Err: err}
-	}
-
-	// TODO: Make this interruptable somehow by using _ctx_.
-	_, err = io.ReadAll(probe_resp.Body)
-	if err != nil {
-		return utilities.MeasurementResult{Delay: 0, Err: err}
-	}
-	time_after_probe := time.Now()
-
-	// Depending on whether we think that Close() requires another RTT (via TCP), we
-	// may need to move this before/after capturing the after time.
-	probe_resp.Body.Close()
-
-	sanity := time_after_probe.Sub(time_before_probe)
-
-	tlsAndHttpHeaderDelta := probe.GetTLSAndHttpHeaderDelta()
-	httpDownloadDelta := probe.GetHttpDownloadDelta(
-		time_after_probe,
-	) // Combined with above, constitutes 2 time measurements, per the Spec.
-	tcpDelta := probe.GetTCPDelta() // Constitutes 1 time measurement, per the Spec.
-	totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + tcpDelta
-
-	// By default, assume that there was a reused connection which
-	// means that we only made 1 time measurement.
-	var measurementCount uint16 = 1
-	if !probe.stats.ConnectionReused {
-		// If we did not reuse the connection, then we made three additional time measurements.
-		// See above for details on that calculation.
-		measurementCount = 3
-	}
-
-	if debug.IsDebug(debugLevel) {
-		fmt.Printf(
-			"(Probe %v) sanity vs total: %v vs %v\n",
-			probe.ProbeId(),
-			sanity,
-			totalDelay,
-		)
-	}
-	return utilities.MeasurementResult{
-		Delay:            totalDelay,
-		MeasurementCount: measurementCount,
-		Err:              nil,
-	}
-}
-
-func CalculateProbeMeasurements(
-	ctx context.Context,
-	strict bool,
-	saturated_measurement_probe *Probe,
-	unsaturated_measurement_probe *Probe,
-	url string,
-	debugLevel debug.DebugLevel,
-) chan utilities.MeasurementResult {
-	responseChannel := make(chan utilities.MeasurementResult)
-	go func() {
-		/*
-		 * Depending on whether the user wants their measurements to be strict, we will
-		 * measure on the LGC.
-		 */
-		var saturated_probe_latency utilities.MeasurementResult
-		if strict {
-
-			if debug.IsDebug(debugLevel) {
-				fmt.Printf("Beginning saturated measurement probe.\n")
-			}
-			saturated_latency := getLatency(ctx, saturated_measurement_probe, url, debugLevel)
-
-			if saturated_latency.Err != nil {
-				fmt.Printf("Error occurred getting the saturated measurement.\n")
-				responseChannel <- saturated_latency
-				return
-			}
-		}
-
-		if debug.IsDebug(debugLevel) {
-			fmt.Printf("Beginning unsaturated measurement probe.\n")
-		}
-		unsaturated_probe_latency := getLatency(ctx, unsaturated_measurement_probe, url, debugLevel)
-
-		if unsaturated_probe_latency.Err != nil {
-			fmt.Printf("Error occurred getting the unsaturated measurement.\n")
-			responseChannel <- unsaturated_probe_latency
-			return
-		}
-
-		total_latency := unsaturated_probe_latency.Delay
-		total_measurement_count := unsaturated_probe_latency.MeasurementCount
-
-		if strict {
-			total_latency += saturated_probe_latency.Delay
-			total_measurement_count += saturated_probe_latency.MeasurementCount
-		}
-		responseChannel <- utilities.MeasurementResult{Delay: total_latency, MeasurementCount: total_measurement_count, Err: nil}
-		return
-	}()
-	return responseChannel
-}
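
This commit stops computing the RPM figure in networkQuality.go (the old probe loop and its arithmetic are deleted) and instead has rpm.LGProbe emit DataPoint values on a channel while load generation is still running; aggregating those points is left for later work. Under that assumption, here is a minimal sketch of how such data points could be folded into an RPM figure using the formula from the removed comment (the reciprocal of the average probe duration, normalized to 60 seconds). computeRPM and the sample durations are invented for illustration.

package main

import (
	"fmt"
	"time"
)

// DataPoint mirrors the rpm.DataPoint type added in this commit.
type DataPoint struct {
	RoundTripCount uint64
	Duration       time.Duration
}

// computeRPM is hypothetical: it averages the probe durations and returns
// 60 seconds divided by that average, i.e. round-trips per minute.
func computeRPM(points []DataPoint) (float64, error) {
	totalSeconds := 0.0
	totalProbes := uint64(0)
	for _, p := range points {
		totalSeconds += p.Duration.Seconds()
		totalProbes += p.RoundTripCount
	}
	if totalProbes == 0 {
		return 0, fmt.Errorf("no probe measurements received")
	}
	averageSeconds := totalSeconds / float64(totalProbes)
	return time.Minute.Seconds() / averageSeconds, nil
}

func main() {
	// Two probes of 250 ms each: average 0.25 s, so 60 / 0.25 = 240 RPM.
	points := []DataPoint{
		{RoundTripCount: 1, Duration: 250 * time.Millisecond},
		{RoundTripCount: 1, Duration: 250 * time.Millisecond},
	}
	if rpm, err := computeRPM(points); err == nil {
		fmt.Printf("RPM: %5.0f\n", rpm)
	}
}
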
