summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lbc/lbc.go5
-rw-r--r--ma/ma.go7
-rw-r--r--networkQuality.go59
3 files changed, 58 insertions, 13 deletions
diff --git a/lbc/lbc.go b/lbc/lbc.go
index d1cfdc8..4542676 100644
--- a/lbc/lbc.go
+++ b/lbc/lbc.go
@@ -133,9 +133,10 @@ func (s *syntheticCountingReader) Read(p []byte) (n int, err error) {
func (lbu *LoadBearingConnectionUpload) doUpload(ctx context.Context) bool {
lbu.uploaded = 0
s := &syntheticCountingReader{n: &lbu.uploaded, ctx: ctx}
- resp, _ := lbu.client.Post(lbu.Path, "application/octet-stream", s)
+ if resp, err := lbu.client.Post(lbu.Path, "application/octet-stream", s); err == nil {
+ resp.Body.Close()
+ }
lbu.valid = false
- resp.Body.Close()
if lbu.debug {
fmt.Printf("Ending a load-bearing upload.\n")
}
diff --git a/ma/ma.go b/ma/ma.go
index 8d06a67..f11dbbe 100644
--- a/ma/ma.go
+++ b/ma/ma.go
@@ -21,6 +21,7 @@ func NewMovingAverage(intervals int) *MovingAverage {
func (ma *MovingAverage) AddMeasurement(measurement float64) {
ma.instants[ma.index] = measurement
ma.divisor.Add(1)
+ // Invariant: ma.index always points to the oldest measurement
ma.index = (ma.index + 1) % ma.intervals
}
@@ -40,9 +41,11 @@ func (ma *MovingAverage) AllSequentialIncreasesLessThan(limit float64) bool {
return false
}
- previous := ma.instants[ma.index]
+ // Invariant: ma.index always points to the oldest (see AddMeasurement above)
+ oldestIndex := ma.index
+ previous := ma.instants[oldestIndex]
for i := 1; i < ma.intervals; i++ {
- currentIndex := (ma.index + i) % ma.intervals
+ currentIndex := (oldestIndex + i) % ma.intervals
current := ma.instants[currentIndex]
percentChange := utilities.SignedPercentDifference(current, previous)
previous = current
diff --git a/networkQuality.go b/networkQuality.go
index e052e1f..3c31000 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -31,7 +31,8 @@ var (
storeSslKeys = flag.Bool("store-ssl-keys", false, "Store SSL keys from connections for debugging. (currently unused)")
// Global configuration
- cooldownPeriod int = 4
+ cooldownPeriod int = 4
+ robustnessProbeIterationCount int = 5
)
type ConfigUrls struct {
@@ -119,7 +120,6 @@ func toMBps(bytes float64) float64 {
func addFlows(ctx context.Context, toAdd uint64, lbcs *[]lbc.LoadBearingConnection, lbcsPreviousTransferred *[]uint64, lbcGenerator func() lbc.LoadBearingConnection, debug bool) {
for i := uint64(0); i < toAdd; i++ {
- //mcs[i] = &mc.LoadBearingUpload{Path: config.Urls.UploadUrl}
*lbcs = append(*lbcs, lbcGenerator())
*lbcsPreviousTransferred = append(*lbcsPreviousTransferred, 0)
if !(*lbcs)[len(*lbcs)-1].Start(ctx, debug) {
@@ -190,6 +190,7 @@ func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGe
// Compute "instantaneous aggregate" goodput which is the number of bytes transferred within the last second.
totalTransfer := uint64(0)
+ allInvalid := true
for i := range lbcs {
if !lbcs[i].IsValid() {
if debug != nil {
@@ -197,12 +198,21 @@ func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGe
}
continue
}
+ allInvalid = false
previousTransferred := lbcsPreviousTransferred[i]
currentTransferred := lbcs[i].Transferred()
totalTransfer += (currentTransferred - previousTransferred)
lbcsPreviousTransferred[i] = currentTransferred
}
+ // For some reason, all the LBCs are invalid. This likely means that the network/server went away.
+ if allInvalid {
+ if debug != nil {
+ fmt.Printf("%v: All LBCs were invalid. Assuming that network/server went away.\n", debug)
+ }
+ break
+ }
+
// Compute a moving average of the last 4 "instantaneous aggregate goodput" measurements
movingAverage.AddMeasurement(float64(totalTransfer))
currentMovingAverage := movingAverage.CalculateAverage()
@@ -267,6 +277,7 @@ func main() {
flag.Parse()
timeoutDuration := time.Second * time.Duration(*timeout)
+ timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort)
operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background())
saturationCtx, cancelSaturationCtx := context.WithCancel(context.Background())
@@ -284,7 +295,10 @@ func main() {
fmt.Printf("Configuration: %s\n", config)
}
- timeoutChannel := timeoutat.TimeoutAt(operatingCtx, time.Now().Add(timeoutDuration), *debug)
+ timeoutChannel := timeoutat.TimeoutAt(operatingCtx, timeoutAbsoluteTime, *debug)
+ if *debug {
+ fmt.Printf("Test will end earlier than %v\n", timeoutAbsoluteTime)
+ }
generate_lbd := func() lbc.LoadBearingConnection {
return &lbc.LoadBearingConnectionDownload{Path: config.Urls.LargeUrl}
@@ -332,7 +346,8 @@ func main() {
return
}
saturationTimeout = true
- timeoutChannel = timeoutat.TimeoutAt(operatingCtx, time.Now().Add(5*time.Second), *debug)
+ timeoutAbsoluteTime = time.Now().Add(5 * time.Second)
+ timeoutChannel = timeoutat.TimeoutAt(operatingCtx, timeoutAbsoluteTime, *debug)
cancelSaturationCtx()
if *debug {
fmt.Printf("################# timeout reaching saturation!\n")
@@ -341,13 +356,35 @@ func main() {
}
}
- robustnessProbeIterationCount := 5
+ // If there was a timeout achieving saturation then we already added another 5 seconds
+ // to the available time for testing. However, if saturated was achieved before the timeout
+ // then we want to give ourselves another 5 seconds to calculate the RPM.
+ if !saturationTimeout {
+ timeoutAbsoluteTime = time.Now().Add(5 * time.Second)
+ timeoutChannel = timeoutat.TimeoutAt(operatingCtx, time.Now().Add(5*time.Second), *debug)
+ }
+
totalRTTsCount := 0
totalRTTTime := float64(0)
rttTimeout := false
for i := 0; i < robustnessProbeIterationCount && !rttTimeout; i++ {
randomLbcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Lbcs)
+ if !downloadSaturation.Lbcs[randomLbcsIndex].IsValid() {
+ if debug != nil {
+ fmt.Printf("%v: The randomly selected download LBC (at index %d) was invalid. Skipping.\n", debug, randomLbcsIndex)
+ }
+
+ // Protect against pathological cases where we continuously select invalid connections and never
+ // do the select below
+ if time.Now().Sub(timeoutAbsoluteTime) > 0 {
+ if *debug {
+ fmt.Printf("Pathologically could not find valid LBCs to use for measurement.\n")
+ }
+ break
+ }
+ continue
+ }
select {
case <-timeoutChannel:
{
@@ -364,12 +401,16 @@ func main() {
}
}
- rpm := float64(60) / (totalRTTTime / (float64(totalRTTsCount) * 5))
-
fmt.Printf("Download: %f MBps (%f Mbps), using %d parallel connections.\n", toMBps(downloadSaturation.RateBps), toMbps(downloadSaturation.RateBps), len(downloadSaturation.Lbcs))
fmt.Printf("Upload: %f MBps (%f Mbps), using %d parallel connections.\n", toMBps(uploadSaturation.RateBps), toMbps(uploadSaturation.RateBps), len(uploadSaturation.Lbcs))
- fmt.Printf("Total RTTs measured: %d\n", totalRTTsCount)
- fmt.Printf("RPM: %v\n", rpm)
+
+ if totalRTTsCount != 0 {
+ rpm := float64(60) / (totalRTTTime / (float64(totalRTTsCount) * 5))
+ fmt.Printf("Total RTTs measured: %d\n", totalRTTsCount)
+ fmt.Printf("RPM: %v\n", rpm)
+ } else {
+ fmt.Printf("Error occurred calculating RPM -- no probe measurements received.\n")
+ }
cancelOperatingCtx()
if *debug {