diff options
| -rw-r--r-- | lbc/lbc.go | 5 | ||||
| -rw-r--r-- | ma/ma.go | 7 | ||||
| -rw-r--r-- | networkQuality.go | 59 |
3 files changed, 58 insertions, 13 deletions
@@ -133,9 +133,10 @@ func (s *syntheticCountingReader) Read(p []byte) (n int, err error) { func (lbu *LoadBearingConnectionUpload) doUpload(ctx context.Context) bool { lbu.uploaded = 0 s := &syntheticCountingReader{n: &lbu.uploaded, ctx: ctx} - resp, _ := lbu.client.Post(lbu.Path, "application/octet-stream", s) + if resp, err := lbu.client.Post(lbu.Path, "application/octet-stream", s); err == nil { + resp.Body.Close() + } lbu.valid = false - resp.Body.Close() if lbu.debug { fmt.Printf("Ending a load-bearing upload.\n") } @@ -21,6 +21,7 @@ func NewMovingAverage(intervals int) *MovingAverage { func (ma *MovingAverage) AddMeasurement(measurement float64) { ma.instants[ma.index] = measurement ma.divisor.Add(1) + // Invariant: ma.index always points to the oldest measurement ma.index = (ma.index + 1) % ma.intervals } @@ -40,9 +41,11 @@ func (ma *MovingAverage) AllSequentialIncreasesLessThan(limit float64) bool { return false } - previous := ma.instants[ma.index] + // Invariant: ma.index always points to the oldest (see AddMeasurement above) + oldestIndex := ma.index + previous := ma.instants[oldestIndex] for i := 1; i < ma.intervals; i++ { - currentIndex := (ma.index + i) % ma.intervals + currentIndex := (oldestIndex + i) % ma.intervals current := ma.instants[currentIndex] percentChange := utilities.SignedPercentDifference(current, previous) previous = current diff --git a/networkQuality.go b/networkQuality.go index e052e1f..3c31000 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -31,7 +31,8 @@ var ( storeSslKeys = flag.Bool("store-ssl-keys", false, "Store SSL keys from connections for debugging. (currently unused)") // Global configuration - cooldownPeriod int = 4 + cooldownPeriod int = 4 + robustnessProbeIterationCount int = 5 ) type ConfigUrls struct { @@ -119,7 +120,6 @@ func toMBps(bytes float64) float64 { func addFlows(ctx context.Context, toAdd uint64, lbcs *[]lbc.LoadBearingConnection, lbcsPreviousTransferred *[]uint64, lbcGenerator func() lbc.LoadBearingConnection, debug bool) { for i := uint64(0); i < toAdd; i++ { - //mcs[i] = &mc.LoadBearingUpload{Path: config.Urls.UploadUrl} *lbcs = append(*lbcs, lbcGenerator()) *lbcsPreviousTransferred = append(*lbcsPreviousTransferred, 0) if !(*lbcs)[len(*lbcs)-1].Start(ctx, debug) { @@ -190,6 +190,7 @@ func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGe // Compute "instantaneous aggregate" goodput which is the number of bytes transferred within the last second. totalTransfer := uint64(0) + allInvalid := true for i := range lbcs { if !lbcs[i].IsValid() { if debug != nil { @@ -197,12 +198,21 @@ func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGe } continue } + allInvalid = false previousTransferred := lbcsPreviousTransferred[i] currentTransferred := lbcs[i].Transferred() totalTransfer += (currentTransferred - previousTransferred) lbcsPreviousTransferred[i] = currentTransferred } + // For some reason, all the LBCs are invalid. This likely means that the network/server went away. + if allInvalid { + if debug != nil { + fmt.Printf("%v: All LBCs were invalid. Assuming that network/server went away.\n", debug) + } + break + } + // Compute a moving average of the last 4 "instantaneous aggregate goodput" measurements movingAverage.AddMeasurement(float64(totalTransfer)) currentMovingAverage := movingAverage.CalculateAverage() @@ -267,6 +277,7 @@ func main() { flag.Parse() timeoutDuration := time.Second * time.Duration(*timeout) + timeoutAbsoluteTime := time.Now().Add(timeoutDuration) configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort) operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background()) saturationCtx, cancelSaturationCtx := context.WithCancel(context.Background()) @@ -284,7 +295,10 @@ func main() { fmt.Printf("Configuration: %s\n", config) } - timeoutChannel := timeoutat.TimeoutAt(operatingCtx, time.Now().Add(timeoutDuration), *debug) + timeoutChannel := timeoutat.TimeoutAt(operatingCtx, timeoutAbsoluteTime, *debug) + if *debug { + fmt.Printf("Test will end earlier than %v\n", timeoutAbsoluteTime) + } generate_lbd := func() lbc.LoadBearingConnection { return &lbc.LoadBearingConnectionDownload{Path: config.Urls.LargeUrl} @@ -332,7 +346,8 @@ func main() { return } saturationTimeout = true - timeoutChannel = timeoutat.TimeoutAt(operatingCtx, time.Now().Add(5*time.Second), *debug) + timeoutAbsoluteTime = time.Now().Add(5 * time.Second) + timeoutChannel = timeoutat.TimeoutAt(operatingCtx, timeoutAbsoluteTime, *debug) cancelSaturationCtx() if *debug { fmt.Printf("################# timeout reaching saturation!\n") @@ -341,13 +356,35 @@ func main() { } } - robustnessProbeIterationCount := 5 + // If there was a timeout achieving saturation then we already added another 5 seconds + // to the available time for testing. However, if saturated was achieved before the timeout + // then we want to give ourselves another 5 seconds to calculate the RPM. + if !saturationTimeout { + timeoutAbsoluteTime = time.Now().Add(5 * time.Second) + timeoutChannel = timeoutat.TimeoutAt(operatingCtx, time.Now().Add(5*time.Second), *debug) + } + totalRTTsCount := 0 totalRTTTime := float64(0) rttTimeout := false for i := 0; i < robustnessProbeIterationCount && !rttTimeout; i++ { randomLbcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Lbcs) + if !downloadSaturation.Lbcs[randomLbcsIndex].IsValid() { + if debug != nil { + fmt.Printf("%v: The randomly selected download LBC (at index %d) was invalid. Skipping.\n", debug, randomLbcsIndex) + } + + // Protect against pathological cases where we continuously select invalid connections and never + // do the select below + if time.Now().Sub(timeoutAbsoluteTime) > 0 { + if *debug { + fmt.Printf("Pathologically could not find valid LBCs to use for measurement.\n") + } + break + } + continue + } select { case <-timeoutChannel: { @@ -364,12 +401,16 @@ func main() { } } - rpm := float64(60) / (totalRTTTime / (float64(totalRTTsCount) * 5)) - fmt.Printf("Download: %f MBps (%f Mbps), using %d parallel connections.\n", toMBps(downloadSaturation.RateBps), toMbps(downloadSaturation.RateBps), len(downloadSaturation.Lbcs)) fmt.Printf("Upload: %f MBps (%f Mbps), using %d parallel connections.\n", toMBps(uploadSaturation.RateBps), toMbps(uploadSaturation.RateBps), len(uploadSaturation.Lbcs)) - fmt.Printf("Total RTTs measured: %d\n", totalRTTsCount) - fmt.Printf("RPM: %v\n", rpm) + + if totalRTTsCount != 0 { + rpm := float64(60) / (totalRTTTime / (float64(totalRTTsCount) * 5)) + fmt.Printf("Total RTTs measured: %d\n", totalRTTsCount) + fmt.Printf("RPM: %v\n", rpm) + } else { + fmt.Printf("Error occurred calculating RPM -- no probe measurements received.\n") + } cancelOperatingCtx() if *debug { |
