diff options
Diffstat (limited to 'networkQuality.go')
| -rw-r--r-- | networkQuality.go | 205 |
1 files changed, 139 insertions, 66 deletions
diff --git a/networkQuality.go b/networkQuality.go index 44d64b1..54c4260 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -21,6 +21,19 @@ import ( "github.com/hawkinsw/goresponsiveness/utilities" ) +var ( + // Variables to hold CLI arguments. + configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.") + configPort = flag.Int("port", 4043, "port number on which to access responsiveness configuration server.") + configPath = flag.String("path", "config", "path on the server to the configuration endpoint.") + debug = flag.Bool("debug", false, "Enable debugging.") + timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.") + storeSslKeys = flag.Bool("store-ssl-keys", false, "Store SSL keys from connections for debugging. (currently unused)") + + // Global configuration + cooldownPeriod int = 4 +) + type ConfigUrls struct { SmallUrl string `json:"small_https_download_url"` LargeUrl string `json:"large_https_download_url"` @@ -28,9 +41,10 @@ type ConfigUrls struct { } type Config struct { - Version int - Urls ConfigUrls `json:"urls"` - Source string + Version int + Urls ConfigUrls `json:"urls"` + Source string + Test_Endpoint string } func (c *Config) Get(configHost string, configPath string) error { @@ -56,11 +70,30 @@ func (c *Config) Get(configHost string, configPath string) error { if err != nil { return fmt.Errorf("Error: Could not parse configuration returned from %s: %v\n", c.Source, err) } + + //if len(c.Test_Endpoint) != 0 { + if false { + tempUrl, err := url.Parse(c.Urls.LargeUrl) + if err != nil { + return fmt.Errorf("Error parsing large_https_download_url: %v", err) + } + c.Urls.LargeUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path + tempUrl, err = url.Parse(c.Urls.SmallUrl) + if err != nil { + return fmt.Errorf("Error parsing small_https_download_url: %v", err) + } + c.Urls.SmallUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path + tempUrl, err = url.Parse(c.Urls.UploadUrl) + if err != nil { + return fmt.Errorf("Error parsing https_upload_url: %v", err) + } + c.Urls.UploadUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path + } return nil } func (c *Config) String() string { - return fmt.Sprintf("Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s", c.Version, c.Urls.SmallUrl, c.Urls.LargeUrl, c.Urls.UploadUrl) + return fmt.Sprintf("Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s\nEndpoint: %s\n", c.Version, c.Urls.SmallUrl, c.Urls.LargeUrl, c.Urls.UploadUrl, c.Test_Endpoint) } func (c *Config) IsValid() error { @@ -76,19 +109,13 @@ func (c *Config) IsValid() error { return nil } -func toMBs(bytes float64) float64 { - return float64(bytes) / float64(1024*1024) +func toMbps(bytes float64) float64 { + return toMBps(bytes) * float64(8) } -var ( - // Variables to hold CLI arguments. - configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.") - configPort = flag.Int("port", 4043, "port number on which to access responsiveness configuration server.") - configPath = flag.String("path", "config", "path on the server to the configuration endpoint.") - debug = flag.Bool("debug", false, "Enable debugging.") - timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.") - storeSslKeys = flag.Bool("store-ssl-keys", false, "Store SSL keys from connections for debugging. (currently unused)") -) +func toMBps(bytes float64) float64 { + return float64(bytes) / float64(1024*1024) +} func addFlows(ctx context.Context, toAdd uint64, lbcs *[]lbc.LoadBearingConnection, lbcsPreviousTransferred *[]uint64, lbcGenerator func() lbc.LoadBearingConnection, debug bool) { for i := uint64(0); i < toAdd; i++ { @@ -107,7 +134,19 @@ type SaturationResult struct { Lbcs []lbc.LoadBearingConnection } -func saturate(ctx context.Context, lbcGenerator func() lbc.LoadBearingConnection, debug bool) (saturated chan SaturationResult) { +type Debugging struct { + Prefix string +} + +func NewDebugging(prefix string) *Debugging { + return &Debugging{Prefix: prefix} +} + +func (d *Debugging) String() string { + return d.Prefix +} + +func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGenerator func() lbc.LoadBearingConnection, debug *Debugging) (saturated chan SaturationResult) { saturated = make(chan SaturationResult) go func() { @@ -115,37 +154,49 @@ func saturate(ctx context.Context, lbcGenerator func() lbc.LoadBearingConnection lbcsPreviousTransferred := make([]uint64, 0) // Create 4 load bearing connections - addFlows(ctx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug) + addFlows(operatingCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil) previousFlowIncreaseIteration := uint64(0) previousMovingAverage := float64(0) movingAverage := ma.NewMovingAverage(4) movingAverageAverage := ma.NewMovingAverage(4) - nextTime := time.Now().Add(time.Second) + nextSampleStartTime := time.Now().Add(time.Second) for currentIteration := uint64(0); true; currentIteration++ { - // If we are cancelled, then stop. - if ctx.Err() != nil { + // When the program stops operating, then stop. + if operatingCtx.Err() != nil { return } + // We may be asked to stop trying to saturate the + // network and return our current status. + if saturationCtx.Err() != nil { + //break + } + now := time.Now() // At each 1-second interval - if nextTime.Second() > now.Second() { - if debug { - fmt.Printf("Sleeping until %v\n", nextTime) + if nextSampleStartTime.Sub(now) > 0 { + if debug != nil { + fmt.Printf("%v: Sleeping until %v\n", debug, nextSampleStartTime) } - time.Sleep(nextTime.Sub(now)) + time.Sleep(nextSampleStartTime.Sub(now)) } else { - fmt.Printf("Warning: Missed a one-second deadline.\n") + fmt.Fprintf(os.Stderr, "Warning: Missed a one-second deadline.\n") } - nextTime = time.Now().Add(time.Second) + nextSampleStartTime = time.Now().Add(time.Second) // Compute "instantaneous aggregate" goodput which is the number of bytes transferred within the last second. totalTransfer := uint64(0) for i := range lbcs { + if !lbcs[i].IsValid() { + if debug != nil { + fmt.Printf("%v: Load-bearing connection at index %d is invalid ... skipping.\n", debug, i) + } + continue + } previousTransferred := lbcsPreviousTransferred[i] currentTransferred := lbcs[i].Transferred() totalTransfer += (currentTransferred - previousTransferred) @@ -157,41 +208,51 @@ func saturate(ctx context.Context, lbcGenerator func() lbc.LoadBearingConnection currentMovingAverage := movingAverage.CalculateAverage() movingAverageAverage.AddMeasurement(currentMovingAverage) movingAverageDelta := utilities.SignedPercentDifference(currentMovingAverage, previousMovingAverage) + + if debug != nil { + fmt.Printf("%v: Instantaneous goodput: %f MB.\n", debug, toMBps(float64(totalTransfer))) + fmt.Printf("%v: Previous moving average: %f MB.\n", debug, toMBps(previousMovingAverage)) + fmt.Printf("%v: Current moving average: %f MB.\n", debug, toMBps(currentMovingAverage)) + fmt.Printf("%v: Moving average delta: %f.\n", debug, movingAverageDelta) + } + previousMovingAverage = currentMovingAverage - if debug { - fmt.Printf("Instantaneous goodput: %f MB.\n", toMBs(float64(totalTransfer))) - fmt.Printf("Moving average: %f MB.\n", toMBs(currentMovingAverage)) - fmt.Printf("Moving average delta: %f.\n", movingAverageDelta) + // Special case: We won't make any adjustments on the first iteration. + if currentIteration == 0 { + continue } // If moving average > "previous" moving average + 5%: - if currentIteration == 0 || movingAverageDelta > float64(5) { + if movingAverageDelta > float64(5) { // Network did not yet reach saturation. If no flows added within the last 4 seconds, add 4 more flows if (currentIteration - previousFlowIncreaseIteration) > 4 { - if debug { - fmt.Printf("Adding flows because we are unsaturated and waited a while.\n") + if debug != nil { + fmt.Printf("%v: Adding flows because we are unsaturated and waited a while.\n", debug) } - addFlows(ctx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug) + addFlows(operatingCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil) previousFlowIncreaseIteration = currentIteration } else { - if debug { - fmt.Printf("We are unsaturated, but it still too early to add anything.\n") + if debug != nil { + fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debug) } } } else { // Else, network reached saturation for the current flow count. + if debug != nil { + fmt.Printf("%v: Network reached saturation with current flow count.\n", debug) + } // If new flows added and for 4 seconds the moving average throughput did not change: network reached stable saturation - if (currentIteration-previousFlowIncreaseIteration) < 4 && movingAverageAverage.ConsistentWithin(float64(4)) { - if debug { - fmt.Printf("New flows added within the last four seconds and the moving-average average is consistent!\n") + if (currentIteration-previousFlowIncreaseIteration) < 4 && movingAverageAverage.IncreasesLessThan(float64(5)) { + if debug != nil { + fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debug) } break } else { // Else, add four more flows - if debug { - fmt.Printf("New flows to add to try to increase our saturation!\n") + if debug != nil { + fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debug) } - addFlows(ctx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug) + addFlows(operatingCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil) previousFlowIncreaseIteration = currentIteration } } @@ -208,6 +269,7 @@ func main() { timeoutDuration := time.Second * time.Duration(*timeout) configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort) operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background()) + saturationCtx, cancelSaturationCtx := context.WithCancel(context.Background()) config := &Config{} if err := config.Get(configHostPort, *configPath); err != nil { @@ -230,34 +292,48 @@ func main() { generate_lbu := func() lbc.LoadBearingConnection { return &lbc.LoadBearingConnectionUpload{Path: config.Urls.UploadUrl} } - downloadSaturationChannel := saturate(operatingCtx, generate_lbd, *debug) - uploadSaturationChannel := saturate(operatingCtx, generate_lbu, *debug) - test_timeout := false - upload_saturated := false - download_saturated := false + downloadSaturationChannel := saturate(saturationCtx, operatingCtx, generate_lbd, NewDebugging("download")) + uploadSaturationChannel := saturate(saturationCtx, operatingCtx, generate_lbu, NewDebugging("upload")) + + saturationTimeout := false + uploadSaturated := false + downloadSaturated := false downloadSaturation := SaturationResult{} uploadSaturation := SaturationResult{} - for !test_timeout && !(upload_saturated && download_saturated) { + for !(uploadSaturated && downloadSaturated) { select { case downloadSaturation = <-downloadSaturationChannel: { - download_saturated = true + downloadSaturated = true if *debug { - fmt.Printf("################# download is saturated (%fMBps, %d flows)!\n", toMBs(downloadSaturation.RateBps), len(downloadSaturation.Lbcs)) + fmt.Printf("################# download is %s saturated (%fMBps, %d flows)!\n", utilities.Conditional(saturationTimeout, "(provisionally)", ""), toMBps(downloadSaturation.RateBps), len(downloadSaturation.Lbcs)) } } case uploadSaturation = <-uploadSaturationChannel: { - upload_saturated = true + uploadSaturated = true if *debug { - fmt.Printf("################# upload is saturated (%fMBps, %d flows)!\n", toMBs(uploadSaturation.RateBps), len(uploadSaturation.Lbcs)) + fmt.Printf("################# upload is %s saturated (%fMBps, %d flows)!\n", utilities.Conditional(saturationTimeout, "(provisionally)", ""), toMBps(uploadSaturation.RateBps), len(uploadSaturation.Lbcs)) } } case <-timeoutChannel: { - test_timeout = true + if saturationTimeout { + // We already timedout on saturation. This signal means that + // we are timedout on getting the provisional saturation. We + // will exit! + fmt.Fprint(os.Stderr, "Error: Saturation could not be completed in time and no provisional rates could be accessed. Test failed.\n") + cancelOperatingCtx() + if *debug { + time.Sleep(time.Duration(cooldownPeriod) * time.Second) + } + return + } + saturationTimeout = true + timeoutChannel = timeoutat.TimeoutAt(operatingCtx, time.Now().Add(5*time.Second), *debug) + cancelSaturationCtx() if *debug { fmt.Printf("################# timeout reaching saturation!\n") } @@ -265,26 +341,21 @@ func main() { } } - if test_timeout { - cancelOperatingCtx() - fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation before test time expired (%v).\n.", timeoutDuration) - return - } - robustnessProbeIterationCount := 5 - actualRTTCount := 0 + totalRTTsCount := 0 totalRTTTime := float64(0) + rttTimeout := false - for i := 0; i < robustnessProbeIterationCount && !test_timeout; i++ { + for i := 0; i < robustnessProbeIterationCount && !rttTimeout; i++ { randomLbcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Lbcs) select { case <-timeoutChannel: { - test_timeout = true + rttTimeout = true } case fiveRTTsTime := <-utilities.TimedSequentialRTTs(operatingCtx, downloadSaturation.Lbcs[randomLbcsIndex].Client(), &http.Client{}, config.Urls.SmallUrl): { - actualRTTCount += 5 + totalRTTsCount += 5 totalRTTTime += fiveRTTsTime.Delay.Seconds() if *debug { fmt.Printf("fiveRTTsTime: %v\n", fiveRTTsTime.Delay.Seconds()) @@ -293,13 +364,15 @@ func main() { } } - rpm := float64(60) / (totalRTTTime / (float64(actualRTTCount) * 5)) + rpm := float64(60) / (totalRTTTime / (float64(totalRTTsCount) * 5)) + fmt.Printf("Download: %f MBps (%f Mbps), using %d parallel connections.\n", toMBps(downloadSaturation.RateBps), toMbps(downloadSaturation.RateBps), len(downloadSaturation.Lbcs)) + fmt.Printf("Upload: %f MBps (%f Mbps), using %d parallel connections.\n", toMBps(uploadSaturation.RateBps), toMbps(uploadSaturation.RateBps), len(uploadSaturation.Lbcs)) + fmt.Printf("Total RTTs measured: %d\n", totalRTTsCount) fmt.Printf("RPM: %v\n", rpm) cancelOperatingCtx() if *debug { - // Hold on to cool down. - time.Sleep(4 * time.Second) + time.Sleep(time.Duration(cooldownPeriod) * time.Second) } } |
