Diffstat (limited to 'rpm/rpm.go')
 rpm/rpm.go | 404
 1 file changed, 333 insertions(+), 71 deletions(-)
diff --git a/rpm/rpm.go b/rpm/rpm.go
index e01e2e8..28496cd 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -7,14 +7,238 @@ import (
"io"
"net/http"
"net/http/httptrace"
+ "os"
"time"
+ "github.com/network-quality/goresponsiveness/constants"
"github.com/network-quality/goresponsiveness/debug"
+ "github.com/network-quality/goresponsiveness/lgc"
+ "github.com/network-quality/goresponsiveness/ma"
"github.com/network-quality/goresponsiveness/stats"
"github.com/network-quality/goresponsiveness/traceable"
"github.com/network-quality/goresponsiveness/utilities"
)
+func addFlows(
+ ctx context.Context,
+ toAdd uint64,
+ lgcs *[]lgc.LoadGeneratingConnection,
+ lgcGenerator func() lgc.LoadGeneratingConnection,
+ debug debug.DebugLevel,
+) {
+ for i := uint64(0); i < toAdd; i++ {
+ *lgcs = append(*lgcs, lgcGenerator())
+ if !(*lgcs)[len(*lgcs)-1].Start(ctx, debug) {
+ fmt.Printf(
+ "Error starting lgc with id %d!\n",
+ (*lgcs)[len(*lgcs)-1].ClientId(),
+ )
+ return
+ }
+ }
+}
+
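
A quick note on the pointer-to-slice parameter above: append may reallocate the slice's backing array, so addFlows has to write the updated slice header back through the pointer for the caller to see the new connections. A minimal, standalone sketch of the same pattern (plain ints, not code from this change):

    package main

    import "fmt"

    // appendValues mirrors the addFlows pattern: it takes a pointer to a
    // slice because append may allocate a new backing array, and the caller's
    // slice header (pointer, length, capacity) must be updated to see the
    // additions.
    func appendValues(values *[]int, toAdd int) {
        for i := 0; i < toAdd; i++ {
            *values = append(*values, i)
        }
    }

    func main() {
        values := make([]int, 0)
        appendValues(&values, 4)
        fmt.Println(values) // [0 1 2 3]
    }

Returning the grown slice would work equally well; the pointer form just keeps the call sites in the saturation loop compact.
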
+type SaturationResult struct {
+ RateBps float64
+ LGCs []lgc.LoadGeneratingConnection
+}
+
+func Saturate(
+ saturationCtx context.Context,
+ operatingCtx context.Context,
+ lgcGenerator func() lgc.LoadGeneratingConnection,
+ debugging *debug.DebugWithPrefix,
+) (saturated chan SaturationResult) {
+ saturated = make(chan SaturationResult)
+ go func() {
+
+ lgcs := make([]lgc.LoadGeneratingConnection, 0)
+
+ addFlows(
+ saturationCtx,
+ constants.StartingNumberOfLoadGeneratingConnections,
+ &lgcs,
+ lgcGenerator,
+ debugging.Level,
+ )
+
+ previousFlowIncreaseInterval := uint64(0)
+ previousMovingAverage := float64(0)
+
+ // The moving average will contain the average of the last
+ // constants.MovingAverageIntervalCount throughput samples,
+ // e.g., ma[i] = (throughput[i-3] + throughput[i-2] + throughput[i-1] + throughput[i])/4
+ // when the interval count is 4.
+ movingAverage := ma.NewMovingAverage(
+ constants.MovingAverageIntervalCount,
+ )
+
+ // The moving-average average will be the average of the last
+ // constants.MovingAverageIntervalCount moving averages,
+ // e.g., maa[i] = (ma[i-3] + ma[i-2] + ma[i-1] + ma[i])/4.
+ movingAverageAverage := ma.NewMovingAverage(
+ constants.MovingAverageIntervalCount,
+ )
+
+ nextSampleStartTime := time.Now().Add(time.Second)
+
+ for currentInterval := uint64(0); true; currentInterval++ {
+
+ // We may be asked to stop trying to saturate the network. For now
+ // we simply return; in the future we may want to break instead and
+ // report our current status.
+ if saturationCtx.Err() != nil {
+ return
+ }
+
+ now := time.Now()
+ // Wait for the start of the next 1-second sample interval.
+ if nextSampleStartTime.Sub(now) > 0 {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Sleeping until %v\n",
+ debugging,
+ nextSampleStartTime,
+ )
+ }
+ time.Sleep(nextSampleStartTime.Sub(now))
+ } else {
+ fmt.Fprintf(os.Stderr, "Warning: Missed a one-second deadline.\n")
+ }
+ nextSampleStartTime = time.Now().Add(time.Second)
+
+ // Compute "instantaneous aggregate" goodput which is the number of
+ // bytes transferred within the last second.
+ var totalTransfer float64 = 0
+ allInvalid := true
+ for i := range lgcs {
+ if !lgcs[i].IsValid() {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Load-generating connection with id %d is invalid ... skipping.\n",
+ debugging,
+ lgcs[i].ClientId(),
+ )
+ }
+ continue
+ }
+ allInvalid = false
+ currentTransferred, transferInterval := lgcs[i].TransferredInInterval()
+ // Normalize to a one-second interval so that measurements taken over
+ // slightly different intervals are comparable.
+ instantaneousTransferred := float64(currentTransferred) / transferInterval.Seconds()
+ totalTransfer += instantaneousTransferred
+ }
+
+ // For some reason, all the lgcs are invalid. This likely means that
+ // the network/server went away.
+ if allInvalid {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: All lgcs were invalid. Assuming that network/server went away.\n",
+ debugging,
+ )
+ }
+ break
+ }
+
+ // Compute a moving average of the last
+ // constants.MovingAverageIntervalCount "instantaneous aggregate
+ // goodput" measurements
+ movingAverage.AddMeasurement(float64(totalTransfer))
+ currentMovingAverage := movingAverage.CalculateAverage()
+ movingAverageAverage.AddMeasurement(currentMovingAverage)
+ movingAverageDelta := utilities.SignedPercentDifference(
+ currentMovingAverage,
+ previousMovingAverage,
+ )
+
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Instantaneous goodput: %f MB.\n",
+ debugging,
+ utilities.ToMBps(float64(totalTransfer)),
+ )
+ fmt.Printf(
+ "%v: Previous moving average: %f MB.\n",
+ debugging,
+ utilities.ToMBps(previousMovingAverage),
+ )
+ fmt.Printf(
+ "%v: Current moving average: %f MB.\n",
+ debugging,
+ utilities.ToMBps(currentMovingAverage),
+ )
+ fmt.Printf(
+ "%v: Moving average delta: %f.\n",
+ debugging,
+ movingAverageDelta,
+ )
+ }
+
+ previousMovingAverage = currentMovingAverage
+
+ intervalsSinceLastFlowIncrease := currentInterval - previousFlowIncreaseInterval
+
+ // Special case: We won't make any adjustments on the first
+ // iteration.
+ if currentInterval == 0 {
+ continue
+ }
+
+ // If the moving average exceeds the previous moving average by more
+ // than InstabilityDelta percent:
+ if movingAverageDelta > constants.InstabilityDelta {
+ // The network has not yet reached saturation. If no flows were added
+ // within the last 4 seconds, add 4 more flows.
+ if intervalsSinceLastFlowIncrease > constants.MovingAverageStabilitySpan {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Adding flows because we are unsaturated and waited a while.\n",
+ debugging,
+ )
+ }
+ addFlows(
+ saturationCtx,
+ constants.AdditiveNumberOfLoadGeneratingConnections,
+ &lgcs,
+ lgcGenerator,
+ debugging.Level,
+ )
+ previousFlowIncreaseInterval = currentInterval
+ } else {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debugging)
+ }
+ }
+ } else { // Else, network reached saturation for the current flow count.
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("%v: Network reached saturation with current flow count.\n", debugging)
+ }
+ // If new flows were added within the last 4 seconds and the moving-average
+ // throughput did not change appreciably, the network has reached stable saturation.
+ if intervalsSinceLastFlowIncrease < constants.MovingAverageStabilitySpan && movingAverageAverage.AllSequentialIncreasesLessThan(constants.InstabilityDelta) {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging)
+ }
+ break
+ } else {
+ // Else, add four more flows
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging)
+ }
+ addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level)
+ previousFlowIncreaseInterval = currentInterval
+ }
+ }
+
+ }
+ saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs}
+ }()
+ return
+}
+
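
The saturation loop above delegates the windowed averaging and the percent-difference test to the ma and utilities packages. The following standalone sketch illustrates the kind of bookkeeping involved; the types and helpers here (movingAverage, signedPercentDifference, the 5% threshold) are illustrative stand-ins, not the actual APIs or formulas from those packages:

    package main

    import "fmt"

    // movingAverage keeps the most recent `size` measurements and reports
    // their mean. Illustrative only; the real implementation lives in the
    // ma package.
    type movingAverage struct {
        size   int
        values []float64
    }

    func (m *movingAverage) addMeasurement(v float64) {
        m.values = append(m.values, v)
        if len(m.values) > m.size {
            m.values = m.values[1:] // drop the oldest sample
        }
    }

    func (m *movingAverage) calculateAverage() float64 {
        sum := 0.0
        for _, v := range m.values {
            sum += v
        }
        return sum / float64(len(m.values))
    }

    // signedPercentDifference is a simple stand-in for the role played by
    // utilities.SignedPercentDifference above; the real helper may use a
    // different formula.
    func signedPercentDifference(current, previous float64) float64 {
        return ((current - previous) / previous) * 100.0
    }

    func main() {
        const instabilityDelta = 5.0 // percent; stand-in for constants.InstabilityDelta
        window := &movingAverage{size: 4}
        previous := 0.0
        for _, throughput := range []float64{100, 110, 112, 113, 113.5} {
            window.addMeasurement(throughput)
            current := window.calculateAverage()
            if previous != 0 {
                delta := signedPercentDifference(current, previous)
                fmt.Printf("moving average %.2f, delta %+.2f%% (still unsaturated? %v)\n",
                    current, delta, delta > instabilityDelta)
            }
            previous = current
        }
    }
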
type Probe struct {
client *http.Client
stats *stats.TraceStats
@@ -36,6 +260,9 @@ func (p *Probe) GetTrace() *httptrace.ClientTrace {
}
func (p *Probe) GetDnsDelta() time.Duration {
+ if p.stats.ConnectionReused {
+ return time.Duration(0)
+ }
delta := p.stats.DnsDoneTime.Sub(p.stats.DnsStartTime)
if debug.IsDebug(p.debug) {
fmt.Printf("(Probe %v): DNS Time: %v\n", p.probeid, delta)
@@ -44,6 +271,9 @@ func (p *Probe) GetDnsDelta() time.Duration {
}
func (p *Probe) GetTCPDelta() time.Duration {
+ if p.stats.ConnectionReused {
+ return time.Duration(0)
+ }
delta := p.stats.ConnectDoneTime.Sub(p.stats.ConnectStartTime)
if debug.IsDebug(p.debug) {
fmt.Printf("(Probe %v): TCP Connection Time: %v\n", p.probeid, delta)
@@ -70,7 +300,14 @@ func (p *Probe) GetTLSAndHttpHeaderDelta() time.Duration {
// *and* the TLS handshake RTT, whether we can specifically measure the latter
// or not. Eventually when TLS handshake tracing is fixed, we can break these
// into separate buckets, but for now this workaround is reasonable.
- delta := p.stats.HttpResponseReadyTime.Sub(p.stats.ConnectDoneTime)
+ before := p.stats.ConnectDoneTime
+ if p.stats.ConnectionReused {
+ // When we reuse a connection there will be no time logged for when the
+ // TCP connection was established (obviously). So, fall back to the time
+ // when we were notified about reusing a connection (as a close approximation!).
+ before = p.stats.GetConnectionDoneTime
+ }
+ delta := p.stats.HttpResponseReadyTime.Sub(before)
if debug.IsDebug(p.debug) {
fmt.Printf("(Probe %v): Http TLS and Header Time: %v\n", p.probeid, delta)
}
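
For context on the reuse fallback above, here is a self-contained sketch of where the signal comes from: Go's httptrace GotConn callback reports httptrace.GotConnInfo.Reused, and when a connection is reused there is no fresh ConnectDone timestamp, so the GotConn time is the natural stand-in for the "before" point. The tracer type and the URL are placeholders, not code from this change:

    package main

    import (
        "context"
        "fmt"
        "io"
        "net/http"
        "net/http/httptrace"
        "time"
    )

    // reuseTracer records just enough to reproduce the fallback above: when
    // the transport hands back a reused connection there is no fresh
    // ConnectDone timestamp, so the GotConn time stands in as the "before"
    // point.
    type reuseTracer struct {
        connectDone time.Time
        gotConn     time.Time
        reused      bool
    }

    func (t *reuseTracer) trace() *httptrace.ClientTrace {
        return &httptrace.ClientTrace{
            ConnectDone: func(network, addr string, err error) { t.connectDone = time.Now() },
            GotConn: func(info httptrace.GotConnInfo) {
                t.gotConn = time.Now()
                t.reused = info.Reused
            },
        }
    }

    // before mirrors the choice made in GetTLSAndHttpHeaderDelta.
    func (t *reuseTracer) before() time.Time {
        if t.reused {
            return t.gotConn
        }
        return t.connectDone
    }

    func main() {
        t := &reuseTracer{}
        ctx := httptrace.WithClientTrace(context.Background(), t.trace())
        // Placeholder URL; any HTTP endpoint will do for the illustration.
        req, err := http.NewRequestWithContext(ctx, "GET", "https://example.com/", nil)
        if err != nil {
            fmt.Println(err)
            return
        }
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            fmt.Println(err)
            return
        }
        defer resp.Body.Close()
        io.Copy(io.Discard, resp.Body)
        fmt.Printf("reused=%v, elapsed since \"before\": %v\n", t.reused, time.Since(t.before()))
    }
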
@@ -184,9 +421,15 @@ func (probe *Probe) SetGotConnTimeInfo(
) {
probe.stats.GetConnectionDoneTime = now
probe.stats.ConnInfo = gotConnInfo
+ probe.stats.ConnectionReused = gotConnInfo.Reused
if debug.IsDebug(probe.debug) {
+ reusedString := "(new)"
+ if probe.stats.ConnectionReused {
+ reusedString = "(reused)"
+ }
fmt.Printf(
- "(Probe) Got connection for %v at %v with info %v\n",
+ "(Probe) Got %v connection for %v at %v with info %v\n",
+ reusedString,
probe.ProbeId(),
probe.stats.GetConnectionDoneTime,
probe.stats.ConnInfo,
@@ -252,91 +495,110 @@ func (probe *Probe) SetHttpResponseReadyTime(
}
}
-func CalculateSequentialRTTsTime(
+func getLatency(ctx context.Context, probe *Probe, url string, debugLevel debug.DebugLevel) utilities.MeasurementResult {
+ time_before_probe := time.Now()
+ probe_req, err := http.NewRequestWithContext(
+ httptrace.WithClientTrace(ctx, probe.GetTrace()),
+ "GET",
+ url,
+ nil,
+ )
+ if err != nil {
+ return utilities.MeasurementResult{Delay: 0, MeasurementCount: 0, Err: err}
+ }
+
+ probe_resp, err := probe.client.Do(probe_req)
+ if err != nil {
+ return utilities.MeasurementResult{Delay: 0, MeasurementCount: 0, Err: err}
+ }
+
+ // TODO: Make this interruptible somehow by using _ctx_.
+ _, err = io.ReadAll(probe_resp.Body)
+ if err != nil {
+ return utilities.MeasurementResult{Delay: 0, MeasurementCount: 0, Err: err}
+ }
+ time_after_probe := time.Now()
+
+ // Depending on whether we think that Close() requires another RTT (via TCP), we
+ // may need to move this before/after capturing the after time.
+ probe_resp.Body.Close()
+
+ sanity := time_after_probe.Sub(time_before_probe)
+
+ tlsAndHttpHeaderDelta := probe.GetTLSAndHttpHeaderDelta()
+ httpDownloadDelta := probe.GetHttpDownloadDelta(time_after_probe) // Combined with above, constitutes 2 time measurements, per the Spec.
+ tcpDelta := probe.GetTCPDelta() // Constitutes 1 time measurement, per the Spec.
+ totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + tcpDelta
+
+ // By default, assume that the connection was reused, which means
+ // that we only made 1 time measurement.
+ var measurementCount uint16 = 1
+ if !probe.stats.ConnectionReused {
+ // If we did not reuse the connection, then we made a total of three
+ // time measurements. See the calculations above for details.
+ measurementCount = 3
+ }
+
+ if debug.IsDebug(debugLevel) {
+ fmt.Printf(
+ "(Probe %v) sanity vs total: %v vs %v\n",
+ probe.ProbeId(),
+ sanity,
+ totalDelay,
+ )
+ }
+ return utilities.MeasurementResult{Delay: totalDelay, MeasurementCount: measurementCount, Err: nil}
+}
+
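
To make the measurement accounting in getLatency concrete, a small sketch with toy numbers (not code from this change): a probe over a fresh connection reports the sum of its TCP, TLS+HTTP header, and HTTP download deltas as three measurements, while a probe over a reused connection reports the same sum (with a zero TCP delta) as a single measurement:

    package main

    import (
        "fmt"
        "time"
    )

    // probeDelays is a toy stand-in for the per-probe deltas computed in
    // getLatency above.
    type probeDelays struct {
        tcp              time.Duration // zero when the connection was reused
        tlsAndHTTPHeader time.Duration
        httpDownload     time.Duration
        connectionReused bool
    }

    // totalAndCount mirrors the accounting above: the reported delay is the
    // sum of the deltas, and the measurement count is 1 for a reused
    // connection and 3 otherwise.
    func totalAndCount(d probeDelays) (time.Duration, uint16) {
        total := d.tcp + d.tlsAndHTTPHeader + d.httpDownload
        count := uint16(1)
        if !d.connectionReused {
            count = 3
        }
        return total, count
    }

    func main() {
        fresh := probeDelays{tcp: 12 * time.Millisecond, tlsAndHTTPHeader: 30 * time.Millisecond, httpDownload: 10 * time.Millisecond}
        reused := probeDelays{tlsAndHTTPHeader: 14 * time.Millisecond, httpDownload: 9 * time.Millisecond, connectionReused: true}
        for _, d := range []probeDelays{fresh, reused} {
            total, count := totalAndCount(d)
            fmt.Printf("reused=%v total=%v count=%d\n", d.connectionReused, total, count)
        }
    }
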
+func CalculateProbeMeasurements(
ctx context.Context,
- saturated_rtt_probe *Probe,
- new_rtt_probe *Probe,
+ strict bool,
+ saturated_measurement_probe *Probe,
+ unsaturated_measurement_probe *Probe,
url string,
debugLevel debug.DebugLevel,
-) chan utilities.GetLatency {
- responseChannel := make(chan utilities.GetLatency)
+) chan utilities.MeasurementResult {
+ responseChannel := make(chan utilities.MeasurementResult)
go func() {
- before := time.Now()
- roundTripCount := uint16(0)
/*
- TODO: We are not going to measure round-trip times on the load-generating connection
- right now because we are dealing with a massive amount of buffer bloat on the
- Apple CDN.
+ * Depending on whether the user wants their measurements to be strict, we will
+ * also measure on the load-generating (saturated) connection.
+ */
+ var saturated_probe_latency utilities.MeasurementResult
+ if strict {
- TODO: When this functionality is enabled, we may need to change the assertion in
- the GotConn callback in the Traceable interface in traceable.go because a connection
- will be reused in that case. If such a situation does come to pass, we will want to
- move that assertion in to the various Traceable interface implementations that continue
- to rely on this assertion.
+ if debug.IsDebug(debugLevel) {
+ fmt.Printf("Beginning saturated measurement probe.\n")
+ }
+ saturated_probe_latency = getLatency(ctx, saturated_measurement_probe, url, debugLevel)
- c_a, err := saturated_client.Get(url)
- if err != nil {
- responseChannel <- GetLatency{Delay: 0, RTTs: 0, Err: err}
- return
- }
- // TODO: Make this interruptable somehow
- // by using _ctx_.
- _, err = io.ReadAll(c_a.Body)
- if err != nil {
- responseChannel <- GetLatency{Delay: 0, RTTs: 0, Err: err}
- return
- }
- roundTripCount += 5
- c_a.Body.Close()
- */
- c_b_req, err := http.NewRequestWithContext(
- httptrace.WithClientTrace(ctx, new_rtt_probe.GetTrace()),
- "GET",
- url,
- nil,
- )
- if err != nil {
- responseChannel <- utilities.GetLatency{Delay: 0, RoundTripCount: 0, Err: err}
- return
+ if saturated_probe_latency.Err != nil {
+ fmt.Printf("Error occurred getting the saturated measurement.\n")
+ responseChannel <- saturated_probe_latency
+ return
+ }
}
- c_b, err := new_rtt_probe.client.Do(c_b_req)
- if err != nil {
- responseChannel <- utilities.GetLatency{Delay: 0, RoundTripCount: 0, Err: err}
- return
+ if debug.IsDebug(debugLevel) {
+ fmt.Printf("Beginning unsaturated measurement probe.\n")
}
+ unsaturated_probe_latency := getLatency(ctx, unsaturated_measurement_probe, url, debugLevel)
- // TODO: Make this interruptable somehow by using _ctx_.
- _, err = io.ReadAll(c_b.Body)
- if err != nil {
- responseChannel <- utilities.GetLatency{Delay: 0, Err: err}
+ if unsaturated_probe_latency.Err != nil {
+ fmt.Printf("Error occurred getting the unsaturated measurement.\n")
+ responseChannel <- unsaturated_probe_latency
return
}
- after := time.Now()
-
- // Depending on whether we think that Close() requires another RTT (via TCP), we
- // may need to move this before/after capturing the after time.
- c_b.Body.Close()
-
- sanity := after.Sub(before)
- tlsAndHttpHeaderDelta := new_rtt_probe.GetTLSAndHttpHeaderDelta() // Constitutes 2 RTT, per the Spec.
- httpDownloadDelta := new_rtt_probe.GetHttpDownloadDelta(after) // Constitutes 1 RTT, per the Spec.
- dnsDelta := new_rtt_probe.GetDnsDelta() // Constitutes 1 RTT, per the Spec.
- tcpDelta := new_rtt_probe.GetTCPDelta() // Constitutes 1 RTT, per the Spec.
- totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + dnsDelta + tcpDelta
+ total_latency := unsaturated_probe_latency.Delay
+ total_measurement_count := unsaturated_probe_latency.MeasurementCount
- if debug.IsDebug(debugLevel) {
- fmt.Printf(
- "(Probe %v) sanity vs total: %v vs %v\n",
- new_rtt_probe.ProbeId(),
- sanity,
- totalDelay,
- )
+ if strict {
+ total_latency += saturated_probe_latency.Delay
+ total_measurement_count += saturated_probe_latency.MeasurementCount
}
-
- roundTripCount += 5 // According to addition, there are 5 RTTs that we measured.
- responseChannel <- utilities.GetLatency{Delay: totalDelay, RoundTripCount: roundTripCount, Err: nil}
+ responseChannel <- utilities.MeasurementResult{Delay: total_latency, MeasurementCount: total_measurement_count, Err: nil}
+ return
}()
return responseChannel
}
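
Both Saturate and CalculateProbeMeasurements hand their results back on a channel from a goroutine, so a caller typically selects on the result channel together with a context. A minimal, self-contained sketch of that calling convention follows; measurementResult and fakeProbe are local stand-ins for utilities.MeasurementResult and the real probe functions, not APIs from this repository:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // measurementResult mirrors the shape of utilities.MeasurementResult used
    // above (Delay, MeasurementCount, Err); defined locally so this sketch is
    // self-contained.
    type measurementResult struct {
        Delay            time.Duration
        MeasurementCount uint16
        Err              error
    }

    // fakeProbe stands in for CalculateProbeMeasurements: it returns a channel
    // immediately and delivers a result later, the same calling convention the
    // functions in this diff use.
    func fakeProbe() chan measurementResult {
        out := make(chan measurementResult)
        go func() {
            time.Sleep(20 * time.Millisecond) // pretend to probe
            out <- measurementResult{Delay: 20 * time.Millisecond, MeasurementCount: 3}
        }()
        return out
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        // A caller waits on the result channel but also honors the context, so
        // a wedged probe cannot stall the measurement loop forever.
        select {
        case result := <-fakeProbe():
            if result.Err != nil {
                fmt.Println("probe failed:", result.Err)
                return
            }
            fmt.Printf("delay %v over %d measurements\n", result.Delay, result.MeasurementCount)
        case <-ctx.Done():
            fmt.Println("timed out waiting for the probe")
        }
    }
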