summaryrefslogtreecommitdiff
path: root/rpm/rpm.go
diff options
context:
space:
mode:
Diffstat (limited to 'rpm/rpm.go')
-rw-r--r--rpm/rpm.go221
1 files changed, 221 insertions, 0 deletions
diff --git a/rpm/rpm.go b/rpm/rpm.go
index a349cee..8f431b6 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -7,14 +7,235 @@ import (
"io"
"net/http"
"net/http/httptrace"
+ "os"
"time"
+ "github.com/network-quality/goresponsiveness/constants"
"github.com/network-quality/goresponsiveness/debug"
+ "github.com/network-quality/goresponsiveness/lgc"
+ "github.com/network-quality/goresponsiveness/ma"
"github.com/network-quality/goresponsiveness/stats"
"github.com/network-quality/goresponsiveness/traceable"
"github.com/network-quality/goresponsiveness/utilities"
)
+func addFlows(
+ ctx context.Context,
+ toAdd uint64,
+ lgcs *[]lgc.LoadGeneratingConnection,
+ lgcsPreviousTransferred *[]uint64,
+ lgcGenerator func() lgc.LoadGeneratingConnection,
+ debug debug.DebugLevel,
+) {
+ for i := uint64(0); i < toAdd; i++ {
+ *lgcs = append(*lgcs, lgcGenerator())
+ *lgcsPreviousTransferred = append(*lgcsPreviousTransferred, 0)
+ if !(*lgcs)[len(*lgcs)-1].Start(ctx, debug) {
+ fmt.Printf(
+ "Error starting lgc with id %d!\n",
+ (*lgcs)[len(*lgcs)-1].ClientId(),
+ )
+ return
+ }
+ }
+}
+
+type SaturationResult struct {
+ RateBps float64
+ LGCs []lgc.LoadGeneratingConnection
+}
+
+func Saturate(
+ saturationCtx context.Context,
+ operatingCtx context.Context,
+ lgcGenerator func() lgc.LoadGeneratingConnection,
+ debugging *debug.DebugWithPrefix,
+) (saturated chan SaturationResult) {
+ saturated = make(chan SaturationResult)
+ go func() {
+
+ lgcs := make([]lgc.LoadGeneratingConnection, 0)
+ lgcsPreviousTransferred := make([]uint64, 0)
+
+ addFlows(
+ saturationCtx,
+ constants.StartingNumberOfLoadGeneratingConnections,
+ &lgcs,
+ &lgcsPreviousTransferred,
+ lgcGenerator,
+ debugging.Level,
+ )
+
+ previousFlowIncreaseIteration := uint64(0)
+ previousMovingAverage := float64(0)
+ movingAverage := ma.NewMovingAverage(
+ constants.MovingAverageIntervalCount,
+ )
+ movingAverageAverage := ma.NewMovingAverage(
+ constants.MovingAverageIntervalCount,
+ )
+
+ nextSampleStartTime := time.Now().Add(time.Second)
+
+ for currentIteration := uint64(0); true; currentIteration++ {
+
+ // When the program stops operating, then stop.
+ if saturationCtx.Err() != nil {
+ return
+ }
+
+ // We may be asked to stop trying to saturate the
+ // network and return our current status.
+ if saturationCtx.Err() != nil {
+ //break
+ }
+
+ now := time.Now()
+ // At each 1-second interval
+ if nextSampleStartTime.Sub(now) > 0 {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Sleeping until %v\n",
+ debugging,
+ nextSampleStartTime,
+ )
+ }
+ time.Sleep(nextSampleStartTime.Sub(now))
+ } else {
+ fmt.Fprintf(os.Stderr, "Warning: Missed a one-second deadline.\n")
+ }
+ nextSampleStartTime = time.Now().Add(time.Second)
+
+ // Compute "instantaneous aggregate" goodput which is the number of
+ // bytes transferred within the last second.
+ totalTransfer := uint64(0)
+ allInvalid := true
+ for i := range lgcs {
+ if !lgcs[i].IsValid() {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Load-generating connection with id %d is invalid ... skipping.\n",
+ debugging,
+ lgcs[i].ClientId(),
+ )
+ }
+ continue
+ }
+ allInvalid = false
+ previousTransferred := lgcsPreviousTransferred[i]
+ currentTransferred := lgcs[i].Transferred()
+ totalTransfer += (currentTransferred - previousTransferred)
+ lgcsPreviousTransferred[i] = currentTransferred
+ }
+
+ // For some reason, all the lgcs are invalid. This likely means that
+ // the network/server went away.
+ if allInvalid {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: All lgcs were invalid. Assuming that network/server went away.\n",
+ debugging,
+ )
+ }
+ break
+ }
+
+ // Compute a moving average of the last
+ // constants.MovingAverageIntervalCount "instantaneous aggregate
+ // goodput" measurements
+ movingAverage.AddMeasurement(float64(totalTransfer))
+ currentMovingAverage := movingAverage.CalculateAverage()
+ movingAverageAverage.AddMeasurement(currentMovingAverage)
+ movingAverageDelta := utilities.SignedPercentDifference(
+ currentMovingAverage,
+ previousMovingAverage,
+ )
+
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Instantaneous goodput: %f MB.\n",
+ debugging,
+ utilities.ToMBps(float64(totalTransfer)),
+ )
+ fmt.Printf(
+ "%v: Previous moving average: %f MB.\n",
+ debugging,
+ utilities.ToMBps(previousMovingAverage),
+ )
+ fmt.Printf(
+ "%v: Current moving average: %f MB.\n",
+ debugging,
+ utilities.ToMBps(currentMovingAverage),
+ )
+ fmt.Printf(
+ "%v: Moving average delta: %f.\n",
+ debugging,
+ movingAverageDelta,
+ )
+ }
+
+ previousMovingAverage = currentMovingAverage
+
+ // Special case: We won't make any adjustments on the first
+ // iteration.
+ if currentIteration == 0 {
+ continue
+ }
+
+ // If moving average > "previous" moving average + InstabilityDelta:
+ if movingAverageDelta > constants.InstabilityDelta {
+ // Network did not yet reach saturation. If no flows added
+ // within the last 4 seconds, add 4 more flows
+ if (currentIteration - previousFlowIncreaseIteration) > uint64(
+ constants.MovingAverageStabilitySpan,
+ ) {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Adding flows because we are unsaturated and waited a while.\n",
+ debugging,
+ )
+ }
+ addFlows(
+ saturationCtx,
+ constants.AdditiveNumberOfLoadGeneratingConnections,
+ &lgcs,
+ &lgcsPreviousTransferred,
+ lgcGenerator,
+ debugging.Level,
+ )
+ previousFlowIncreaseIteration = currentIteration
+ } else {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debugging)
+ }
+ }
+ } else { // Else, network reached saturation for the current flow count.
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("%v: Network reached saturation with current flow count.\n", debugging)
+ }
+ // If new flows added and for 4 seconds the moving average
+ // throughput did not change: network reached stable saturation
+ if (currentIteration-previousFlowIncreaseIteration) < uint64(constants.MovingAverageStabilitySpan) && movingAverageAverage.AllSequentialIncreasesLessThan(float64(5)) {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging)
+ }
+ break
+ } else {
+ // Else, add four more flows
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging)
+ }
+ addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, &lgcsPreviousTransferred, lgcGenerator, debugging.Level)
+ previousFlowIncreaseIteration = currentIteration
+ }
+ }
+
+ }
+ saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs}
+ }()
+ return
+}
+
type Probe struct {
client *http.Client
stats *stats.TraceStats