summaryrefslogtreecommitdiff
path: root/networkQuality.go
diff options
context:
space:
mode:
Diffstat (limited to 'networkQuality.go')
-rw-r--r--networkQuality.go146
1 files changed, 111 insertions, 35 deletions
diff --git a/networkQuality.go b/networkQuality.go
index f254707..83346ac 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -5,13 +5,17 @@ import (
"encoding/json"
"flag"
"fmt"
- "github.com/hawkinsw/goresponsiveness/ma"
- "github.com/hawkinsw/goresponsiveness/mc"
_ "io"
"io/ioutil"
_ "log"
"net/http"
+ "os"
"time"
+
+ "github.com/hawkinsw/goresponsiveness/ma"
+ "github.com/hawkinsw/goresponsiveness/mc"
+ "github.com/hawkinsw/goresponsiveness/timeoutat"
+ "github.com/hawkinsw/goresponsiveness/utilities"
)
type ConfigUrls struct {
@@ -38,24 +42,33 @@ var (
configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.")
configPort = flag.Int("port", 4043, "port number on which to access responsiveness configuration server.")
debug = flag.Bool("debug", false, "Enable debugging.")
+ timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.")
)
-func saturate(ctx context.Context, saturated chan<- interface{}, lbcGenerator func() mc.MeasurableConnection) {
- mcs := make([]mc.MeasurableConnection, 4)
- mcsPreviousTransferred := make([]uint64, 4)
- for i := range mcs {
+func addFlows(ctx context.Context, toAdd uint64, mcs *[]mc.MeasurableConnection, mcsPreviousTransferred *[]uint64, lbcGenerator func() mc.MeasurableConnection) {
+ for i := uint64(0); i < toAdd; i++ {
//mcs[i] = &mc.LoadBearingUpload{Path: config.Urls.UploadUrl}
- mcs[i] = lbcGenerator()
- mcsPreviousTransferred[i] = 0
- if !mcs[i].Start(ctx) {
+ *mcs = append(*mcs, lbcGenerator())
+ *mcsPreviousTransferred = append(*mcsPreviousTransferred, 0)
+ if !(*mcs)[len(*mcs)-1].Start(ctx) {
fmt.Printf("Error starting %dth MC!\n", i)
return
}
}
+}
+
+func saturate(ctx context.Context, saturated chan<- float64, lbcGenerator func() mc.MeasurableConnection, debug bool) {
+ mcs := make([]mc.MeasurableConnection, 0)
+ mcsPreviousTransferred := make([]uint64, 0)
+ // Create 4 load bearing connections
+ addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator)
+
+ previousFlowIncreaseIteration := uint64(0)
previousMovingAverage := float64(0)
movingAverage := ma.NewMovingAverage(4)
- //lastFlowIncrease := uint64(0)
+ movingAverageAverage := ma.NewMovingAverage(4)
+
for currentIteration := uint64(0); true; currentIteration++ {
// If we are cancelled, then stop.
@@ -63,9 +76,10 @@ func saturate(ctx context.Context, saturated chan<- interface{}, lbcGenerator fu
return
}
+ // At each 1-second interval
time.Sleep(time.Second)
- // 1. Calculate the most recent goodput.
+ // Compute "instantaneous aggregate" goodput which is the number of bytes transferred within the last second.
totalTransfer := uint64(0)
for i := range mcs {
previousTransferred := mcsPreviousTransferred[i]
@@ -74,33 +88,59 @@ func saturate(ctx context.Context, saturated chan<- interface{}, lbcGenerator fu
mcsPreviousTransferred[i] = currentTransferred
}
- // 2. Calculate the delta
- movingAverage.AddMeasurement(totalTransfer)
+ // Compute a moving average of the last 4 "instantaneous aggregate goodput" measurements
+ movingAverage.AddMeasurement(float64(totalTransfer))
currentMovingAverage := movingAverage.CalculateAverage()
- movingAverageDelta := ((currentMovingAverage - previousMovingAverage) / (float64(currentMovingAverage+previousMovingAverage) / 2.0)) * float64(100)
+ movingAverageAverage.AddMeasurement(currentMovingAverage)
+ movingAverageDelta := utilities.PercentDifference(currentMovingAverage, previousMovingAverage)
previousMovingAverage = currentMovingAverage
- fmt.Printf("Instantaneous goodput: %f MB.\n", toMBs(float64(totalTransfer)))
- fmt.Printf("Moving average: %f MB.\n", toMBs(currentMovingAverage))
- fmt.Printf("Moving average delta: %f.\n", movingAverageDelta)
-
+ if debug {
+ fmt.Printf("Instantaneous goodput: %f MB.\n", toMBs(float64(totalTransfer)))
+ fmt.Printf("Moving average: %f MB.\n", toMBs(currentMovingAverage))
+ fmt.Printf("Moving average delta: %f.\n", movingAverageDelta)
+ }
- // 3. Are we stable or not?
- if currentIteration != 0 && movingAverageDelta < float64(5) {
- // We are stable!
- fmt.Printf("Stable\n")
- break
- } else {
- // We are unstable!
- fmt.Printf("Unstable\n")
+ // If moving average > "previous" moving average + 5%:
+ if currentIteration == 0 || movingAverageDelta > float64(5) {
+ // Network did not yet reach saturation. If no flows added within the last 4 seconds, add 4 more flows
+ if (currentIteration - previousFlowIncreaseIteration) > 4 {
+ if debug {
+ fmt.Printf("Adding flows because we are unsaturated and waited a while.\n")
+ }
+ addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator)
+ previousFlowIncreaseIteration = currentIteration
+ } else {
+ if debug {
+ fmt.Printf("We are unsaturated, but it still too early to add anything.\n")
+ }
+ }
+ } else { // Else, network reached saturation for the current flow count.
+ // If new flows added and for 4 seconds the moving average throughput did not change: network reached stable saturation
+ if (currentIteration-previousFlowIncreaseIteration) < 4 && movingAverageAverage.ConsistentWithin(float64(4)) {
+ if debug {
+ fmt.Printf("New flows added within the last four seconds and the moving-average average is consistent!\n")
+ }
+ break
+ } else {
+ // Else, add four more flows
+ if debug {
+ fmt.Printf("New flows to add to try to increase our saturation!\n")
+ }
+ addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator)
+ previousFlowIncreaseIteration = currentIteration
+ }
}
+
}
- saturated <- struct{}{}
+ saturated <- movingAverage.CalculateAverage()
}
func main() {
flag.Parse()
+ timeoutDuration := time.Second * time.Duration(*timeout)
+
configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort)
configUrl := fmt.Sprintf("https://%s/config", configHostPort)
@@ -130,22 +170,58 @@ func main() {
operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background())
- uploadSaturationChannel := make(chan interface{})
+ uploadSaturationChannel := make(chan float64)
+ downloadSaturationChannel := make(chan float64)
+ timeoutChannel := make(chan interface{})
+
+ _ = timeoutat.NewTimeoutAt(operatingCtx, time.Now().Add(timeoutDuration), timeoutChannel)
- g := func() mc.MeasurableConnection {
- return &mc.LoadBearingDownload{Path: config.Urls.LargeUrl}
+ generate_lbd := func() mc.MeasurableConnection {
+ return &mc.LoadBearingDownload{Path: config.Urls.LargeUrl}
+ }
+ generate_lbu := func() mc.MeasurableConnection {
+ return &mc.LoadBearingUpload{Path: config.Urls.UploadUrl}
}
+ go saturate(operatingCtx, downloadSaturationChannel, generate_lbd, *debug)
+ go saturate(operatingCtx, uploadSaturationChannel, generate_lbu, *debug)
- go saturate(operatingCtx, uploadSaturationChannel, g)
+ saturation_timeout := false
+ upload_saturated := false
+ download_saturated := false
- select {
- case <-uploadSaturationChannel:
- {
- fmt.Printf("upload is saturated!\n")
+ for !saturation_timeout && !(upload_saturated && download_saturated) {
+ select {
+ case saturatedDownloadRate := <-downloadSaturationChannel:
+ {
+ download_saturated = true
+ if *debug {
+ fmt.Printf("################## download is saturated (%f)!\n", toMBs(saturatedDownloadRate))
+ }
+ }
+ case saturatedUploadRate := <-uploadSaturationChannel:
+ {
+ upload_saturated = true
+ if *debug {
+ fmt.Printf("################# upload is saturated (%f)!\n", toMBs(saturatedUploadRate))
+ }
+ }
+ case <-timeoutChannel:
+ {
+ saturation_timeout = true
+ if *debug {
+ fmt.Printf("################# timeout reaching saturation!\n")
+ }
+ }
}
}
+ if saturation_timeout {
+ cancelOperatingCtx()
+ fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation in maximum time of %v.", timeoutDuration)
+ return
+ }
+
time.Sleep(10 * time.Second)
cancelOperatingCtx()