diff options
| -rw-r--r-- | constants/constants.go | 2 | ||||
| -rw-r--r-- | rpm/rpm.go | 57 |
2 files changed, 34 insertions, 25 deletions
diff --git a/constants/constants.go b/constants/constants.go index 147b643..a015f14 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -9,7 +9,7 @@ var ( // calculation. MovingAverageIntervalCount int = 4 // The number of intervals across which to consider a moving average stable. - MovingAverageStabilitySpan int = 4 + MovingAverageStabilitySpan uint64 = 4 // The number of connections to add to a LBC when unsaturated. AdditiveNumberOfLoadGeneratingConnections uint64 = 4 // The cutoff of the percent difference that defines instability. @@ -66,18 +66,26 @@ func Saturate( debugging.Level, ) - previousFlowIncreaseIteration := uint64(0) + previousFlowIncreaseInterval := uint64(0) previousMovingAverage := float64(0) + + // The moving average will contain the average for the last + // constants.MovingAverageIntervalCount throughputs. + // ie, ma[i] = (throughput[i-3] + throughput[i-2] + throughput[i-1] + throughput[i])/4 movingAverage := ma.NewMovingAverage( constants.MovingAverageIntervalCount, ) + + // The moving average average will be the average of the last + // constants.MovingAverageIntervalCount moving averages. + // ie, maa[i] = (ma[i-3] + ma[i-2] + ma[i-1] + ma[i])/4 movingAverageAverage := ma.NewMovingAverage( constants.MovingAverageIntervalCount, ) nextSampleStartTime := time.Now().Add(time.Second) - for currentIteration := uint64(0); true; currentIteration++ { + for currentInterval := uint64(0); true; currentInterval++ { // When the program stops operating, then stop. if saturationCtx.Err() != nil { @@ -124,7 +132,8 @@ func Saturate( allInvalid = false previousTransferred := lgcsPreviousTransferred[i] currentTransferred := lgcs[i].Transferred() - totalTransfer += (currentTransferred - previousTransferred) + instantaneousTransferred := currentTransferred - previousTransferred + totalTransfer += instantaneousTransferred lgcsPreviousTransferred[i] = currentTransferred } @@ -176,9 +185,11 @@ func Saturate( previousMovingAverage = currentMovingAverage + intervalsSinceLastFlowIncrease := currentInterval - previousFlowIncreaseInterval + // Special case: We won't make any adjustments on the first // iteration. - if currentIteration == 0 { + if currentInterval == 0 { continue } @@ -186,9 +197,7 @@ func Saturate( if movingAverageDelta > constants.InstabilityDelta { // Network did not yet reach saturation. If no flows added // within the last 4 seconds, add 4 more flows - if (currentIteration - previousFlowIncreaseIteration) > uint64( - constants.MovingAverageStabilitySpan, - ) { + if intervalsSinceLastFlowIncrease > constants.MovingAverageStabilitySpan { if debug.IsDebug(debugging.Level) { fmt.Printf( "%v: Adding flows because we are unsaturated and waited a while.\n", @@ -203,7 +212,7 @@ func Saturate( lgcGenerator, debugging.Level, ) - previousFlowIncreaseIteration = currentIteration + previousFlowIncreaseInterval = currentInterval } else { if debug.IsDebug(debugging.Level) { fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debugging) @@ -215,7 +224,7 @@ func Saturate( } // If new flows added and for 4 seconds the moving average // throughput did not change: network reached stable saturation - if (currentIteration-previousFlowIncreaseIteration) < uint64(constants.MovingAverageStabilitySpan) && movingAverageAverage.AllSequentialIncreasesLessThan(float64(5)) { + if intervalsSinceLastFlowIncrease < constants.MovingAverageStabilitySpan && movingAverageAverage.AllSequentialIncreasesLessThan(constants.InstabilityDelta) { if debug.IsDebug(debugging.Level) { fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging) } @@ -226,7 +235,7 @@ func Saturate( fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging) } addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, &lgcsPreviousTransferred, lgcGenerator, debugging.Level) - previousFlowIncreaseIteration = currentIteration + previousFlowIncreaseInterval = currentInterval } } @@ -551,7 +560,7 @@ func CalculateProbeMeasurements( ctx context.Context, strict bool, saturated_measurement_probe *Probe, - new_measurement_probe *Probe, + unsaturated_measurement_probe *Probe, url string, debugLevel debug.DebugLevel, ) chan utilities.MeasurementResult { @@ -562,40 +571,40 @@ func CalculateProbeMeasurements( right now. However, it is not clear if Apple is doing the same in their native client. We will have to adjust based on that. */ - var saturated_latency utilities.MeasurementResult + var saturated_probe_latency utilities.MeasurementResult if strict { if debug.IsDebug(debugLevel) { - fmt.Printf("Beginning saturated RTT probe.\n") + fmt.Printf("Beginning saturated measurement probe.\n") } saturated_latency := getLatency(ctx, saturated_measurement_probe, url, debugLevel) if saturated_latency.Err != nil { - fmt.Printf("Error occurred getting the saturated RTT.\n") + fmt.Printf("Error occurred getting the saturated measurement.\n") responseChannel <- saturated_latency return } } if debug.IsDebug(debugLevel) { - fmt.Printf("Beginning unsaturated RTT probe.\n") + fmt.Printf("Beginning unsaturated measurement probe.\n") } - new_rtt_latency := getLatency(ctx, new_measurement_probe, url, debugLevel) + unsaturated_probe_latency := getLatency(ctx, unsaturated_measurement_probe, url, debugLevel) - if new_rtt_latency.Err != nil { - fmt.Printf("Error occurred getting the unsaturated RTT.\n") - responseChannel <- new_rtt_latency + if unsaturated_probe_latency.Err != nil { + fmt.Printf("Error occurred getting the unsaturated measurement.\n") + responseChannel <- unsaturated_probe_latency return } - total_delay := new_rtt_latency.Delay - total_measurement_count := new_rtt_latency.MeasurementCount + total_latency := unsaturated_probe_latency.Delay + total_measurement_count := unsaturated_probe_latency.MeasurementCount if strict { - total_delay += saturated_latency.Delay - total_measurement_count += saturated_latency.MeasurementCount + total_latency += saturated_probe_latency.Delay + total_measurement_count += saturated_probe_latency.MeasurementCount } - responseChannel <- utilities.MeasurementResult{Delay: total_delay, MeasurementCount: total_measurement_count, Err: nil} + responseChannel <- utilities.MeasurementResult{Delay: total_latency, MeasurementCount: total_measurement_count, Err: nil} return }() return responseChannel |
