diff options
| -rw-r--r-- | networkQuality.go | 238 | ||||
| -rw-r--r-- | utilities/utilities.go | 2 |
2 files changed, 126 insertions, 114 deletions
diff --git a/networkQuality.go b/networkQuality.go index 673f58b..9231970 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -30,6 +30,33 @@ type ConfigUrls struct { type Config struct { Version int Urls ConfigUrls `json:"urls"` + Source string +} + +func (c *Config) Get(configHost string, configPath string) error { + configClient := &http.Client{} + // Extraneous /s in URLs is normally okay, but the Apple CDN does not + // like them. Make sure that we put exactly one (1) / between the host + // and the path. + if !strings.HasPrefix(configPath, "/") { + configPath = "/" + configPath + } + c.Source = fmt.Sprintf("https://%s%s", configHost, configPath) + resp, err := configClient.Get(c.Source) + if err != nil { + return fmt.Errorf("Error: Could not connect to configuration host %s: %v\n", configHost, err) + } + + jsonConfig, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("Error: Could not read configuration content downloaded from %s: %v\n", c.Source, err) + } + + err = json.Unmarshal(jsonConfig, c) + if err != nil { + return fmt.Errorf("Error: Could not parse configuration returned from %s: %v\n", c.Source, err) + } + return nil } func (c *Config) String() string { @@ -80,131 +107,121 @@ type SaturationResult struct { Mcs []mc.MeasurableConnection } -func saturate(ctx context.Context, saturated chan<- SaturationResult, lbcGenerator func() mc.MeasurableConnection, debug bool) { - mcs := make([]mc.MeasurableConnection, 0) - mcsPreviousTransferred := make([]uint64, 0) +func saturate(ctx context.Context, lbcGenerator func() mc.MeasurableConnection, debug bool) (saturated chan SaturationResult) { + saturated = make(chan SaturationResult) + go func() { - // Create 4 load bearing connections - addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug) + mcs := make([]mc.MeasurableConnection, 0) + mcsPreviousTransferred := make([]uint64, 0) - previousFlowIncreaseIteration := uint64(0) - previousMovingAverage := float64(0) - movingAverage := ma.NewMovingAverage(4) - movingAverageAverage := ma.NewMovingAverage(4) + // Create 4 load bearing connections + addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug) - for currentIteration := uint64(0); true; currentIteration++ { + previousFlowIncreaseIteration := uint64(0) + previousMovingAverage := float64(0) + movingAverage := ma.NewMovingAverage(4) + movingAverageAverage := ma.NewMovingAverage(4) - // If we are cancelled, then stop. - if ctx.Err() != nil { - return - } - - // At each 1-second interval - time.Sleep(time.Second) - - // Compute "instantaneous aggregate" goodput which is the number of bytes transferred within the last second. - totalTransfer := uint64(0) - for i := range mcs { - previousTransferred := mcsPreviousTransferred[i] - currentTransferred := mcs[i].Transferred() - totalTransfer += (currentTransferred - previousTransferred) - mcsPreviousTransferred[i] = currentTransferred - } + nextTime := time.Now().Add(time.Second) - // Compute a moving average of the last 4 "instantaneous aggregate goodput" measurements - movingAverage.AddMeasurement(float64(totalTransfer)) - currentMovingAverage := movingAverage.CalculateAverage() - movingAverageAverage.AddMeasurement(currentMovingAverage) - movingAverageDelta := utilities.SignedPercentDifference(currentMovingAverage, previousMovingAverage) - previousMovingAverage = currentMovingAverage + for currentIteration := uint64(0); true; currentIteration++ { - if debug { - fmt.Printf("Instantaneous goodput: %f MB.\n", toMBs(float64(totalTransfer))) - fmt.Printf("Moving average: %f MB.\n", toMBs(currentMovingAverage)) - fmt.Printf("Moving average delta: %f.\n", movingAverageDelta) - } + // If we are cancelled, then stop. + if ctx.Err() != nil { + return + } - // If moving average > "previous" moving average + 5%: - if currentIteration == 0 || movingAverageDelta > float64(5) { - // Network did not yet reach saturation. If no flows added within the last 4 seconds, add 4 more flows - if (currentIteration - previousFlowIncreaseIteration) > 4 { + now := time.Now() + // At each 1-second interval + if nextTime.Second() > now.Second() { if debug { - fmt.Printf("Adding flows because we are unsaturated and waited a while.\n") + fmt.Printf("Sleeping until %v\n", nextTime) } - addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug) - previousFlowIncreaseIteration = currentIteration + time.Sleep(nextTime.Sub(now)) } else { - if debug { - fmt.Printf("We are unsaturated, but it still too early to add anything.\n") - } + fmt.Printf("Warning: Missed a one-second deadline.\n") } - } else { // Else, network reached saturation for the current flow count. - // If new flows added and for 4 seconds the moving average throughput did not change: network reached stable saturation - if (currentIteration-previousFlowIncreaseIteration) < 4 && movingAverageAverage.ConsistentWithin(float64(4)) { - if debug { - fmt.Printf("New flows added within the last four seconds and the moving-average average is consistent!\n") + nextTime = time.Now().Add(time.Second) + + // Compute "instantaneous aggregate" goodput which is the number of bytes transferred within the last second. + totalTransfer := uint64(0) + for i := range mcs { + previousTransferred := mcsPreviousTransferred[i] + currentTransferred := mcs[i].Transferred() + totalTransfer += (currentTransferred - previousTransferred) + mcsPreviousTransferred[i] = currentTransferred + } + + // Compute a moving average of the last 4 "instantaneous aggregate goodput" measurements + movingAverage.AddMeasurement(float64(totalTransfer)) + currentMovingAverage := movingAverage.CalculateAverage() + movingAverageAverage.AddMeasurement(currentMovingAverage) + movingAverageDelta := utilities.SignedPercentDifference(currentMovingAverage, previousMovingAverage) + previousMovingAverage = currentMovingAverage + + if debug { + fmt.Printf("Instantaneous goodput: %f MB.\n", toMBs(float64(totalTransfer))) + fmt.Printf("Moving average: %f MB.\n", toMBs(currentMovingAverage)) + fmt.Printf("Moving average delta: %f.\n", movingAverageDelta) + } + + // If moving average > "previous" moving average + 5%: + if currentIteration == 0 || movingAverageDelta > float64(5) { + // Network did not yet reach saturation. If no flows added within the last 4 seconds, add 4 more flows + if (currentIteration - previousFlowIncreaseIteration) > 4 { + if debug { + fmt.Printf("Adding flows because we are unsaturated and waited a while.\n") + } + addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug) + previousFlowIncreaseIteration = currentIteration + } else { + if debug { + fmt.Printf("We are unsaturated, but it still too early to add anything.\n") + } } - break - } else { - // Else, add four more flows - if debug { - fmt.Printf("New flows to add to try to increase our saturation!\n") + } else { // Else, network reached saturation for the current flow count. + // If new flows added and for 4 seconds the moving average throughput did not change: network reached stable saturation + if (currentIteration-previousFlowIncreaseIteration) < 4 && movingAverageAverage.ConsistentWithin(float64(4)) { + if debug { + fmt.Printf("New flows added within the last four seconds and the moving-average average is consistent!\n") + } + break + } else { + // Else, add four more flows + if debug { + fmt.Printf("New flows to add to try to increase our saturation!\n") + } + addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug) + previousFlowIncreaseIteration = currentIteration } - addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug) - previousFlowIncreaseIteration = currentIteration } - } - } - saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), Mcs: mcs} + } + saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), Mcs: mcs} + }() + return } func main() { flag.Parse() timeoutDuration := time.Second * time.Duration(*timeout) - configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort) + operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background()) + config := &Config{} - if !strings.HasPrefix(*configPath, "/") { - *configPath = "/" + *configPath - } - - configUrl := fmt.Sprintf("https://%s%s", configHostPort, *configPath) - - configClient := &http.Client{} - resp, err := configClient.Get(configUrl) - if err != nil { - fmt.Fprintf(os.Stderr, "Error: Could not connect to configuration host %s: %v\n", configHostPort, err) - return - } - - jsonConfig, err := ioutil.ReadAll(resp.Body) - if err != nil { - fmt.Fprintf(os.Stderr, "Error: Could not read configuration content downloaded from %s: %v\n", configUrl, err) - return - } - - var config Config - err = json.Unmarshal(jsonConfig, &config) - if err != nil { - fmt.Fprintf(os.Stderr, "Error: Could not parse configuration returned from %s: %v\n", configUrl, err) + if err := config.Get(configHostPort, *configPath); err != nil { + fmt.Fprintf(os.Stderr, "%s\n", err) return } - if err := config.IsValid(); err != nil { - fmt.Fprintf(os.Stderr, "Error: Invalid configuration returned from %s: %v\n", configUrl, err) + fmt.Fprintf(os.Stderr, "Error: Invalid configuration returned from %s: %v\n", config.Source, err) return } - if *debug { - fmt.Printf("Configuration: %s\n", &config) + fmt.Printf("Configuration: %s\n", config) } - operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background()) - - uploadSaturationChannel := make(chan SaturationResult) - downloadSaturationChannel := make(chan SaturationResult) timeoutChannel := timeoutat.TimeoutAt(operatingCtx, time.Now().Add(timeoutDuration), *debug) generate_lbd := func() mc.MeasurableConnection { @@ -213,15 +230,14 @@ func main() { generate_lbu := func() mc.MeasurableConnection { return &mc.LoadBearingUpload{Path: config.Urls.UploadUrl} } - - go saturate(operatingCtx, downloadSaturationChannel, generate_lbd, *debug) - go saturate(operatingCtx, uploadSaturationChannel, generate_lbu, *debug) + downloadSaturationChannel := saturate(operatingCtx, generate_lbd, *debug) + uploadSaturationChannel := saturate(operatingCtx, generate_lbu, *debug) test_timeout := false upload_saturated := false download_saturated := false - - var downloadSaturation, uploadSaturation SaturationResult + downloadSaturation := SaturationResult{} + uploadSaturation := SaturationResult{} for !test_timeout && !(upload_saturated && download_saturated) { select { @@ -251,39 +267,35 @@ func main() { if test_timeout { cancelOperatingCtx() - fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation before test time expired.\n.", timeoutDuration) + fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation before test time expired (%v).\n.", timeoutDuration) return } - // We are guaranteed to have an upload and download saturation result! - robustnessProbeIterationCount := 5 - actualProbeCount := 0 + actualRTTCount := 0 + totalRTTTime := float64(0) - totalProbeTime := float64(0) for i := 0; i < robustnessProbeIterationCount && !test_timeout; i++ { - // There are len(downloadSaturation.Mcs) flows with http2 connections that - // we can piggy back. Let's choose one at random. - mcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Mcs) + randomMcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Mcs) select { case <-timeoutChannel: { test_timeout = true } - case probeTime := <-utilities.TimedSequentialGets(operatingCtx, downloadSaturation.Mcs[mcsIndex].Client(), &http.Client{}, config.Urls.SmallUrl): + case fiveRTTsTime := <-utilities.TimedSequentialRTTs(operatingCtx, downloadSaturation.Mcs[randomMcsIndex].Client(), &http.Client{}, config.Urls.SmallUrl): { - actualProbeCount++ - totalProbeTime += probeTime.Delay.Seconds() + actualRTTCount += 5 + totalRTTTime += fiveRTTsTime.Delay.Seconds() if *debug { - fmt.Printf("probeTime: %v\n", probeTime.Delay.Seconds()) + fmt.Printf("fiveRTTsTime: %v\n", fiveRTTsTime.Delay.Seconds()) } } } } - averageProbeTime := totalProbeTime / (float64(actualProbeCount) * 5) + rpm := float64(60) / (totalRTTTime / (float64(actualRTTCount) * 5)) - fmt.Printf("RPM: %v\n", float64(60)/averageProbeTime) + fmt.Printf("RPM: %v\n", rpm) cancelOperatingCtx() if *debug { diff --git a/utilities/utilities.go b/utilities/utilities.go index 16585d6..0046544 100644 --- a/utilities/utilities.go +++ b/utilities/utilities.go @@ -27,7 +27,7 @@ type GetLatency struct { Err error } -func TimedSequentialGets(ctx context.Context, client_a *http.Client, client_b *http.Client, url string) chan GetLatency { +func TimedSequentialRTTs(ctx context.Context, client_a *http.Client, client_b *http.Client, url string) chan GetLatency { responseChannel := make(chan GetLatency) go func() { before := time.Now() |
