summaryrefslogtreecommitdiff
path: root/networkQuality.go
diff options
context:
space:
mode:
authorWill Hawkins <[email protected]>2021-12-15 23:44:19 -0500
committerWill Hawkins <[email protected]>2021-12-15 23:44:19 -0500
commitc24e70fd98bb61ddf735a7f54e278258f2dfc990 (patch)
treedfa9ffba2f46ec7269d77915f7c1cd1739c81e9f /networkQuality.go
parentb74e4d4261d8618ef5f99118d2b784cde2614ca2 (diff)
More general work.
Diffstat (limited to 'networkQuality.go')
-rw-r--r--networkQuality.go205
1 files changed, 139 insertions, 66 deletions
diff --git a/networkQuality.go b/networkQuality.go
index 44d64b1..54c4260 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -21,6 +21,19 @@ import (
"github.com/hawkinsw/goresponsiveness/utilities"
)
+var (
+ // Variables to hold CLI arguments.
+ configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.")
+ configPort = flag.Int("port", 4043, "port number on which to access responsiveness configuration server.")
+ configPath = flag.String("path", "config", "path on the server to the configuration endpoint.")
+ debug = flag.Bool("debug", false, "Enable debugging.")
+ timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.")
+ storeSslKeys = flag.Bool("store-ssl-keys", false, "Store SSL keys from connections for debugging. (currently unused)")
+
+ // Global configuration
+ cooldownPeriod int = 4
+)
+
type ConfigUrls struct {
SmallUrl string `json:"small_https_download_url"`
LargeUrl string `json:"large_https_download_url"`
@@ -28,9 +41,10 @@ type ConfigUrls struct {
}
type Config struct {
- Version int
- Urls ConfigUrls `json:"urls"`
- Source string
+ Version int
+ Urls ConfigUrls `json:"urls"`
+ Source string
+ Test_Endpoint string
}
func (c *Config) Get(configHost string, configPath string) error {
@@ -56,11 +70,30 @@ func (c *Config) Get(configHost string, configPath string) error {
if err != nil {
return fmt.Errorf("Error: Could not parse configuration returned from %s: %v\n", c.Source, err)
}
+
+ //if len(c.Test_Endpoint) != 0 {
+ if false {
+ tempUrl, err := url.Parse(c.Urls.LargeUrl)
+ if err != nil {
+ return fmt.Errorf("Error parsing large_https_download_url: %v", err)
+ }
+ c.Urls.LargeUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path
+ tempUrl, err = url.Parse(c.Urls.SmallUrl)
+ if err != nil {
+ return fmt.Errorf("Error parsing small_https_download_url: %v", err)
+ }
+ c.Urls.SmallUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path
+ tempUrl, err = url.Parse(c.Urls.UploadUrl)
+ if err != nil {
+ return fmt.Errorf("Error parsing https_upload_url: %v", err)
+ }
+ c.Urls.UploadUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path
+ }
return nil
}
func (c *Config) String() string {
- return fmt.Sprintf("Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s", c.Version, c.Urls.SmallUrl, c.Urls.LargeUrl, c.Urls.UploadUrl)
+ return fmt.Sprintf("Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s\nEndpoint: %s\n", c.Version, c.Urls.SmallUrl, c.Urls.LargeUrl, c.Urls.UploadUrl, c.Test_Endpoint)
}
func (c *Config) IsValid() error {
@@ -76,19 +109,13 @@ func (c *Config) IsValid() error {
return nil
}
-func toMBs(bytes float64) float64 {
- return float64(bytes) / float64(1024*1024)
+func toMbps(bytes float64) float64 {
+ return toMBps(bytes) * float64(8)
}
-var (
- // Variables to hold CLI arguments.
- configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.")
- configPort = flag.Int("port", 4043, "port number on which to access responsiveness configuration server.")
- configPath = flag.String("path", "config", "path on the server to the configuration endpoint.")
- debug = flag.Bool("debug", false, "Enable debugging.")
- timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.")
- storeSslKeys = flag.Bool("store-ssl-keys", false, "Store SSL keys from connections for debugging. (currently unused)")
-)
+func toMBps(bytes float64) float64 {
+ return float64(bytes) / float64(1024*1024)
+}
func addFlows(ctx context.Context, toAdd uint64, lbcs *[]lbc.LoadBearingConnection, lbcsPreviousTransferred *[]uint64, lbcGenerator func() lbc.LoadBearingConnection, debug bool) {
for i := uint64(0); i < toAdd; i++ {
@@ -107,7 +134,19 @@ type SaturationResult struct {
Lbcs []lbc.LoadBearingConnection
}
-func saturate(ctx context.Context, lbcGenerator func() lbc.LoadBearingConnection, debug bool) (saturated chan SaturationResult) {
+type Debugging struct {
+ Prefix string
+}
+
+func NewDebugging(prefix string) *Debugging {
+ return &Debugging{Prefix: prefix}
+}
+
+func (d *Debugging) String() string {
+ return d.Prefix
+}
+
+func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGenerator func() lbc.LoadBearingConnection, debug *Debugging) (saturated chan SaturationResult) {
saturated = make(chan SaturationResult)
go func() {
@@ -115,37 +154,49 @@ func saturate(ctx context.Context, lbcGenerator func() lbc.LoadBearingConnection
lbcsPreviousTransferred := make([]uint64, 0)
// Create 4 load bearing connections
- addFlows(ctx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug)
+ addFlows(operatingCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil)
previousFlowIncreaseIteration := uint64(0)
previousMovingAverage := float64(0)
movingAverage := ma.NewMovingAverage(4)
movingAverageAverage := ma.NewMovingAverage(4)
- nextTime := time.Now().Add(time.Second)
+ nextSampleStartTime := time.Now().Add(time.Second)
for currentIteration := uint64(0); true; currentIteration++ {
- // If we are cancelled, then stop.
- if ctx.Err() != nil {
+ // When the program stops operating, then stop.
+ if operatingCtx.Err() != nil {
return
}
+ // We may be asked to stop trying to saturate the
+ // network and return our current status.
+ if saturationCtx.Err() != nil {
+ //break
+ }
+
now := time.Now()
// At each 1-second interval
- if nextTime.Second() > now.Second() {
- if debug {
- fmt.Printf("Sleeping until %v\n", nextTime)
+ if nextSampleStartTime.Sub(now) > 0 {
+ if debug != nil {
+ fmt.Printf("%v: Sleeping until %v\n", debug, nextSampleStartTime)
}
- time.Sleep(nextTime.Sub(now))
+ time.Sleep(nextSampleStartTime.Sub(now))
} else {
- fmt.Printf("Warning: Missed a one-second deadline.\n")
+ fmt.Fprintf(os.Stderr, "Warning: Missed a one-second deadline.\n")
}
- nextTime = time.Now().Add(time.Second)
+ nextSampleStartTime = time.Now().Add(time.Second)
// Compute "instantaneous aggregate" goodput which is the number of bytes transferred within the last second.
totalTransfer := uint64(0)
for i := range lbcs {
+ if !lbcs[i].IsValid() {
+ if debug != nil {
+ fmt.Printf("%v: Load-bearing connection at index %d is invalid ... skipping.\n", debug, i)
+ }
+ continue
+ }
previousTransferred := lbcsPreviousTransferred[i]
currentTransferred := lbcs[i].Transferred()
totalTransfer += (currentTransferred - previousTransferred)
@@ -157,41 +208,51 @@ func saturate(ctx context.Context, lbcGenerator func() lbc.LoadBearingConnection
currentMovingAverage := movingAverage.CalculateAverage()
movingAverageAverage.AddMeasurement(currentMovingAverage)
movingAverageDelta := utilities.SignedPercentDifference(currentMovingAverage, previousMovingAverage)
+
+ if debug != nil {
+ fmt.Printf("%v: Instantaneous goodput: %f MB.\n", debug, toMBps(float64(totalTransfer)))
+ fmt.Printf("%v: Previous moving average: %f MB.\n", debug, toMBps(previousMovingAverage))
+ fmt.Printf("%v: Current moving average: %f MB.\n", debug, toMBps(currentMovingAverage))
+ fmt.Printf("%v: Moving average delta: %f.\n", debug, movingAverageDelta)
+ }
+
previousMovingAverage = currentMovingAverage
- if debug {
- fmt.Printf("Instantaneous goodput: %f MB.\n", toMBs(float64(totalTransfer)))
- fmt.Printf("Moving average: %f MB.\n", toMBs(currentMovingAverage))
- fmt.Printf("Moving average delta: %f.\n", movingAverageDelta)
+ // Special case: We won't make any adjustments on the first iteration.
+ if currentIteration == 0 {
+ continue
}
// If moving average > "previous" moving average + 5%:
- if currentIteration == 0 || movingAverageDelta > float64(5) {
+ if movingAverageDelta > float64(5) {
// Network did not yet reach saturation. If no flows added within the last 4 seconds, add 4 more flows
if (currentIteration - previousFlowIncreaseIteration) > 4 {
- if debug {
- fmt.Printf("Adding flows because we are unsaturated and waited a while.\n")
+ if debug != nil {
+ fmt.Printf("%v: Adding flows because we are unsaturated and waited a while.\n", debug)
}
- addFlows(ctx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug)
+ addFlows(operatingCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil)
previousFlowIncreaseIteration = currentIteration
} else {
- if debug {
- fmt.Printf("We are unsaturated, but it still too early to add anything.\n")
+ if debug != nil {
+ fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debug)
}
}
} else { // Else, network reached saturation for the current flow count.
+ if debug != nil {
+ fmt.Printf("%v: Network reached saturation with current flow count.\n", debug)
+ }
// If new flows added and for 4 seconds the moving average throughput did not change: network reached stable saturation
- if (currentIteration-previousFlowIncreaseIteration) < 4 && movingAverageAverage.ConsistentWithin(float64(4)) {
- if debug {
- fmt.Printf("New flows added within the last four seconds and the moving-average average is consistent!\n")
+ if (currentIteration-previousFlowIncreaseIteration) < 4 && movingAverageAverage.IncreasesLessThan(float64(5)) {
+ if debug != nil {
+ fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debug)
}
break
} else {
// Else, add four more flows
- if debug {
- fmt.Printf("New flows to add to try to increase our saturation!\n")
+ if debug != nil {
+ fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debug)
}
- addFlows(ctx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug)
+ addFlows(operatingCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil)
previousFlowIncreaseIteration = currentIteration
}
}
@@ -208,6 +269,7 @@ func main() {
timeoutDuration := time.Second * time.Duration(*timeout)
configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort)
operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background())
+ saturationCtx, cancelSaturationCtx := context.WithCancel(context.Background())
config := &Config{}
if err := config.Get(configHostPort, *configPath); err != nil {
@@ -230,34 +292,48 @@ func main() {
generate_lbu := func() lbc.LoadBearingConnection {
return &lbc.LoadBearingConnectionUpload{Path: config.Urls.UploadUrl}
}
- downloadSaturationChannel := saturate(operatingCtx, generate_lbd, *debug)
- uploadSaturationChannel := saturate(operatingCtx, generate_lbu, *debug)
- test_timeout := false
- upload_saturated := false
- download_saturated := false
+ downloadSaturationChannel := saturate(saturationCtx, operatingCtx, generate_lbd, NewDebugging("download"))
+ uploadSaturationChannel := saturate(saturationCtx, operatingCtx, generate_lbu, NewDebugging("upload"))
+
+ saturationTimeout := false
+ uploadSaturated := false
+ downloadSaturated := false
downloadSaturation := SaturationResult{}
uploadSaturation := SaturationResult{}
- for !test_timeout && !(upload_saturated && download_saturated) {
+ for !(uploadSaturated && downloadSaturated) {
select {
case downloadSaturation = <-downloadSaturationChannel:
{
- download_saturated = true
+ downloadSaturated = true
if *debug {
- fmt.Printf("################# download is saturated (%fMBps, %d flows)!\n", toMBs(downloadSaturation.RateBps), len(downloadSaturation.Lbcs))
+ fmt.Printf("################# download is %s saturated (%fMBps, %d flows)!\n", utilities.Conditional(saturationTimeout, "(provisionally)", ""), toMBps(downloadSaturation.RateBps), len(downloadSaturation.Lbcs))
}
}
case uploadSaturation = <-uploadSaturationChannel:
{
- upload_saturated = true
+ uploadSaturated = true
if *debug {
- fmt.Printf("################# upload is saturated (%fMBps, %d flows)!\n", toMBs(uploadSaturation.RateBps), len(uploadSaturation.Lbcs))
+ fmt.Printf("################# upload is %s saturated (%fMBps, %d flows)!\n", utilities.Conditional(saturationTimeout, "(provisionally)", ""), toMBps(uploadSaturation.RateBps), len(uploadSaturation.Lbcs))
}
}
case <-timeoutChannel:
{
- test_timeout = true
+ if saturationTimeout {
+ // We already timedout on saturation. This signal means that
+ // we are timedout on getting the provisional saturation. We
+ // will exit!
+ fmt.Fprint(os.Stderr, "Error: Saturation could not be completed in time and no provisional rates could be accessed. Test failed.\n")
+ cancelOperatingCtx()
+ if *debug {
+ time.Sleep(time.Duration(cooldownPeriod) * time.Second)
+ }
+ return
+ }
+ saturationTimeout = true
+ timeoutChannel = timeoutat.TimeoutAt(operatingCtx, time.Now().Add(5*time.Second), *debug)
+ cancelSaturationCtx()
if *debug {
fmt.Printf("################# timeout reaching saturation!\n")
}
@@ -265,26 +341,21 @@ func main() {
}
}
- if test_timeout {
- cancelOperatingCtx()
- fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation before test time expired (%v).\n.", timeoutDuration)
- return
- }
-
robustnessProbeIterationCount := 5
- actualRTTCount := 0
+ totalRTTsCount := 0
totalRTTTime := float64(0)
+ rttTimeout := false
- for i := 0; i < robustnessProbeIterationCount && !test_timeout; i++ {
+ for i := 0; i < robustnessProbeIterationCount && !rttTimeout; i++ {
randomLbcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Lbcs)
select {
case <-timeoutChannel:
{
- test_timeout = true
+ rttTimeout = true
}
case fiveRTTsTime := <-utilities.TimedSequentialRTTs(operatingCtx, downloadSaturation.Lbcs[randomLbcsIndex].Client(), &http.Client{}, config.Urls.SmallUrl):
{
- actualRTTCount += 5
+ totalRTTsCount += 5
totalRTTTime += fiveRTTsTime.Delay.Seconds()
if *debug {
fmt.Printf("fiveRTTsTime: %v\n", fiveRTTsTime.Delay.Seconds())
@@ -293,13 +364,15 @@ func main() {
}
}
- rpm := float64(60) / (totalRTTTime / (float64(actualRTTCount) * 5))
+ rpm := float64(60) / (totalRTTTime / (float64(totalRTTsCount) * 5))
+ fmt.Printf("Download: %f MBps (%f Mbps), using %d parallel connections.\n", toMBps(downloadSaturation.RateBps), toMbps(downloadSaturation.RateBps), len(downloadSaturation.Lbcs))
+ fmt.Printf("Upload: %f MBps (%f Mbps), using %d parallel connections.\n", toMBps(uploadSaturation.RateBps), toMbps(uploadSaturation.RateBps), len(uploadSaturation.Lbcs))
+ fmt.Printf("Total RTTs measured: %d\n", totalRTTsCount)
fmt.Printf("RPM: %v\n", rpm)
cancelOperatingCtx()
if *debug {
- // Hold on to cool down.
- time.Sleep(4 * time.Second)
+ time.Sleep(time.Duration(cooldownPeriod) * time.Second)
}
}