diff options
Diffstat (limited to 'networkQuality.go')
| -rw-r--r-- | networkQuality.go | 59 |
1 files changed, 50 insertions, 9 deletions
diff --git a/networkQuality.go b/networkQuality.go index e052e1f..3c31000 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -31,7 +31,8 @@ var ( storeSslKeys = flag.Bool("store-ssl-keys", false, "Store SSL keys from connections for debugging. (currently unused)") // Global configuration - cooldownPeriod int = 4 + cooldownPeriod int = 4 + robustnessProbeIterationCount int = 5 ) type ConfigUrls struct { @@ -119,7 +120,6 @@ func toMBps(bytes float64) float64 { func addFlows(ctx context.Context, toAdd uint64, lbcs *[]lbc.LoadBearingConnection, lbcsPreviousTransferred *[]uint64, lbcGenerator func() lbc.LoadBearingConnection, debug bool) { for i := uint64(0); i < toAdd; i++ { - //mcs[i] = &mc.LoadBearingUpload{Path: config.Urls.UploadUrl} *lbcs = append(*lbcs, lbcGenerator()) *lbcsPreviousTransferred = append(*lbcsPreviousTransferred, 0) if !(*lbcs)[len(*lbcs)-1].Start(ctx, debug) { @@ -190,6 +190,7 @@ func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGe // Compute "instantaneous aggregate" goodput which is the number of bytes transferred within the last second. totalTransfer := uint64(0) + allInvalid := true for i := range lbcs { if !lbcs[i].IsValid() { if debug != nil { @@ -197,12 +198,21 @@ func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGe } continue } + allInvalid = false previousTransferred := lbcsPreviousTransferred[i] currentTransferred := lbcs[i].Transferred() totalTransfer += (currentTransferred - previousTransferred) lbcsPreviousTransferred[i] = currentTransferred } + // For some reason, all the LBCs are invalid. This likely means that the network/server went away. + if allInvalid { + if debug != nil { + fmt.Printf("%v: All LBCs were invalid. Assuming that network/server went away.\n", debug) + } + break + } + // Compute a moving average of the last 4 "instantaneous aggregate goodput" measurements movingAverage.AddMeasurement(float64(totalTransfer)) currentMovingAverage := movingAverage.CalculateAverage() @@ -267,6 +277,7 @@ func main() { flag.Parse() timeoutDuration := time.Second * time.Duration(*timeout) + timeoutAbsoluteTime := time.Now().Add(timeoutDuration) configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort) operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background()) saturationCtx, cancelSaturationCtx := context.WithCancel(context.Background()) @@ -284,7 +295,10 @@ func main() { fmt.Printf("Configuration: %s\n", config) } - timeoutChannel := timeoutat.TimeoutAt(operatingCtx, time.Now().Add(timeoutDuration), *debug) + timeoutChannel := timeoutat.TimeoutAt(operatingCtx, timeoutAbsoluteTime, *debug) + if *debug { + fmt.Printf("Test will end earlier than %v\n", timeoutAbsoluteTime) + } generate_lbd := func() lbc.LoadBearingConnection { return &lbc.LoadBearingConnectionDownload{Path: config.Urls.LargeUrl} @@ -332,7 +346,8 @@ func main() { return } saturationTimeout = true - timeoutChannel = timeoutat.TimeoutAt(operatingCtx, time.Now().Add(5*time.Second), *debug) + timeoutAbsoluteTime = time.Now().Add(5 * time.Second) + timeoutChannel = timeoutat.TimeoutAt(operatingCtx, timeoutAbsoluteTime, *debug) cancelSaturationCtx() if *debug { fmt.Printf("################# timeout reaching saturation!\n") @@ -341,13 +356,35 @@ func main() { } } - robustnessProbeIterationCount := 5 + // If there was a timeout achieving saturation then we already added another 5 seconds + // to the available time for testing. However, if saturated was achieved before the timeout + // then we want to give ourselves another 5 seconds to calculate the RPM. + if !saturationTimeout { + timeoutAbsoluteTime = time.Now().Add(5 * time.Second) + timeoutChannel = timeoutat.TimeoutAt(operatingCtx, time.Now().Add(5*time.Second), *debug) + } + totalRTTsCount := 0 totalRTTTime := float64(0) rttTimeout := false for i := 0; i < robustnessProbeIterationCount && !rttTimeout; i++ { randomLbcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Lbcs) + if !downloadSaturation.Lbcs[randomLbcsIndex].IsValid() { + if debug != nil { + fmt.Printf("%v: The randomly selected download LBC (at index %d) was invalid. Skipping.\n", debug, randomLbcsIndex) + } + + // Protect against pathological cases where we continuously select invalid connections and never + // do the select below + if time.Now().Sub(timeoutAbsoluteTime) > 0 { + if *debug { + fmt.Printf("Pathologically could not find valid LBCs to use for measurement.\n") + } + break + } + continue + } select { case <-timeoutChannel: { @@ -364,12 +401,16 @@ func main() { } } - rpm := float64(60) / (totalRTTTime / (float64(totalRTTsCount) * 5)) - fmt.Printf("Download: %f MBps (%f Mbps), using %d parallel connections.\n", toMBps(downloadSaturation.RateBps), toMbps(downloadSaturation.RateBps), len(downloadSaturation.Lbcs)) fmt.Printf("Upload: %f MBps (%f Mbps), using %d parallel connections.\n", toMBps(uploadSaturation.RateBps), toMbps(uploadSaturation.RateBps), len(uploadSaturation.Lbcs)) - fmt.Printf("Total RTTs measured: %d\n", totalRTTsCount) - fmt.Printf("RPM: %v\n", rpm) + + if totalRTTsCount != 0 { + rpm := float64(60) / (totalRTTTime / (float64(totalRTTsCount) * 5)) + fmt.Printf("Total RTTs measured: %d\n", totalRTTsCount) + fmt.Printf("RPM: %v\n", rpm) + } else { + fmt.Printf("Error occurred calculating RPM -- no probe measurements received.\n") + } cancelOperatingCtx() if *debug { |
