summaryrefslogtreecommitdiff
path: root/networkQuality.go
diff options
context:
space:
mode:
Diffstat (limited to 'networkQuality.go')
-rw-r--r--networkQuality.go59
1 files changed, 50 insertions, 9 deletions
diff --git a/networkQuality.go b/networkQuality.go
index e052e1f..3c31000 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -31,7 +31,8 @@ var (
storeSslKeys = flag.Bool("store-ssl-keys", false, "Store SSL keys from connections for debugging. (currently unused)")
// Global configuration
- cooldownPeriod int = 4
+ cooldownPeriod int = 4
+ robustnessProbeIterationCount int = 5
)
type ConfigUrls struct {
@@ -119,7 +120,6 @@ func toMBps(bytes float64) float64 {
func addFlows(ctx context.Context, toAdd uint64, lbcs *[]lbc.LoadBearingConnection, lbcsPreviousTransferred *[]uint64, lbcGenerator func() lbc.LoadBearingConnection, debug bool) {
for i := uint64(0); i < toAdd; i++ {
- //mcs[i] = &mc.LoadBearingUpload{Path: config.Urls.UploadUrl}
*lbcs = append(*lbcs, lbcGenerator())
*lbcsPreviousTransferred = append(*lbcsPreviousTransferred, 0)
if !(*lbcs)[len(*lbcs)-1].Start(ctx, debug) {
@@ -190,6 +190,7 @@ func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGe
// Compute "instantaneous aggregate" goodput which is the number of bytes transferred within the last second.
totalTransfer := uint64(0)
+ allInvalid := true
for i := range lbcs {
if !lbcs[i].IsValid() {
if debug != nil {
@@ -197,12 +198,21 @@ func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGe
}
continue
}
+ allInvalid = false
previousTransferred := lbcsPreviousTransferred[i]
currentTransferred := lbcs[i].Transferred()
totalTransfer += (currentTransferred - previousTransferred)
lbcsPreviousTransferred[i] = currentTransferred
}
+ // For some reason, all the LBCs are invalid. This likely means that the network/server went away.
+ if allInvalid {
+ if debug != nil {
+ fmt.Printf("%v: All LBCs were invalid. Assuming that network/server went away.\n", debug)
+ }
+ break
+ }
+
// Compute a moving average of the last 4 "instantaneous aggregate goodput" measurements
movingAverage.AddMeasurement(float64(totalTransfer))
currentMovingAverage := movingAverage.CalculateAverage()
@@ -267,6 +277,7 @@ func main() {
flag.Parse()
timeoutDuration := time.Second * time.Duration(*timeout)
+ timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort)
operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background())
saturationCtx, cancelSaturationCtx := context.WithCancel(context.Background())
@@ -284,7 +295,10 @@ func main() {
fmt.Printf("Configuration: %s\n", config)
}
- timeoutChannel := timeoutat.TimeoutAt(operatingCtx, time.Now().Add(timeoutDuration), *debug)
+ timeoutChannel := timeoutat.TimeoutAt(operatingCtx, timeoutAbsoluteTime, *debug)
+ if *debug {
+ fmt.Printf("Test will end earlier than %v\n", timeoutAbsoluteTime)
+ }
generate_lbd := func() lbc.LoadBearingConnection {
return &lbc.LoadBearingConnectionDownload{Path: config.Urls.LargeUrl}
@@ -332,7 +346,8 @@ func main() {
return
}
saturationTimeout = true
- timeoutChannel = timeoutat.TimeoutAt(operatingCtx, time.Now().Add(5*time.Second), *debug)
+ timeoutAbsoluteTime = time.Now().Add(5 * time.Second)
+ timeoutChannel = timeoutat.TimeoutAt(operatingCtx, timeoutAbsoluteTime, *debug)
cancelSaturationCtx()
if *debug {
fmt.Printf("################# timeout reaching saturation!\n")
@@ -341,13 +356,35 @@ func main() {
}
}
- robustnessProbeIterationCount := 5
+ // If there was a timeout achieving saturation then we already added another 5 seconds
+ // to the available time for testing. However, if saturated was achieved before the timeout
+ // then we want to give ourselves another 5 seconds to calculate the RPM.
+ if !saturationTimeout {
+ timeoutAbsoluteTime = time.Now().Add(5 * time.Second)
+ timeoutChannel = timeoutat.TimeoutAt(operatingCtx, time.Now().Add(5*time.Second), *debug)
+ }
+
totalRTTsCount := 0
totalRTTTime := float64(0)
rttTimeout := false
for i := 0; i < robustnessProbeIterationCount && !rttTimeout; i++ {
randomLbcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Lbcs)
+ if !downloadSaturation.Lbcs[randomLbcsIndex].IsValid() {
+ if debug != nil {
+ fmt.Printf("%v: The randomly selected download LBC (at index %d) was invalid. Skipping.\n", debug, randomLbcsIndex)
+ }
+
+ // Protect against pathological cases where we continuously select invalid connections and never
+ // do the select below
+ if time.Now().Sub(timeoutAbsoluteTime) > 0 {
+ if *debug {
+ fmt.Printf("Pathologically could not find valid LBCs to use for measurement.\n")
+ }
+ break
+ }
+ continue
+ }
select {
case <-timeoutChannel:
{
@@ -364,12 +401,16 @@ func main() {
}
}
- rpm := float64(60) / (totalRTTTime / (float64(totalRTTsCount) * 5))
-
fmt.Printf("Download: %f MBps (%f Mbps), using %d parallel connections.\n", toMBps(downloadSaturation.RateBps), toMbps(downloadSaturation.RateBps), len(downloadSaturation.Lbcs))
fmt.Printf("Upload: %f MBps (%f Mbps), using %d parallel connections.\n", toMBps(uploadSaturation.RateBps), toMbps(uploadSaturation.RateBps), len(uploadSaturation.Lbcs))
- fmt.Printf("Total RTTs measured: %d\n", totalRTTsCount)
- fmt.Printf("RPM: %v\n", rpm)
+
+ if totalRTTsCount != 0 {
+ rpm := float64(60) / (totalRTTTime / (float64(totalRTTsCount) * 5))
+ fmt.Printf("Total RTTs measured: %d\n", totalRTTsCount)
+ fmt.Printf("RPM: %v\n", rpm)
+ } else {
+ fmt.Printf("Error occurred calculating RPM -- no probe measurements received.\n")
+ }
cancelOperatingCtx()
if *debug {