diff options
Diffstat (limited to 'rpm/rpm.go')
| -rw-r--r-- | rpm/rpm.go | 404 |
1 files changed, 333 insertions, 71 deletions
@@ -7,14 +7,238 @@ import ( "io" "net/http" "net/http/httptrace" + "os" "time" + "github.com/network-quality/goresponsiveness/constants" "github.com/network-quality/goresponsiveness/debug" + "github.com/network-quality/goresponsiveness/lgc" + "github.com/network-quality/goresponsiveness/ma" "github.com/network-quality/goresponsiveness/stats" "github.com/network-quality/goresponsiveness/traceable" "github.com/network-quality/goresponsiveness/utilities" ) +func addFlows( + ctx context.Context, + toAdd uint64, + lgcs *[]lgc.LoadGeneratingConnection, + lgcGenerator func() lgc.LoadGeneratingConnection, + debug debug.DebugLevel, +) { + for i := uint64(0); i < toAdd; i++ { + *lgcs = append(*lgcs, lgcGenerator()) + if !(*lgcs)[len(*lgcs)-1].Start(ctx, debug) { + fmt.Printf( + "Error starting lgc with id %d!\n", + (*lgcs)[len(*lgcs)-1].ClientId(), + ) + return + } + } +} + +type SaturationResult struct { + RateBps float64 + LGCs []lgc.LoadGeneratingConnection +} + +func Saturate( + saturationCtx context.Context, + operatingCtx context.Context, + lgcGenerator func() lgc.LoadGeneratingConnection, + debugging *debug.DebugWithPrefix, +) (saturated chan SaturationResult) { + saturated = make(chan SaturationResult) + go func() { + + lgcs := make([]lgc.LoadGeneratingConnection, 0) + + addFlows( + saturationCtx, + constants.StartingNumberOfLoadGeneratingConnections, + &lgcs, + lgcGenerator, + debugging.Level, + ) + + previousFlowIncreaseInterval := uint64(0) + previousMovingAverage := float64(0) + + // The moving average will contain the average for the last + // constants.MovingAverageIntervalCount throughputs. + // ie, ma[i] = (throughput[i-3] + throughput[i-2] + throughput[i-1] + throughput[i])/4 + movingAverage := ma.NewMovingAverage( + constants.MovingAverageIntervalCount, + ) + + // The moving average average will be the average of the last + // constants.MovingAverageIntervalCount moving averages. + // ie, maa[i] = (ma[i-3] + ma[i-2] + ma[i-1] + ma[i])/4 + movingAverageAverage := ma.NewMovingAverage( + constants.MovingAverageIntervalCount, + ) + + nextSampleStartTime := time.Now().Add(time.Second) + + for currentInterval := uint64(0); true; currentInterval++ { + + // When the program stops operating, then stop. + if saturationCtx.Err() != nil { + return + } + + // We may be asked to stop trying to saturate the + // network and return our current status. + if saturationCtx.Err() != nil { + //break + } + + now := time.Now() + // At each 1-second interval + if nextSampleStartTime.Sub(now) > 0 { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Sleeping until %v\n", + debugging, + nextSampleStartTime, + ) + } + time.Sleep(nextSampleStartTime.Sub(now)) + } else { + fmt.Fprintf(os.Stderr, "Warning: Missed a one-second deadline.\n") + } + nextSampleStartTime = time.Now().Add(time.Second) + + // Compute "instantaneous aggregate" goodput which is the number of + // bytes transferred within the last second. + var totalTransfer float64 = 0 + allInvalid := true + for i := range lgcs { + if !lgcs[i].IsValid() { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Load-generating connection with id %d is invalid ... skipping.\n", + debugging, + lgcs[i].ClientId(), + ) + } + continue + } + allInvalid = false + currentTransferred, currentInterval := lgcs[i].TransferredInInterval() + // normalize to a second-long interval! + instantaneousTransferred := float64(currentTransferred) / float64(currentInterval.Seconds()) + totalTransfer += instantaneousTransferred + } + + // For some reason, all the lgcs are invalid. This likely means that + // the network/server went away. + if allInvalid { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: All lgcs were invalid. Assuming that network/server went away.\n", + debugging, + ) + } + break + } + + // Compute a moving average of the last + // constants.MovingAverageIntervalCount "instantaneous aggregate + // goodput" measurements + movingAverage.AddMeasurement(float64(totalTransfer)) + currentMovingAverage := movingAverage.CalculateAverage() + movingAverageAverage.AddMeasurement(currentMovingAverage) + movingAverageDelta := utilities.SignedPercentDifference( + currentMovingAverage, + previousMovingAverage, + ) + + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Instantaneous goodput: %f MB.\n", + debugging, + utilities.ToMBps(float64(totalTransfer)), + ) + fmt.Printf( + "%v: Previous moving average: %f MB.\n", + debugging, + utilities.ToMBps(previousMovingAverage), + ) + fmt.Printf( + "%v: Current moving average: %f MB.\n", + debugging, + utilities.ToMBps(currentMovingAverage), + ) + fmt.Printf( + "%v: Moving average delta: %f.\n", + debugging, + movingAverageDelta, + ) + } + + previousMovingAverage = currentMovingAverage + + intervalsSinceLastFlowIncrease := currentInterval - previousFlowIncreaseInterval + + // Special case: We won't make any adjustments on the first + // iteration. + if currentInterval == 0 { + continue + } + + // If moving average > "previous" moving average + InstabilityDelta: + if movingAverageDelta > constants.InstabilityDelta { + // Network did not yet reach saturation. If no flows added + // within the last 4 seconds, add 4 more flows + if intervalsSinceLastFlowIncrease > constants.MovingAverageStabilitySpan { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Adding flows because we are unsaturated and waited a while.\n", + debugging, + ) + } + addFlows( + saturationCtx, + constants.AdditiveNumberOfLoadGeneratingConnections, + &lgcs, + lgcGenerator, + debugging.Level, + ) + previousFlowIncreaseInterval = currentInterval + } else { + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debugging) + } + } + } else { // Else, network reached saturation for the current flow count. + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: Network reached saturation with current flow count.\n", debugging) + } + // If new flows added and for 4 seconds the moving average + // throughput did not change: network reached stable saturation + if intervalsSinceLastFlowIncrease < constants.MovingAverageStabilitySpan && movingAverageAverage.AllSequentialIncreasesLessThan(constants.InstabilityDelta) { + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging) + } + break + } else { + // Else, add four more flows + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging) + } + addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level) + previousFlowIncreaseInterval = currentInterval + } + } + + } + saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs} + }() + return +} + type Probe struct { client *http.Client stats *stats.TraceStats @@ -36,6 +260,9 @@ func (p *Probe) GetTrace() *httptrace.ClientTrace { } func (p *Probe) GetDnsDelta() time.Duration { + if p.stats.ConnectionReused { + return time.Duration(0) + } delta := p.stats.DnsDoneTime.Sub(p.stats.DnsStartTime) if debug.IsDebug(p.debug) { fmt.Printf("(Probe %v): DNS Time: %v\n", p.probeid, delta) @@ -44,6 +271,9 @@ func (p *Probe) GetDnsDelta() time.Duration { } func (p *Probe) GetTCPDelta() time.Duration { + if p.stats.ConnectionReused { + return time.Duration(0) + } delta := p.stats.ConnectDoneTime.Sub(p.stats.ConnectStartTime) if debug.IsDebug(p.debug) { fmt.Printf("(Probe %v): TCP Connection Time: %v\n", p.probeid, delta) @@ -70,7 +300,14 @@ func (p *Probe) GetTLSAndHttpHeaderDelta() time.Duration { // *and* the TLS handshake RTT, whether we can specifically measure the latter // or not. Eventually when TLS handshake tracing is fixed, we can break these // into separate buckets, but for now this workaround is reasonable. - delta := p.stats.HttpResponseReadyTime.Sub(p.stats.ConnectDoneTime) + before := p.stats.ConnectDoneTime + if p.stats.ConnectionReused { + // When we reuse a connection there will be no time logged for when the + // TCP connection was established (obviously). So, fall back to the time + // when we were notified about reusing a connection (as a close approximation!). + before = p.stats.GetConnectionDoneTime + } + delta := p.stats.HttpResponseReadyTime.Sub(before) if debug.IsDebug(p.debug) { fmt.Printf("(Probe %v): Http TLS and Header Time: %v\n", p.probeid, delta) } @@ -184,9 +421,15 @@ func (probe *Probe) SetGotConnTimeInfo( ) { probe.stats.GetConnectionDoneTime = now probe.stats.ConnInfo = gotConnInfo + probe.stats.ConnectionReused = gotConnInfo.Reused if debug.IsDebug(probe.debug) { + reusedString := "(new)" + if probe.stats.ConnectionReused { + reusedString = "(reused)" + } fmt.Printf( - "(Probe) Got connection for %v at %v with info %v\n", + "(Probe) Got %v connection for %v at %v with info %v\n", + reusedString, probe.ProbeId(), probe.stats.GetConnectionDoneTime, probe.stats.ConnInfo, @@ -252,91 +495,110 @@ func (probe *Probe) SetHttpResponseReadyTime( } } -func CalculateSequentialRTTsTime( +func getLatency(ctx context.Context, probe *Probe, url string, debugLevel debug.DebugLevel) utilities.MeasurementResult { + time_before_probe := time.Now() + probe_req, err := http.NewRequestWithContext( + httptrace.WithClientTrace(ctx, probe.GetTrace()), + "GET", + url, + nil, + ) + if err != nil { + return utilities.MeasurementResult{Delay: 0, MeasurementCount: 0, Err: err} + } + + probe_resp, err := probe.client.Do(probe_req) + if err != nil { + return utilities.MeasurementResult{Delay: 0, MeasurementCount: 0, Err: err} + } + + // TODO: Make this interruptable somehow by using _ctx_. + _, err = io.ReadAll(probe_resp.Body) + if err != nil { + return utilities.MeasurementResult{Delay: 0, Err: err} + } + time_after_probe := time.Now() + + // Depending on whether we think that Close() requires another RTT (via TCP), we + // may need to move this before/after capturing the after time. + probe_resp.Body.Close() + + sanity := time_after_probe.Sub(time_before_probe) + + tlsAndHttpHeaderDelta := probe.GetTLSAndHttpHeaderDelta() + httpDownloadDelta := probe.GetHttpDownloadDelta(time_after_probe) // Combined with above, constitutes 2 time measurements, per the Spec. + tcpDelta := probe.GetTCPDelta() // Constitutes 1 time measurement, per the Spec. + totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + tcpDelta + + // By default, assume that there was a reused connection which + // means that we only made 1 time measurement. + var measurementCount uint16 = 1 + if !probe.stats.ConnectionReused { + // If we did not reuse the connection, then we made three additional time measurements. + // See above for details on that calculation. + measurementCount = 3 + } + + if debug.IsDebug(debugLevel) { + fmt.Printf( + "(Probe %v) sanity vs total: %v vs %v\n", + probe.ProbeId(), + sanity, + totalDelay, + ) + } + return utilities.MeasurementResult{Delay: totalDelay, MeasurementCount: measurementCount, Err: nil} +} + +func CalculateProbeMeasurements( ctx context.Context, - saturated_rtt_probe *Probe, - new_rtt_probe *Probe, + strict bool, + saturated_measurement_probe *Probe, + unsaturated_measurement_probe *Probe, url string, debugLevel debug.DebugLevel, -) chan utilities.GetLatency { - responseChannel := make(chan utilities.GetLatency) +) chan utilities.MeasurementResult { + responseChannel := make(chan utilities.MeasurementResult) go func() { - before := time.Now() - roundTripCount := uint16(0) /* - TODO: We are not going to measure round-trip times on the load-generating connection - right now because we are dealing with a massive amount of buffer bloat on the - Apple CDN. + * Depending on whether the user wants their measurements to be strict, we will + * measure on the LGC. + */ + var saturated_probe_latency utilities.MeasurementResult + if strict { - TODO: When this functionality is enabled, we may need to change the assertion in - the GotConn callback in the Traceable interface in traceable.go because a connection - will be reused in that case. If such a situation does come to pass, we will want to - move that assertion in to the various Traceable interface implementations that continue - to rely on this assertion. + if debug.IsDebug(debugLevel) { + fmt.Printf("Beginning saturated measurement probe.\n") + } + saturated_latency := getLatency(ctx, saturated_measurement_probe, url, debugLevel) - c_a, err := saturated_client.Get(url) - if err != nil { - responseChannel <- GetLatency{Delay: 0, RTTs: 0, Err: err} - return - } - // TODO: Make this interruptable somehow - // by using _ctx_. - _, err = io.ReadAll(c_a.Body) - if err != nil { - responseChannel <- GetLatency{Delay: 0, RTTs: 0, Err: err} - return - } - roundTripCount += 5 - c_a.Body.Close() - */ - c_b_req, err := http.NewRequestWithContext( - httptrace.WithClientTrace(ctx, new_rtt_probe.GetTrace()), - "GET", - url, - nil, - ) - if err != nil { - responseChannel <- utilities.GetLatency{Delay: 0, RoundTripCount: 0, Err: err} - return + if saturated_latency.Err != nil { + fmt.Printf("Error occurred getting the saturated measurement.\n") + responseChannel <- saturated_latency + return + } } - c_b, err := new_rtt_probe.client.Do(c_b_req) - if err != nil { - responseChannel <- utilities.GetLatency{Delay: 0, RoundTripCount: 0, Err: err} - return + if debug.IsDebug(debugLevel) { + fmt.Printf("Beginning unsaturated measurement probe.\n") } + unsaturated_probe_latency := getLatency(ctx, unsaturated_measurement_probe, url, debugLevel) - // TODO: Make this interruptable somehow by using _ctx_. - _, err = io.ReadAll(c_b.Body) - if err != nil { - responseChannel <- utilities.GetLatency{Delay: 0, Err: err} + if unsaturated_probe_latency.Err != nil { + fmt.Printf("Error occurred getting the unsaturated measurement.\n") + responseChannel <- unsaturated_probe_latency return } - after := time.Now() - - // Depending on whether we think that Close() requires another RTT (via TCP), we - // may need to move this before/after capturing the after time. - c_b.Body.Close() - - sanity := after.Sub(before) - tlsAndHttpHeaderDelta := new_rtt_probe.GetTLSAndHttpHeaderDelta() // Constitutes 2 RTT, per the Spec. - httpDownloadDelta := new_rtt_probe.GetHttpDownloadDelta(after) // Constitutes 1 RTT, per the Spec. - dnsDelta := new_rtt_probe.GetDnsDelta() // Constitutes 1 RTT, per the Spec. - tcpDelta := new_rtt_probe.GetTCPDelta() // Constitutes 1 RTT, per the Spec. - totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + dnsDelta + tcpDelta + total_latency := unsaturated_probe_latency.Delay + total_measurement_count := unsaturated_probe_latency.MeasurementCount - if debug.IsDebug(debugLevel) { - fmt.Printf( - "(Probe %v) sanity vs total: %v vs %v\n", - new_rtt_probe.ProbeId(), - sanity, - totalDelay, - ) + if strict { + total_latency += saturated_probe_latency.Delay + total_measurement_count += saturated_probe_latency.MeasurementCount } - - roundTripCount += 5 // According to addition, there are 5 RTTs that we measured. - responseChannel <- utilities.GetLatency{Delay: totalDelay, RoundTripCount: roundTripCount, Err: nil} + responseChannel <- utilities.MeasurementResult{Delay: total_latency, MeasurementCount: total_measurement_count, Err: nil} + return }() return responseChannel } |
