summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--networkQuality.go238
-rw-r--r--utilities/utilities.go2
2 files changed, 126 insertions, 114 deletions
diff --git a/networkQuality.go b/networkQuality.go
index 673f58b..9231970 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -30,6 +30,33 @@ type ConfigUrls struct {
type Config struct {
Version int
Urls ConfigUrls `json:"urls"`
+ Source string
+}
+
+func (c *Config) Get(configHost string, configPath string) error {
+ configClient := &http.Client{}
+ // Extraneous /s in URLs is normally okay, but the Apple CDN does not
+ // like them. Make sure that we put exactly one (1) / between the host
+ // and the path.
+ if !strings.HasPrefix(configPath, "/") {
+ configPath = "/" + configPath
+ }
+ c.Source = fmt.Sprintf("https://%s%s", configHost, configPath)
+ resp, err := configClient.Get(c.Source)
+ if err != nil {
+ return fmt.Errorf("Error: Could not connect to configuration host %s: %v\n", configHost, err)
+ }
+
+ jsonConfig, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return fmt.Errorf("Error: Could not read configuration content downloaded from %s: %v\n", c.Source, err)
+ }
+
+ err = json.Unmarshal(jsonConfig, c)
+ if err != nil {
+ return fmt.Errorf("Error: Could not parse configuration returned from %s: %v\n", c.Source, err)
+ }
+ return nil
}
func (c *Config) String() string {
@@ -80,131 +107,121 @@ type SaturationResult struct {
Mcs []mc.MeasurableConnection
}
-func saturate(ctx context.Context, saturated chan<- SaturationResult, lbcGenerator func() mc.MeasurableConnection, debug bool) {
- mcs := make([]mc.MeasurableConnection, 0)
- mcsPreviousTransferred := make([]uint64, 0)
+func saturate(ctx context.Context, lbcGenerator func() mc.MeasurableConnection, debug bool) (saturated chan SaturationResult) {
+ saturated = make(chan SaturationResult)
+ go func() {
- // Create 4 load bearing connections
- addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug)
+ mcs := make([]mc.MeasurableConnection, 0)
+ mcsPreviousTransferred := make([]uint64, 0)
- previousFlowIncreaseIteration := uint64(0)
- previousMovingAverage := float64(0)
- movingAverage := ma.NewMovingAverage(4)
- movingAverageAverage := ma.NewMovingAverage(4)
+ // Create 4 load bearing connections
+ addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug)
- for currentIteration := uint64(0); true; currentIteration++ {
+ previousFlowIncreaseIteration := uint64(0)
+ previousMovingAverage := float64(0)
+ movingAverage := ma.NewMovingAverage(4)
+ movingAverageAverage := ma.NewMovingAverage(4)
- // If we are cancelled, then stop.
- if ctx.Err() != nil {
- return
- }
-
- // At each 1-second interval
- time.Sleep(time.Second)
-
- // Compute "instantaneous aggregate" goodput which is the number of bytes transferred within the last second.
- totalTransfer := uint64(0)
- for i := range mcs {
- previousTransferred := mcsPreviousTransferred[i]
- currentTransferred := mcs[i].Transferred()
- totalTransfer += (currentTransferred - previousTransferred)
- mcsPreviousTransferred[i] = currentTransferred
- }
+ nextTime := time.Now().Add(time.Second)
- // Compute a moving average of the last 4 "instantaneous aggregate goodput" measurements
- movingAverage.AddMeasurement(float64(totalTransfer))
- currentMovingAverage := movingAverage.CalculateAverage()
- movingAverageAverage.AddMeasurement(currentMovingAverage)
- movingAverageDelta := utilities.SignedPercentDifference(currentMovingAverage, previousMovingAverage)
- previousMovingAverage = currentMovingAverage
+ for currentIteration := uint64(0); true; currentIteration++ {
- if debug {
- fmt.Printf("Instantaneous goodput: %f MB.\n", toMBs(float64(totalTransfer)))
- fmt.Printf("Moving average: %f MB.\n", toMBs(currentMovingAverage))
- fmt.Printf("Moving average delta: %f.\n", movingAverageDelta)
- }
+ // If we are cancelled, then stop.
+ if ctx.Err() != nil {
+ return
+ }
- // If moving average > "previous" moving average + 5%:
- if currentIteration == 0 || movingAverageDelta > float64(5) {
- // Network did not yet reach saturation. If no flows added within the last 4 seconds, add 4 more flows
- if (currentIteration - previousFlowIncreaseIteration) > 4 {
+ now := time.Now()
+ // At each 1-second interval
+ if nextTime.Second() > now.Second() {
if debug {
- fmt.Printf("Adding flows because we are unsaturated and waited a while.\n")
+ fmt.Printf("Sleeping until %v\n", nextTime)
}
- addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug)
- previousFlowIncreaseIteration = currentIteration
+ time.Sleep(nextTime.Sub(now))
} else {
- if debug {
- fmt.Printf("We are unsaturated, but it still too early to add anything.\n")
- }
+ fmt.Printf("Warning: Missed a one-second deadline.\n")
}
- } else { // Else, network reached saturation for the current flow count.
- // If new flows added and for 4 seconds the moving average throughput did not change: network reached stable saturation
- if (currentIteration-previousFlowIncreaseIteration) < 4 && movingAverageAverage.ConsistentWithin(float64(4)) {
- if debug {
- fmt.Printf("New flows added within the last four seconds and the moving-average average is consistent!\n")
+ nextTime = time.Now().Add(time.Second)
+
+ // Compute "instantaneous aggregate" goodput which is the number of bytes transferred within the last second.
+ totalTransfer := uint64(0)
+ for i := range mcs {
+ previousTransferred := mcsPreviousTransferred[i]
+ currentTransferred := mcs[i].Transferred()
+ totalTransfer += (currentTransferred - previousTransferred)
+ mcsPreviousTransferred[i] = currentTransferred
+ }
+
+ // Compute a moving average of the last 4 "instantaneous aggregate goodput" measurements
+ movingAverage.AddMeasurement(float64(totalTransfer))
+ currentMovingAverage := movingAverage.CalculateAverage()
+ movingAverageAverage.AddMeasurement(currentMovingAverage)
+ movingAverageDelta := utilities.SignedPercentDifference(currentMovingAverage, previousMovingAverage)
+ previousMovingAverage = currentMovingAverage
+
+ if debug {
+ fmt.Printf("Instantaneous goodput: %f MB.\n", toMBs(float64(totalTransfer)))
+ fmt.Printf("Moving average: %f MB.\n", toMBs(currentMovingAverage))
+ fmt.Printf("Moving average delta: %f.\n", movingAverageDelta)
+ }
+
+ // If moving average > "previous" moving average + 5%:
+ if currentIteration == 0 || movingAverageDelta > float64(5) {
+ // Network did not yet reach saturation. If no flows added within the last 4 seconds, add 4 more flows
+ if (currentIteration - previousFlowIncreaseIteration) > 4 {
+ if debug {
+ fmt.Printf("Adding flows because we are unsaturated and waited a while.\n")
+ }
+ addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug)
+ previousFlowIncreaseIteration = currentIteration
+ } else {
+ if debug {
+ fmt.Printf("We are unsaturated, but it still too early to add anything.\n")
+ }
}
- break
- } else {
- // Else, add four more flows
- if debug {
- fmt.Printf("New flows to add to try to increase our saturation!\n")
+ } else { // Else, network reached saturation for the current flow count.
+ // If new flows added and for 4 seconds the moving average throughput did not change: network reached stable saturation
+ if (currentIteration-previousFlowIncreaseIteration) < 4 && movingAverageAverage.ConsistentWithin(float64(4)) {
+ if debug {
+ fmt.Printf("New flows added within the last four seconds and the moving-average average is consistent!\n")
+ }
+ break
+ } else {
+ // Else, add four more flows
+ if debug {
+ fmt.Printf("New flows to add to try to increase our saturation!\n")
+ }
+ addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug)
+ previousFlowIncreaseIteration = currentIteration
}
- addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug)
- previousFlowIncreaseIteration = currentIteration
}
- }
- }
- saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), Mcs: mcs}
+ }
+ saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), Mcs: mcs}
+ }()
+ return
}
func main() {
flag.Parse()
timeoutDuration := time.Second * time.Duration(*timeout)
-
configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort)
+ operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background())
+ config := &Config{}
- if !strings.HasPrefix(*configPath, "/") {
- *configPath = "/" + *configPath
- }
-
- configUrl := fmt.Sprintf("https://%s%s", configHostPort, *configPath)
-
- configClient := &http.Client{}
- resp, err := configClient.Get(configUrl)
- if err != nil {
- fmt.Fprintf(os.Stderr, "Error: Could not connect to configuration host %s: %v\n", configHostPort, err)
- return
- }
-
- jsonConfig, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- fmt.Fprintf(os.Stderr, "Error: Could not read configuration content downloaded from %s: %v\n", configUrl, err)
- return
- }
-
- var config Config
- err = json.Unmarshal(jsonConfig, &config)
- if err != nil {
- fmt.Fprintf(os.Stderr, "Error: Could not parse configuration returned from %s: %v\n", configUrl, err)
+ if err := config.Get(configHostPort, *configPath); err != nil {
+ fmt.Fprintf(os.Stderr, "%s\n", err)
return
}
-
if err := config.IsValid(); err != nil {
- fmt.Fprintf(os.Stderr, "Error: Invalid configuration returned from %s: %v\n", configUrl, err)
+ fmt.Fprintf(os.Stderr, "Error: Invalid configuration returned from %s: %v\n", config.Source, err)
return
}
-
if *debug {
- fmt.Printf("Configuration: %s\n", &config)
+ fmt.Printf("Configuration: %s\n", config)
}
- operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background())
-
- uploadSaturationChannel := make(chan SaturationResult)
- downloadSaturationChannel := make(chan SaturationResult)
timeoutChannel := timeoutat.TimeoutAt(operatingCtx, time.Now().Add(timeoutDuration), *debug)
generate_lbd := func() mc.MeasurableConnection {
@@ -213,15 +230,14 @@ func main() {
generate_lbu := func() mc.MeasurableConnection {
return &mc.LoadBearingUpload{Path: config.Urls.UploadUrl}
}
-
- go saturate(operatingCtx, downloadSaturationChannel, generate_lbd, *debug)
- go saturate(operatingCtx, uploadSaturationChannel, generate_lbu, *debug)
+ downloadSaturationChannel := saturate(operatingCtx, generate_lbd, *debug)
+ uploadSaturationChannel := saturate(operatingCtx, generate_lbu, *debug)
test_timeout := false
upload_saturated := false
download_saturated := false
-
- var downloadSaturation, uploadSaturation SaturationResult
+ downloadSaturation := SaturationResult{}
+ uploadSaturation := SaturationResult{}
for !test_timeout && !(upload_saturated && download_saturated) {
select {
@@ -251,39 +267,35 @@ func main() {
if test_timeout {
cancelOperatingCtx()
- fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation before test time expired.\n.", timeoutDuration)
+ fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation before test time expired (%v).\n.", timeoutDuration)
return
}
- // We are guaranteed to have an upload and download saturation result!
-
robustnessProbeIterationCount := 5
- actualProbeCount := 0
+ actualRTTCount := 0
+ totalRTTTime := float64(0)
- totalProbeTime := float64(0)
for i := 0; i < robustnessProbeIterationCount && !test_timeout; i++ {
- // There are len(downloadSaturation.Mcs) flows with http2 connections that
- // we can piggy back. Let's choose one at random.
- mcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Mcs)
+ randomMcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Mcs)
select {
case <-timeoutChannel:
{
test_timeout = true
}
- case probeTime := <-utilities.TimedSequentialGets(operatingCtx, downloadSaturation.Mcs[mcsIndex].Client(), &http.Client{}, config.Urls.SmallUrl):
+ case fiveRTTsTime := <-utilities.TimedSequentialRTTs(operatingCtx, downloadSaturation.Mcs[randomMcsIndex].Client(), &http.Client{}, config.Urls.SmallUrl):
{
- actualProbeCount++
- totalProbeTime += probeTime.Delay.Seconds()
+ actualRTTCount += 5
+ totalRTTTime += fiveRTTsTime.Delay.Seconds()
if *debug {
- fmt.Printf("probeTime: %v\n", probeTime.Delay.Seconds())
+ fmt.Printf("fiveRTTsTime: %v\n", fiveRTTsTime.Delay.Seconds())
}
}
}
}
- averageProbeTime := totalProbeTime / (float64(actualProbeCount) * 5)
+ rpm := float64(60) / (totalRTTTime / (float64(actualRTTCount) * 5))
- fmt.Printf("RPM: %v\n", float64(60)/averageProbeTime)
+ fmt.Printf("RPM: %v\n", rpm)
cancelOperatingCtx()
if *debug {
diff --git a/utilities/utilities.go b/utilities/utilities.go
index 16585d6..0046544 100644
--- a/utilities/utilities.go
+++ b/utilities/utilities.go
@@ -27,7 +27,7 @@ type GetLatency struct {
Err error
}
-func TimedSequentialGets(ctx context.Context, client_a *http.Client, client_b *http.Client, url string) chan GetLatency {
+func TimedSequentialRTTs(ctx context.Context, client_a *http.Client, client_b *http.Client, url string) chan GetLatency {
responseChannel := make(chan GetLatency)
go func() {
before := time.Now()