diff options
Diffstat (limited to 'rpm/rpm.go')
| -rw-r--r-- | rpm/rpm.go | 296 |
1 files changed, 134 insertions, 162 deletions
@@ -38,30 +38,119 @@ func addFlows( } } -type SaturationResult struct { - RateBps float64 - LGCs []lgc.LoadGeneratingConnection +type ProbeConfiguration struct { + URL string } -func Saturate( - saturationCtx context.Context, +type DataPoint struct { + RoundTripCount uint64 + Duration time.Duration +} + +type LGDataCollectionResult struct { + RateBps float64 + LGCs []lgc.LoadGeneratingConnection + DataPoints []DataPoint +} + +func LGProbe(parentProbeCtx context.Context, connection lgc.LoadGeneratingConnection, lgProbeUrl string, result *chan DataPoint, debugging *debug.DebugWithPrefix) error { + probeCtx, _ := context.WithCancel(parentProbeCtx) + probeTracer := NewProbeTracer(connection.Client(), true, debugging) + time_before_probe := time.Now() + probe_req, err := http.NewRequestWithContext( + httptrace.WithClientTrace(probeCtx, probeTracer.trace), + "GET", + lgProbeUrl, + nil, + ) + if err != nil { + return err + } + + probe_resp, err := connection.Client().Do(probe_req) + if err != nil { + return err + } + + // TODO: Make this interruptable somehow by using _ctx_. + _, err = io.ReadAll(probe_resp.Body) + if err != nil { + return err + } + time_after_probe := time.Now() + + // Depending on whether we think that Close() requires another RTT (via TCP), we + // may need to move this before/after capturing the after time. + probe_resp.Body.Close() + + sanity := time_after_probe.Sub(time_before_probe) + + tlsAndHttpHeaderDelta := probeTracer.GetTLSAndHttpHeaderDelta() + httpDownloadDelta := probeTracer.GetHttpDownloadDelta( + time_after_probe, + ) // Combined with above, constitutes 2 time measurements, per the Spec. + tcpDelta := probeTracer.GetTCPDelta() // Constitutes 1 time measurement, per the Spec. + totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + tcpDelta + + // We must have reused the connection! + if !probeTracer.stats.ConnectionReused { + panic(!probeTracer.stats.ConnectionReused) + } + + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) (Probe %v) sanity vs total: %v vs %v\n", + debugging.Prefix, + probeTracer.ProbeId(), + sanity, + totalDelay, + ) + } + *result <- DataPoint{RoundTripCount: 1, Duration: totalDelay} + + return nil + ///////////// +} +func LGProber(proberCtx context.Context, defaultConnection lgc.LoadGeneratingConnection, altConnections *[]lgc.LoadGeneratingConnection, url string, interval time.Duration, debugging *debug.DebugWithPrefix) (points chan DataPoint) { + points = make(chan DataPoint) + + go func() { + for proberCtx.Err() == nil { + time.Sleep(interval) + if debug.IsDebug(debugging.Level) { + fmt.Printf("(%s) About to probe!\n", debugging.Prefix) + } + go LGProbe(proberCtx, defaultConnection, url, &points, debugging) + } + }() + + return +} + +func LGCollectData( + lgDataCollectionCtx context.Context, operatingCtx context.Context, lgcGenerator func() lgc.LoadGeneratingConnection, + lgProbeConfigurationGenerator func() ProbeConfiguration, debugging *debug.DebugWithPrefix, -) (saturated chan SaturationResult) { - saturated = make(chan SaturationResult) +) (resulted chan LGDataCollectionResult) { + resulted = make(chan LGDataCollectionResult) go func() { lgcs := make([]lgc.LoadGeneratingConnection, 0) addFlows( - saturationCtx, + lgDataCollectionCtx, constants.StartingNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level, ) + lgProbeConfiguration := lgProbeConfigurationGenerator() + + LGProber(lgDataCollectionCtx, lgcs[0], &lgcs, lgProbeConfiguration.URL, time.Duration(100*time.Millisecond), debugging) + previousFlowIncreaseInterval := uint64(0) previousMovingAverage := float64(0) @@ -84,13 +173,13 @@ func Saturate( for currentInterval := uint64(0); true; currentInterval++ { // When the program stops operating, then stop. - if saturationCtx.Err() != nil { + if lgDataCollectionCtx.Err() != nil { return } // We may be asked to stop trying to saturate the // network and return our current status. - if saturationCtx.Err() != nil { + if lgDataCollectionCtx.Err() != nil { //break } @@ -204,7 +293,7 @@ func Saturate( ) } addFlows( - saturationCtx, + lgDataCollectionCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, @@ -232,38 +321,39 @@ func Saturate( if debug.IsDebug(debugging.Level) { fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging) } - addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level) + addFlows(lgDataCollectionCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level) previousFlowIncreaseInterval = currentInterval } } } - saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs} + resulted <- LGDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs} }() return } -type Probe struct { +type ProbeTracer struct { client *http.Client stats *stats.TraceStats trace *httptrace.ClientTrace debug debug.DebugLevel probeid uint64 + isLG bool } -func (p *Probe) String() string { +func (p *ProbeTracer) String() string { return fmt.Sprintf("(Probe %v): stats: %v\n", p.probeid, p.stats) } -func (p *Probe) ProbeId() uint64 { +func (p *ProbeTracer) ProbeId() uint64 { return p.probeid } -func (p *Probe) GetTrace() *httptrace.ClientTrace { +func (p *ProbeTracer) GetTrace() *httptrace.ClientTrace { return p.trace } -func (p *Probe) GetDnsDelta() time.Duration { +func (p *ProbeTracer) GetDnsDelta() time.Duration { if p.stats.ConnectionReused { return time.Duration(0) } @@ -274,7 +364,7 @@ func (p *Probe) GetDnsDelta() time.Duration { return delta } -func (p *Probe) GetTCPDelta() time.Duration { +func (p *ProbeTracer) GetTCPDelta() time.Duration { if p.stats.ConnectionReused { return time.Duration(0) } @@ -285,7 +375,7 @@ func (p *Probe) GetTCPDelta() time.Duration { return delta } -func (p *Probe) GetTLSDelta() time.Duration { +func (p *ProbeTracer) GetTLSDelta() time.Duration { if utilities.IsSome(p.stats.TLSDoneTime) { panic("There should not be TLS information, but there is.") } @@ -296,7 +386,7 @@ func (p *Probe) GetTLSDelta() time.Duration { return delta } -func (p *Probe) GetTLSAndHttpHeaderDelta() time.Duration { +func (p *ProbeTracer) GetTLSAndHttpHeaderDelta() time.Duration { // Because the TLS handshake occurs *after* the TCP connection (ConnectDoneTime) // and *before* the HTTP transaction, we know that the delta between the time // that the first HTTP response byte is available and the time that the TCP @@ -318,7 +408,7 @@ func (p *Probe) GetTLSAndHttpHeaderDelta() time.Duration { return delta } -func (p *Probe) GetHttpHeaderDelta() time.Duration { +func (p *ProbeTracer) GetHttpHeaderDelta() time.Duration { panic( "Unusable until TLS tracing support is enabled! Use GetTLSAndHttpHeaderDelta() instead.\n", ) @@ -329,7 +419,7 @@ func (p *Probe) GetHttpHeaderDelta() time.Duration { return delta } -func (p *Probe) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration { +func (p *ProbeTracer) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration { delta := httpDoneTime.Sub(p.stats.HttpResponseReadyTime) if debug.IsDebug(p.debug) { fmt.Printf("(Probe %v): Http Download Time: %v\n", p.probeid, delta) @@ -337,21 +427,22 @@ func (p *Probe) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration { return delta } -func NewProbe(client *http.Client, debugLevel debug.DebugLevel) *Probe { - probe := &Probe{ +func NewProbeTracer(client *http.Client, isLG bool, debugging *debug.DebugWithPrefix) *ProbeTracer { + probe := &ProbeTracer{ client: client, stats: &stats.TraceStats{}, trace: nil, - debug: debugLevel, + debug: debugging.Level, probeid: utilities.GenerateConnectionId(), + isLG: isLG, } - trace := traceable.GenerateHttpTimingTracer(probe, debugLevel) + trace := traceable.GenerateHttpTimingTracer(probe, debugging.Level) probe.trace = trace return probe } -func (probe *Probe) SetDnsStartTimeInfo( +func (probe *ProbeTracer) SetDnsStartTimeInfo( now time.Time, dnsStartInfo httptrace.DNSStartInfo, ) { @@ -366,7 +457,7 @@ func (probe *Probe) SetDnsStartTimeInfo( } } -func (probe *Probe) SetDnsDoneTimeInfo( +func (probe *ProbeTracer) SetDnsDoneTimeInfo( now time.Time, dnsDoneInfo httptrace.DNSDoneInfo, ) { @@ -381,7 +472,7 @@ func (probe *Probe) SetDnsDoneTimeInfo( } } -func (probe *Probe) SetConnectStartTime( +func (probe *ProbeTracer) SetConnectStartTime( now time.Time, ) { probe.stats.ConnectStartTime = now @@ -394,7 +485,7 @@ func (probe *Probe) SetConnectStartTime( } } -func (probe *Probe) SetConnectDoneTimeError( +func (probe *ProbeTracer) SetConnectDoneTimeError( now time.Time, err error, ) { @@ -410,7 +501,7 @@ func (probe *Probe) SetConnectDoneTimeError( } } -func (probe *Probe) SetGetConnTime(now time.Time) { +func (probe *ProbeTracer) SetGetConnTime(now time.Time) { probe.stats.GetConnectionStartTime = now if debug.IsDebug(probe.debug) { fmt.Printf( @@ -421,21 +512,21 @@ func (probe *Probe) SetGetConnTime(now time.Time) { } } -func (probe *Probe) SetGotConnTimeInfo( +func (probe *ProbeTracer) SetGotConnTimeInfo( now time.Time, gotConnInfo httptrace.GotConnInfo, ) { probe.stats.GetConnectionDoneTime = now probe.stats.ConnInfo = gotConnInfo probe.stats.ConnectionReused = gotConnInfo.Reused + if probe.isLG && !gotConnInfo.Reused { + fmt.Fprintf(os.Stderr, "A probe sent on an LG Connection used a new connection!\n") + } else if debug.IsDebug(probe.debug) { + fmt.Printf("Properly reused a connection when probing on an LG Connection!\n") + } if debug.IsDebug(probe.debug) { - reusedString := "(new)" - if probe.stats.ConnectionReused { - reusedString = "(reused)" - } fmt.Printf( - "(Probe) Got %v connection for %v at %v with info %v\n", - reusedString, + "(Probe) Got a reused connection for %v at %v with info %v\n", probe.ProbeId(), probe.stats.GetConnectionDoneTime, probe.stats.ConnInfo, @@ -443,7 +534,7 @@ func (probe *Probe) SetGotConnTimeInfo( } } -func (probe *Probe) SetTLSHandshakeStartTime( +func (probe *ProbeTracer) SetTLSHandshakeStartTime( now time.Time, ) { probe.stats.TLSStartTime = utilities.Some(now) @@ -456,7 +547,7 @@ func (probe *Probe) SetTLSHandshakeStartTime( } } -func (probe *Probe) SetTLSHandshakeDoneTimeState( +func (probe *ProbeTracer) SetTLSHandshakeDoneTimeState( now time.Time, connectionState tls.ConnectionState, ) { @@ -472,7 +563,7 @@ func (probe *Probe) SetTLSHandshakeDoneTimeState( } } -func (probe *Probe) SetHttpWroteRequestTimeInfo( +func (probe *ProbeTracer) SetHttpWroteRequestTimeInfo( now time.Time, info httptrace.WroteRequestInfo, ) { @@ -488,7 +579,7 @@ func (probe *Probe) SetHttpWroteRequestTimeInfo( } } -func (probe *Probe) SetHttpResponseReadyTime( +func (probe *ProbeTracer) SetHttpResponseReadyTime( now time.Time, ) { probe.stats.HttpResponseReadyTime = now @@ -500,122 +591,3 @@ func (probe *Probe) SetHttpResponseReadyTime( ) } } - -func getLatency( - ctx context.Context, - probe *Probe, - url string, - debugLevel debug.DebugLevel, -) utilities.MeasurementResult { - time_before_probe := time.Now() - probe_req, err := http.NewRequestWithContext( - httptrace.WithClientTrace(ctx, probe.GetTrace()), - "GET", - url, - nil, - ) - if err != nil { - return utilities.MeasurementResult{Delay: 0, MeasurementCount: 0, Err: err} - } - - probe_resp, err := probe.client.Do(probe_req) - if err != nil { - return utilities.MeasurementResult{Delay: 0, MeasurementCount: 0, Err: err} - } - - // TODO: Make this interruptable somehow by using _ctx_. - _, err = io.ReadAll(probe_resp.Body) - if err != nil { - return utilities.MeasurementResult{Delay: 0, Err: err} - } - time_after_probe := time.Now() - - // Depending on whether we think that Close() requires another RTT (via TCP), we - // may need to move this before/after capturing the after time. - probe_resp.Body.Close() - - sanity := time_after_probe.Sub(time_before_probe) - - tlsAndHttpHeaderDelta := probe.GetTLSAndHttpHeaderDelta() - httpDownloadDelta := probe.GetHttpDownloadDelta( - time_after_probe, - ) // Combined with above, constitutes 2 time measurements, per the Spec. - tcpDelta := probe.GetTCPDelta() // Constitutes 1 time measurement, per the Spec. - totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + tcpDelta - - // By default, assume that there was a reused connection which - // means that we only made 1 time measurement. - var measurementCount uint16 = 1 - if !probe.stats.ConnectionReused { - // If we did not reuse the connection, then we made three additional time measurements. - // See above for details on that calculation. - measurementCount = 3 - } - - if debug.IsDebug(debugLevel) { - fmt.Printf( - "(Probe %v) sanity vs total: %v vs %v\n", - probe.ProbeId(), - sanity, - totalDelay, - ) - } - return utilities.MeasurementResult{ - Delay: totalDelay, - MeasurementCount: measurementCount, - Err: nil, - } -} - -func CalculateProbeMeasurements( - ctx context.Context, - strict bool, - saturated_measurement_probe *Probe, - unsaturated_measurement_probe *Probe, - url string, - debugLevel debug.DebugLevel, -) chan utilities.MeasurementResult { - responseChannel := make(chan utilities.MeasurementResult) - go func() { - /* - * Depending on whether the user wants their measurements to be strict, we will - * measure on the LGC. - */ - var saturated_probe_latency utilities.MeasurementResult - if strict { - - if debug.IsDebug(debugLevel) { - fmt.Printf("Beginning saturated measurement probe.\n") - } - saturated_latency := getLatency(ctx, saturated_measurement_probe, url, debugLevel) - - if saturated_latency.Err != nil { - fmt.Printf("Error occurred getting the saturated measurement.\n") - responseChannel <- saturated_latency - return - } - } - - if debug.IsDebug(debugLevel) { - fmt.Printf("Beginning unsaturated measurement probe.\n") - } - unsaturated_probe_latency := getLatency(ctx, unsaturated_measurement_probe, url, debugLevel) - - if unsaturated_probe_latency.Err != nil { - fmt.Printf("Error occurred getting the unsaturated measurement.\n") - responseChannel <- unsaturated_probe_latency - return - } - - total_latency := unsaturated_probe_latency.Delay - total_measurement_count := unsaturated_probe_latency.MeasurementCount - - if strict { - total_latency += saturated_probe_latency.Delay - total_measurement_count += saturated_probe_latency.MeasurementCount - } - responseChannel <- utilities.MeasurementResult{Delay: total_latency, MeasurementCount: total_measurement_count, Err: nil} - return - }() - return responseChannel -} |
