diff options
| -rw-r--r-- | lbc/lbc.go | 69 | ||||
| -rw-r--r-- | ma/ma.go | 8 | ||||
| -rw-r--r-- | networkQuality.go | 205 |
3 files changed, 192 insertions, 90 deletions
@@ -6,24 +6,32 @@ import ( "io" "io/ioutil" "net/http" + "sync/atomic" ) -var chunkSize int = 5000 +var chunkSize int = 50 type LoadBearingConnection interface { Start(context.Context, bool) bool Transferred() uint64 Client() *http.Client + IsValid() bool } type LoadBearingConnectionDownload struct { Path string downloaded uint64 client *http.Client + debug bool + valid bool } func (lbd *LoadBearingConnectionDownload) Transferred() uint64 { - return lbd.downloaded + transferred := atomic.LoadUint64(&lbd.downloaded) + if lbd.debug { + fmt.Printf("download: Transferred: %v\n", transferred) + } + return transferred } func (lbd *LoadBearingConnectionDownload) Client() *http.Client { @@ -33,6 +41,8 @@ func (lbd *LoadBearingConnectionDownload) Client() *http.Client { func (lbd *LoadBearingConnectionDownload) Start(ctx context.Context, debug bool) bool { lbd.downloaded = 0 lbd.client = &http.Client{} + lbd.debug = debug + lbd.valid = true // At some point this might be useful: It is a snippet of code that will enable // logging of per-session TLS key material in order to make debugging easier in @@ -51,26 +61,31 @@ func (lbd *LoadBearingConnectionDownload) Start(ctx context.Context, debug bool) */ if debug { - fmt.Printf("Started a load bearing download.\n") + fmt.Printf("Started a load-bearing download.\n") } - go doDownload(ctx, lbd.client, lbd.Path, &lbd.downloaded, debug) + go lbd.doDownload(ctx) return true } +func (lbd *LoadBearingConnectionDownload) IsValid() bool { + return lbd.valid +} -func doDownload(ctx context.Context, client *http.Client, path string, count *uint64, debug bool) { - get, err := client.Get(path) +func (lbd *LoadBearingConnectionDownload) doDownload(ctx context.Context) { + get, err := lbd.client.Get(lbd.Path) if err != nil { + lbd.valid = false return } for ctx.Err() == nil { n, err := io.CopyN(ioutil.Discard, get.Body, int64(chunkSize)) if err != nil { + lbd.valid = false break } - *count += uint64(n) + atomic.AddUint64(&lbd.downloaded, uint64(n)) } get.Body.Close() - if debug { + if lbd.debug { fmt.Printf("Ending a load-bearing download.\n") } } @@ -79,14 +94,24 @@ type LoadBearingConnectionUpload struct { Path string uploaded uint64 client *http.Client + debug bool + valid bool } func (lbu *LoadBearingConnectionUpload) Transferred() uint64 { - return lbu.uploaded + transferred := atomic.LoadUint64(&lbu.uploaded) + if lbu.debug { + fmt.Printf("upload: Transferred: %v\n", transferred) + } + return transferred } -func (lbd *LoadBearingConnectionUpload) Client() *http.Client { - return lbd.client +func (lbu *LoadBearingConnectionUpload) Client() *http.Client { + return lbu.client +} + +func (lbu *LoadBearingConnectionUpload) IsValid() bool { + return lbu.valid } type syntheticCountingReader struct { @@ -101,16 +126,17 @@ func (s *syntheticCountingReader) Read(p []byte) (n int, err error) { err = nil n = len(p) n = chunkSize - *s.n += uint64(n) + atomic.AddUint64(s.n, uint64(n)) return } -func doUpload(ctx context.Context, client *http.Client, path string, count *uint64, debug bool) bool { - *count = 0 - s := &syntheticCountingReader{n: count, ctx: ctx} - resp, _ := client.Post(path, "application/octet-stream", s) +func (lbu *LoadBearingConnectionUpload) doUpload(ctx context.Context) bool { + lbu.uploaded = 0 + s := &syntheticCountingReader{n: &lbu.uploaded, ctx: ctx} + resp, _ := lbu.client.Post(lbu.Path, "application/octet-stream", s) + lbu.valid = false resp.Body.Close() - if debug { + if lbu.debug { fmt.Printf("Ending a load-bearing upload.\n") } return true @@ -119,7 +145,12 @@ func doUpload(ctx context.Context, client *http.Client, path string, count *uint func (lbu *LoadBearingConnectionUpload) Start(ctx context.Context, debug bool) bool { lbu.uploaded = 0 lbu.client = &http.Client{} - fmt.Printf("Started a load bearing upload.\n") - go doUpload(ctx, lbu.client, lbu.Path, &lbu.uploaded, debug) + lbu.debug = debug + lbu.valid = true + + if debug { + fmt.Printf("Started a load-bearing upload.\n") + } + go lbu.doUpload(ctx) return true } @@ -1,8 +1,6 @@ package ma import ( - "math" - "github.com/hawkinsw/goresponsiveness/saturating" "github.com/hawkinsw/goresponsiveness/utilities" ) @@ -34,13 +32,13 @@ func (ma *MovingAverage) CalculateAverage() float64 { return float64(total) / float64(ma.divisor.Value()) } -func (ma *MovingAverage) ConsistentWithin(limit float64) bool { +func (ma *MovingAverage) IncreasesLessThan(limit float64) bool { previous := ma.instants[0] for i := 1; i < ma.intervals; i++ { current := ma.instants[i] - percentChange := utilities.AbsPercentDifference(current, previous) + percentChange := utilities.SignedPercentDifference(current, previous) previous = current - if math.Abs(percentChange) > limit { + if percentChange > limit { return false } } diff --git a/networkQuality.go b/networkQuality.go index 44d64b1..54c4260 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -21,6 +21,19 @@ import ( "github.com/hawkinsw/goresponsiveness/utilities" ) +var ( + // Variables to hold CLI arguments. + configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.") + configPort = flag.Int("port", 4043, "port number on which to access responsiveness configuration server.") + configPath = flag.String("path", "config", "path on the server to the configuration endpoint.") + debug = flag.Bool("debug", false, "Enable debugging.") + timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.") + storeSslKeys = flag.Bool("store-ssl-keys", false, "Store SSL keys from connections for debugging. (currently unused)") + + // Global configuration + cooldownPeriod int = 4 +) + type ConfigUrls struct { SmallUrl string `json:"small_https_download_url"` LargeUrl string `json:"large_https_download_url"` @@ -28,9 +41,10 @@ type ConfigUrls struct { } type Config struct { - Version int - Urls ConfigUrls `json:"urls"` - Source string + Version int + Urls ConfigUrls `json:"urls"` + Source string + Test_Endpoint string } func (c *Config) Get(configHost string, configPath string) error { @@ -56,11 +70,30 @@ func (c *Config) Get(configHost string, configPath string) error { if err != nil { return fmt.Errorf("Error: Could not parse configuration returned from %s: %v\n", c.Source, err) } + + //if len(c.Test_Endpoint) != 0 { + if false { + tempUrl, err := url.Parse(c.Urls.LargeUrl) + if err != nil { + return fmt.Errorf("Error parsing large_https_download_url: %v", err) + } + c.Urls.LargeUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path + tempUrl, err = url.Parse(c.Urls.SmallUrl) + if err != nil { + return fmt.Errorf("Error parsing small_https_download_url: %v", err) + } + c.Urls.SmallUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path + tempUrl, err = url.Parse(c.Urls.UploadUrl) + if err != nil { + return fmt.Errorf("Error parsing https_upload_url: %v", err) + } + c.Urls.UploadUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path + } return nil } func (c *Config) String() string { - return fmt.Sprintf("Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s", c.Version, c.Urls.SmallUrl, c.Urls.LargeUrl, c.Urls.UploadUrl) + return fmt.Sprintf("Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s\nEndpoint: %s\n", c.Version, c.Urls.SmallUrl, c.Urls.LargeUrl, c.Urls.UploadUrl, c.Test_Endpoint) } func (c *Config) IsValid() error { @@ -76,19 +109,13 @@ func (c *Config) IsValid() error { return nil } -func toMBs(bytes float64) float64 { - return float64(bytes) / float64(1024*1024) +func toMbps(bytes float64) float64 { + return toMBps(bytes) * float64(8) } -var ( - // Variables to hold CLI arguments. - configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.") - configPort = flag.Int("port", 4043, "port number on which to access responsiveness configuration server.") - configPath = flag.String("path", "config", "path on the server to the configuration endpoint.") - debug = flag.Bool("debug", false, "Enable debugging.") - timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.") - storeSslKeys = flag.Bool("store-ssl-keys", false, "Store SSL keys from connections for debugging. (currently unused)") -) +func toMBps(bytes float64) float64 { + return float64(bytes) / float64(1024*1024) +} func addFlows(ctx context.Context, toAdd uint64, lbcs *[]lbc.LoadBearingConnection, lbcsPreviousTransferred *[]uint64, lbcGenerator func() lbc.LoadBearingConnection, debug bool) { for i := uint64(0); i < toAdd; i++ { @@ -107,7 +134,19 @@ type SaturationResult struct { Lbcs []lbc.LoadBearingConnection } -func saturate(ctx context.Context, lbcGenerator func() lbc.LoadBearingConnection, debug bool) (saturated chan SaturationResult) { +type Debugging struct { + Prefix string +} + +func NewDebugging(prefix string) *Debugging { + return &Debugging{Prefix: prefix} +} + +func (d *Debugging) String() string { + return d.Prefix +} + +func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGenerator func() lbc.LoadBearingConnection, debug *Debugging) (saturated chan SaturationResult) { saturated = make(chan SaturationResult) go func() { @@ -115,37 +154,49 @@ func saturate(ctx context.Context, lbcGenerator func() lbc.LoadBearingConnection lbcsPreviousTransferred := make([]uint64, 0) // Create 4 load bearing connections - addFlows(ctx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug) + addFlows(operatingCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil) previousFlowIncreaseIteration := uint64(0) previousMovingAverage := float64(0) movingAverage := ma.NewMovingAverage(4) movingAverageAverage := ma.NewMovingAverage(4) - nextTime := time.Now().Add(time.Second) + nextSampleStartTime := time.Now().Add(time.Second) for currentIteration := uint64(0); true; currentIteration++ { - // If we are cancelled, then stop. - if ctx.Err() != nil { + // When the program stops operating, then stop. + if operatingCtx.Err() != nil { return } + // We may be asked to stop trying to saturate the + // network and return our current status. + if saturationCtx.Err() != nil { + //break + } + now := time.Now() // At each 1-second interval - if nextTime.Second() > now.Second() { - if debug { - fmt.Printf("Sleeping until %v\n", nextTime) + if nextSampleStartTime.Sub(now) > 0 { + if debug != nil { + fmt.Printf("%v: Sleeping until %v\n", debug, nextSampleStartTime) } - time.Sleep(nextTime.Sub(now)) + time.Sleep(nextSampleStartTime.Sub(now)) } else { - fmt.Printf("Warning: Missed a one-second deadline.\n") + fmt.Fprintf(os.Stderr, "Warning: Missed a one-second deadline.\n") } - nextTime = time.Now().Add(time.Second) + nextSampleStartTime = time.Now().Add(time.Second) // Compute "instantaneous aggregate" goodput which is the number of bytes transferred within the last second. totalTransfer := uint64(0) for i := range lbcs { + if !lbcs[i].IsValid() { + if debug != nil { + fmt.Printf("%v: Load-bearing connection at index %d is invalid ... skipping.\n", debug, i) + } + continue + } previousTransferred := lbcsPreviousTransferred[i] currentTransferred := lbcs[i].Transferred() totalTransfer += (currentTransferred - previousTransferred) @@ -157,41 +208,51 @@ func saturate(ctx context.Context, lbcGenerator func() lbc.LoadBearingConnection currentMovingAverage := movingAverage.CalculateAverage() movingAverageAverage.AddMeasurement(currentMovingAverage) movingAverageDelta := utilities.SignedPercentDifference(currentMovingAverage, previousMovingAverage) + + if debug != nil { + fmt.Printf("%v: Instantaneous goodput: %f MB.\n", debug, toMBps(float64(totalTransfer))) + fmt.Printf("%v: Previous moving average: %f MB.\n", debug, toMBps(previousMovingAverage)) + fmt.Printf("%v: Current moving average: %f MB.\n", debug, toMBps(currentMovingAverage)) + fmt.Printf("%v: Moving average delta: %f.\n", debug, movingAverageDelta) + } + previousMovingAverage = currentMovingAverage - if debug { - fmt.Printf("Instantaneous goodput: %f MB.\n", toMBs(float64(totalTransfer))) - fmt.Printf("Moving average: %f MB.\n", toMBs(currentMovingAverage)) - fmt.Printf("Moving average delta: %f.\n", movingAverageDelta) + // Special case: We won't make any adjustments on the first iteration. + if currentIteration == 0 { + continue } // If moving average > "previous" moving average + 5%: - if currentIteration == 0 || movingAverageDelta > float64(5) { + if movingAverageDelta > float64(5) { // Network did not yet reach saturation. If no flows added within the last 4 seconds, add 4 more flows if (currentIteration - previousFlowIncreaseIteration) > 4 { - if debug { - fmt.Printf("Adding flows because we are unsaturated and waited a while.\n") + if debug != nil { + fmt.Printf("%v: Adding flows because we are unsaturated and waited a while.\n", debug) } - addFlows(ctx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug) + addFlows(operatingCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil) previousFlowIncreaseIteration = currentIteration } else { - if debug { - fmt.Printf("We are unsaturated, but it still too early to add anything.\n") + if debug != nil { + fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debug) } } } else { // Else, network reached saturation for the current flow count. + if debug != nil { + fmt.Printf("%v: Network reached saturation with current flow count.\n", debug) + } // If new flows added and for 4 seconds the moving average throughput did not change: network reached stable saturation - if (currentIteration-previousFlowIncreaseIteration) < 4 && movingAverageAverage.ConsistentWithin(float64(4)) { - if debug { - fmt.Printf("New flows added within the last four seconds and the moving-average average is consistent!\n") + if (currentIteration-previousFlowIncreaseIteration) < 4 && movingAverageAverage.IncreasesLessThan(float64(5)) { + if debug != nil { + fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debug) } break } else { // Else, add four more flows - if debug { - fmt.Printf("New flows to add to try to increase our saturation!\n") + if debug != nil { + fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debug) } - addFlows(ctx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug) + addFlows(operatingCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil) previousFlowIncreaseIteration = currentIteration } } @@ -208,6 +269,7 @@ func main() { timeoutDuration := time.Second * time.Duration(*timeout) configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort) operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background()) + saturationCtx, cancelSaturationCtx := context.WithCancel(context.Background()) config := &Config{} if err := config.Get(configHostPort, *configPath); err != nil { @@ -230,34 +292,48 @@ func main() { generate_lbu := func() lbc.LoadBearingConnection { return &lbc.LoadBearingConnectionUpload{Path: config.Urls.UploadUrl} } - downloadSaturationChannel := saturate(operatingCtx, generate_lbd, *debug) - uploadSaturationChannel := saturate(operatingCtx, generate_lbu, *debug) - test_timeout := false - upload_saturated := false - download_saturated := false + downloadSaturationChannel := saturate(saturationCtx, operatingCtx, generate_lbd, NewDebugging("download")) + uploadSaturationChannel := saturate(saturationCtx, operatingCtx, generate_lbu, NewDebugging("upload")) + + saturationTimeout := false + uploadSaturated := false + downloadSaturated := false downloadSaturation := SaturationResult{} uploadSaturation := SaturationResult{} - for !test_timeout && !(upload_saturated && download_saturated) { + for !(uploadSaturated && downloadSaturated) { select { case downloadSaturation = <-downloadSaturationChannel: { - download_saturated = true + downloadSaturated = true if *debug { - fmt.Printf("################# download is saturated (%fMBps, %d flows)!\n", toMBs(downloadSaturation.RateBps), len(downloadSaturation.Lbcs)) + fmt.Printf("################# download is %s saturated (%fMBps, %d flows)!\n", utilities.Conditional(saturationTimeout, "(provisionally)", ""), toMBps(downloadSaturation.RateBps), len(downloadSaturation.Lbcs)) } } case uploadSaturation = <-uploadSaturationChannel: { - upload_saturated = true + uploadSaturated = true if *debug { - fmt.Printf("################# upload is saturated (%fMBps, %d flows)!\n", toMBs(uploadSaturation.RateBps), len(uploadSaturation.Lbcs)) + fmt.Printf("################# upload is %s saturated (%fMBps, %d flows)!\n", utilities.Conditional(saturationTimeout, "(provisionally)", ""), toMBps(uploadSaturation.RateBps), len(uploadSaturation.Lbcs)) } } case <-timeoutChannel: { - test_timeout = true + if saturationTimeout { + // We already timedout on saturation. This signal means that + // we are timedout on getting the provisional saturation. We + // will exit! + fmt.Fprint(os.Stderr, "Error: Saturation could not be completed in time and no provisional rates could be accessed. Test failed.\n") + cancelOperatingCtx() + if *debug { + time.Sleep(time.Duration(cooldownPeriod) * time.Second) + } + return + } + saturationTimeout = true + timeoutChannel = timeoutat.TimeoutAt(operatingCtx, time.Now().Add(5*time.Second), *debug) + cancelSaturationCtx() if *debug { fmt.Printf("################# timeout reaching saturation!\n") } @@ -265,26 +341,21 @@ func main() { } } - if test_timeout { - cancelOperatingCtx() - fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation before test time expired (%v).\n.", timeoutDuration) - return - } - robustnessProbeIterationCount := 5 - actualRTTCount := 0 + totalRTTsCount := 0 totalRTTTime := float64(0) + rttTimeout := false - for i := 0; i < robustnessProbeIterationCount && !test_timeout; i++ { + for i := 0; i < robustnessProbeIterationCount && !rttTimeout; i++ { randomLbcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Lbcs) select { case <-timeoutChannel: { - test_timeout = true + rttTimeout = true } case fiveRTTsTime := <-utilities.TimedSequentialRTTs(operatingCtx, downloadSaturation.Lbcs[randomLbcsIndex].Client(), &http.Client{}, config.Urls.SmallUrl): { - actualRTTCount += 5 + totalRTTsCount += 5 totalRTTTime += fiveRTTsTime.Delay.Seconds() if *debug { fmt.Printf("fiveRTTsTime: %v\n", fiveRTTsTime.Delay.Seconds()) @@ -293,13 +364,15 @@ func main() { } } - rpm := float64(60) / (totalRTTTime / (float64(actualRTTCount) * 5)) + rpm := float64(60) / (totalRTTTime / (float64(totalRTTsCount) * 5)) + fmt.Printf("Download: %f MBps (%f Mbps), using %d parallel connections.\n", toMBps(downloadSaturation.RateBps), toMbps(downloadSaturation.RateBps), len(downloadSaturation.Lbcs)) + fmt.Printf("Upload: %f MBps (%f Mbps), using %d parallel connections.\n", toMBps(uploadSaturation.RateBps), toMbps(uploadSaturation.RateBps), len(uploadSaturation.Lbcs)) + fmt.Printf("Total RTTs measured: %d\n", totalRTTsCount) fmt.Printf("RPM: %v\n", rpm) cancelOperatingCtx() if *debug { - // Hold on to cool down. - time.Sleep(4 * time.Second) + time.Sleep(time.Duration(cooldownPeriod) * time.Second) } } |
