diff options
| -rw-r--r-- | config/config.go | 134 | ||||
| -rw-r--r-- | constants/constants.go | 2 | ||||
| -rw-r--r-- | networkQuality.go | 443 | ||||
| -rw-r--r-- | rpm/rpm.go | 221 | ||||
| -rw-r--r-- | utilities/utilities.go | 5 |
5 files changed, 412 insertions, 393 deletions
diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..05ec61d --- /dev/null +++ b/config/config.go @@ -0,0 +1,134 @@ +package config + +import ( + "crypto/tls" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strings" + + "github.com/network-quality/goresponsiveness/utilities" + "golang.org/x/net/http2" +) + +type ConfigUrls struct { + SmallUrl string `json:"small_https_download_url"` + LargeUrl string `json:"large_https_download_url"` + UploadUrl string `json:"https_upload_url"` +} + +type Config struct { + Version int + Urls ConfigUrls `json:"urls"` + Source string + Test_Endpoint string +} + +func (c *Config) Get(configHost string, configPath string) error { + configTransport := http2.Transport{} + configTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + configClient := &http.Client{Transport: &configTransport} + // Extraneous /s in URLs is normally okay, but the Apple CDN does not + // like them. Make sure that we put exactly one (1) / between the host + // and the path. + if !strings.HasPrefix(configPath, "/") { + configPath = "/" + configPath + } + c.Source = fmt.Sprintf("https://%s%s", configHost, configPath) + resp, err := configClient.Get(c.Source) + if err != nil { + return fmt.Errorf( + "Error: Could not connect to configuration host %s: %v\n", + configHost, + err, + ) + } + + jsonConfig, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf( + "Error: Could not read configuration content downloaded from %s: %v\n", + c.Source, + err, + ) + } + + err = json.Unmarshal(jsonConfig, c) + if err != nil { + return fmt.Errorf( + "Error: Could not parse configuration returned from %s: %v\n", + c.Source, + err, + ) + } + + //if len(c.Test_Endpoint) != 0 { + if false { + tempUrl, err := url.Parse(c.Urls.LargeUrl) + if err != nil { + return fmt.Errorf("Error parsing large_https_download_url: %v", err) + } + c.Urls.LargeUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path + tempUrl, err = url.Parse(c.Urls.SmallUrl) + if err != nil { + return fmt.Errorf("Error parsing small_https_download_url: %v", err) + } + c.Urls.SmallUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path + tempUrl, err = url.Parse(c.Urls.UploadUrl) + if err != nil { + return fmt.Errorf("Error parsing https_upload_url: %v", err) + } + c.Urls.UploadUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path + } + return nil +} + +func (c *Config) String() string { + return fmt.Sprintf( + "Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s\nEndpoint: %s\n", + c.Version, + c.Urls.SmallUrl, + c.Urls.LargeUrl, + c.Urls.UploadUrl, + c.Test_Endpoint, + ) +} + +func (c *Config) IsValid() error { + if parsedUrl, err := url.ParseRequestURI(c.Urls.LargeUrl); err != nil || + parsedUrl.Scheme != "https" { + return fmt.Errorf( + "Configuration url large_https_download_url is invalid: %s", + utilities.Conditional( + len(c.Urls.LargeUrl) != 0, + c.Urls.LargeUrl, + "Missing", + ), + ) + } + if parsedUrl, err := url.ParseRequestURI(c.Urls.SmallUrl); err != nil || + parsedUrl.Scheme != "https" { + return fmt.Errorf( + "Configuration url small_https_download_url is invalid: %s", + utilities.Conditional( + len(c.Urls.SmallUrl) != 0, + c.Urls.SmallUrl, + "Missing", + ), + ) + } + if parsedUrl, err := url.ParseRequestURI(c.Urls.UploadUrl); err != nil || + parsedUrl.Scheme != "https" { + return fmt.Errorf( + "Configuration url https_upload_url is invalid: %s", + utilities.Conditional( + len(c.Urls.UploadUrl) != 0, + c.Urls.UploadUrl, + "Missing", + ), + ) + } + return nil +} diff --git a/constants/constants.go b/constants/constants.go index 2f906b6..147b643 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -18,7 +18,7 @@ var ( // The amount of time that the client will cooldown if it is in debug mode. CooldownPeriod time.Duration = 4 * time.Second // The number of probes to send when calculating RTT. - RPMProbeCount int = 5 + MeasurementProbeCount int = 5 // The amount of time that we give ourselves to calculate the RPM. RPMCalculationTime time.Duration = 10 * time.Second diff --git a/networkQuality.go b/networkQuality.go index 2aba054..ab9517e 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -17,25 +17,20 @@ package main import ( "context" "crypto/tls" - "encoding/json" "flag" "fmt" _ "io" - "io/ioutil" _ "log" - "math/rand" "net/http" - "net/url" "os" "runtime/pprof" - "strings" "time" "github.com/network-quality/goresponsiveness/ccw" + "github.com/network-quality/goresponsiveness/config" "github.com/network-quality/goresponsiveness/constants" "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/lgc" - "github.com/network-quality/goresponsiveness/ma" "github.com/network-quality/goresponsiveness/rpm" "github.com/network-quality/goresponsiveness/timeoutat" "github.com/network-quality/goresponsiveness/utilities" @@ -86,343 +81,6 @@ var ( ) ) -type ConfigUrls struct { - SmallUrl string `json:"small_https_download_url"` - LargeUrl string `json:"large_https_download_url"` - UploadUrl string `json:"https_upload_url"` -} - -type Config struct { - Version int - Urls ConfigUrls `json:"urls"` - Source string - Test_Endpoint string -} - -func (c *Config) Get(configHost string, configPath string) error { - configTransport := http2.Transport{} - configTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} - configClient := &http.Client{Transport: &configTransport} - // Extraneous /s in URLs is normally okay, but the Apple CDN does not - // like them. Make sure that we put exactly one (1) / between the host - // and the path. - if !strings.HasPrefix(configPath, "/") { - configPath = "/" + configPath - } - c.Source = fmt.Sprintf("https://%s%s", configHost, configPath) - resp, err := configClient.Get(c.Source) - if err != nil { - return fmt.Errorf( - "Error: Could not connect to configuration host %s: %v\n", - configHost, - err, - ) - } - - jsonConfig, err := ioutil.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf( - "Error: Could not read configuration content downloaded from %s: %v\n", - c.Source, - err, - ) - } - - err = json.Unmarshal(jsonConfig, c) - if err != nil { - return fmt.Errorf( - "Error: Could not parse configuration returned from %s: %v\n", - c.Source, - err, - ) - } - - //if len(c.Test_Endpoint) != 0 { - if false { - tempUrl, err := url.Parse(c.Urls.LargeUrl) - if err != nil { - return fmt.Errorf("Error parsing large_https_download_url: %v", err) - } - c.Urls.LargeUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path - tempUrl, err = url.Parse(c.Urls.SmallUrl) - if err != nil { - return fmt.Errorf("Error parsing small_https_download_url: %v", err) - } - c.Urls.SmallUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path - tempUrl, err = url.Parse(c.Urls.UploadUrl) - if err != nil { - return fmt.Errorf("Error parsing https_upload_url: %v", err) - } - c.Urls.UploadUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path - } - return nil -} - -func (c *Config) String() string { - return fmt.Sprintf( - "Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s\nEndpoint: %s\n", - c.Version, - c.Urls.SmallUrl, - c.Urls.LargeUrl, - c.Urls.UploadUrl, - c.Test_Endpoint, - ) -} - -func (c *Config) IsValid() error { - if parsedUrl, err := url.ParseRequestURI(c.Urls.LargeUrl); err != nil || - parsedUrl.Scheme != "https" { - return fmt.Errorf( - "Configuration url large_https_download_url is invalid: %s", - utilities.Conditional( - len(c.Urls.LargeUrl) != 0, - c.Urls.LargeUrl, - "Missing", - ), - ) - } - if parsedUrl, err := url.ParseRequestURI(c.Urls.SmallUrl); err != nil || - parsedUrl.Scheme != "https" { - return fmt.Errorf( - "Configuration url small_https_download_url is invalid: %s", - utilities.Conditional( - len(c.Urls.SmallUrl) != 0, - c.Urls.SmallUrl, - "Missing", - ), - ) - } - if parsedUrl, err := url.ParseRequestURI(c.Urls.UploadUrl); err != nil || - parsedUrl.Scheme != "https" { - return fmt.Errorf( - "Configuration url https_upload_url is invalid: %s", - utilities.Conditional( - len(c.Urls.UploadUrl) != 0, - c.Urls.UploadUrl, - "Missing", - ), - ) - } - return nil -} - -func addFlows( - ctx context.Context, - toAdd uint64, - lgcs *[]lgc.LoadGeneratingConnection, - lgcsPreviousTransferred *[]uint64, - lgcGenerator func() lgc.LoadGeneratingConnection, - debug debug.DebugLevel, -) { - for i := uint64(0); i < toAdd; i++ { - *lgcs = append(*lgcs, lgcGenerator()) - *lgcsPreviousTransferred = append(*lgcsPreviousTransferred, 0) - if !(*lgcs)[len(*lgcs)-1].Start(ctx, debug) { - fmt.Printf( - "Error starting lgc with id %d!\n", - (*lgcs)[len(*lgcs)-1].ClientId(), - ) - return - } - } -} - -type SaturationResult struct { - RateBps float64 - lgcs []lgc.LoadGeneratingConnection -} - -func saturate( - saturationCtx context.Context, - operatingCtx context.Context, - lgcGenerator func() lgc.LoadGeneratingConnection, - debugging *debug.DebugWithPrefix, -) (saturated chan SaturationResult) { - saturated = make(chan SaturationResult) - go func() { - - lgcs := make([]lgc.LoadGeneratingConnection, 0) - lgcsPreviousTransferred := make([]uint64, 0) - - addFlows( - saturationCtx, - constants.StartingNumberOfLoadGeneratingConnections, - &lgcs, - &lgcsPreviousTransferred, - lgcGenerator, - debugging.Level, - ) - - previousFlowIncreaseIteration := uint64(0) - previousMovingAverage := float64(0) - movingAverage := ma.NewMovingAverage( - constants.MovingAverageIntervalCount, - ) - movingAverageAverage := ma.NewMovingAverage( - constants.MovingAverageIntervalCount, - ) - - nextSampleStartTime := time.Now().Add(time.Second) - - for currentIteration := uint64(0); true; currentIteration++ { - - // When the program stops operating, then stop. - if saturationCtx.Err() != nil { - return - } - - // We may be asked to stop trying to saturate the - // network and return our current status. - if saturationCtx.Err() != nil { - //break - } - - now := time.Now() - // At each 1-second interval - if nextSampleStartTime.Sub(now) > 0 { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Sleeping until %v\n", - debugging, - nextSampleStartTime, - ) - } - time.Sleep(nextSampleStartTime.Sub(now)) - } else { - fmt.Fprintf(os.Stderr, "Warning: Missed a one-second deadline.\n") - } - nextSampleStartTime = time.Now().Add(time.Second) - - // Compute "instantaneous aggregate" goodput which is the number of - // bytes transferred within the last second. - totalTransfer := uint64(0) - allInvalid := true - for i := range lgcs { - if !lgcs[i].IsValid() { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Load-generating connection with id %d is invalid ... skipping.\n", - debugging, - lgcs[i].ClientId(), - ) - } - continue - } - allInvalid = false - previousTransferred := lgcsPreviousTransferred[i] - currentTransferred := lgcs[i].Transferred() - totalTransfer += (currentTransferred - previousTransferred) - lgcsPreviousTransferred[i] = currentTransferred - } - - // For some reason, all the lgcs are invalid. This likely means that - // the network/server went away. - if allInvalid { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: All lgcs were invalid. Assuming that network/server went away.\n", - debugging, - ) - } - break - } - - // Compute a moving average of the last - // constants.MovingAverageIntervalCount "instantaneous aggregate - // goodput" measurements - movingAverage.AddMeasurement(float64(totalTransfer)) - currentMovingAverage := movingAverage.CalculateAverage() - movingAverageAverage.AddMeasurement(currentMovingAverage) - movingAverageDelta := utilities.SignedPercentDifference( - currentMovingAverage, - previousMovingAverage, - ) - - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Instantaneous goodput: %f MB.\n", - debugging, - utilities.ToMBps(float64(totalTransfer)), - ) - fmt.Printf( - "%v: Previous moving average: %f MB.\n", - debugging, - utilities.ToMBps(previousMovingAverage), - ) - fmt.Printf( - "%v: Current moving average: %f MB.\n", - debugging, - utilities.ToMBps(currentMovingAverage), - ) - fmt.Printf( - "%v: Moving average delta: %f.\n", - debugging, - movingAverageDelta, - ) - } - - previousMovingAverage = currentMovingAverage - - // Special case: We won't make any adjustments on the first - // iteration. - if currentIteration == 0 { - continue - } - - // If moving average > "previous" moving average + InstabilityDelta: - if movingAverageDelta > constants.InstabilityDelta { - // Network did not yet reach saturation. If no flows added - // within the last 4 seconds, add 4 more flows - if (currentIteration - previousFlowIncreaseIteration) > uint64( - constants.MovingAverageStabilitySpan, - ) { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Adding flows because we are unsaturated and waited a while.\n", - debugging, - ) - } - addFlows( - saturationCtx, - constants.AdditiveNumberOfLoadGeneratingConnections, - &lgcs, - &lgcsPreviousTransferred, - lgcGenerator, - debugging.Level, - ) - previousFlowIncreaseIteration = currentIteration - } else { - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debugging) - } - } - } else { // Else, network reached saturation for the current flow count. - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: Network reached saturation with current flow count.\n", debugging) - } - // If new flows added and for 4 seconds the moving average - // throughput did not change: network reached stable saturation - if (currentIteration-previousFlowIncreaseIteration) < uint64(constants.MovingAverageStabilitySpan) && movingAverageAverage.AllSequentialIncreasesLessThan(float64(5)) { - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging) - } - break - } else { - // Else, add four more flows - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging) - } - addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, &lgcsPreviousTransferred, lgcGenerator, debugging.Level) - previousFlowIncreaseIteration = currentIteration - } - } - - } - saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), lgcs: lgcs} - }() - return -} - func main() { flag.Parse() @@ -433,7 +91,7 @@ func main() { saturationCtx, cancelSaturationCtx := context.WithCancel( context.Background(), ) - config := &Config{} + config := &config.Config{} var debugLevel debug.DebugLevel = debug.Error if *debugCliFlag { @@ -508,6 +166,10 @@ func main() { } } + /* + * Create (and then, ironically, name) two anonymous functions that, when invoked, + * will create load-generating connections for upload/download/ + */ generate_lbd := func() lgc.LoadGeneratingConnection { return &lgc.LoadGeneratingConnectionDownload{ Path: config.Urls.LargeUrl, @@ -524,13 +186,13 @@ func main() { var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download") var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload") - downloadSaturationChannel := saturate( + downloadSaturationChannel := rpm.Saturate( saturationCtx, operatingCtx, generate_lbd, downloadDebugging, ) - uploadSaturationChannel := saturate( + uploadSaturationChannel := rpm.Saturate( saturationCtx, operatingCtx, generate_lbu, @@ -540,8 +202,8 @@ func main() { saturationTimeout := false uploadSaturated := false downloadSaturated := false - downloadSaturation := SaturationResult{} - uploadSaturation := SaturationResult{} + downloadSaturation := rpm.SaturationResult{} + uploadSaturation := rpm.SaturationResult{} for !(uploadSaturated && downloadSaturated) { select { @@ -557,7 +219,7 @@ func main() { "", ), utilities.ToMBps(downloadSaturation.RateBps), - len(downloadSaturation.lgcs), + len(downloadSaturation.LGCs), ) } } @@ -573,7 +235,7 @@ func main() { "", ), utilities.ToMBps(uploadSaturation.RateBps), - len(uploadSaturation.lgcs), + len(uploadSaturation.LGCs), ) } } @@ -585,7 +247,7 @@ func main() { // will exit! fmt.Fprint( os.Stderr, - "Error: Saturation could not be completed in time and no provisional rates could be accessed. Test failed.\n", + "Error: Saturation could not be completed in time and no provisional rates could be assessed. Test failed.\n", ) cancelOperatingCtx() if *debugCliFlag { @@ -629,25 +291,21 @@ func main() { ) } - totalRTsCount := uint64(0) - totalRTTimes := float64(0) - rttTimeout := false + totalMeasurements := uint64(0) + totalMeasurementTimes := float64(0) + measurementTimeout := false - for i := 0; i < constants.RPMProbeCount && !rttTimeout; i++ { - if len(downloadSaturation.lgcs) == 0 { + for i := 0; i < constants.MeasurementProbeCount && !measurementTimeout; i++ { + if len(downloadSaturation.LGCs) == 0 { continue } - randomlgcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))). - Int() % - len( - downloadSaturation.lgcs, - ) - if !downloadSaturation.lgcs[randomlgcsIndex].IsValid() { + randomLGCsIndex := utilities.RandBetween(len(downloadSaturation.LGCs)) + if !downloadSaturation.LGCs[randomLGCsIndex].IsValid() { if *debugCliFlag { fmt.Printf( - "%v: The randomly selected download lgc (with id %d) was invalid. Skipping.\n", + "%v: The randomly selected saturated connection (with id %d) was invalid. Skipping.\n", debugCliFlag, - downloadSaturation.lgcs[randomlgcsIndex].ClientId(), + downloadSaturation.LGCs[randomLGCsIndex].ClientId(), ) } @@ -657,7 +315,7 @@ func main() { if time.Since(timeoutAbsoluteTime) > 0 { if *debugCliFlag { fmt.Printf( - "Pathologically could not find valid lgcs to use for measurement.\n", + "Pathologically could not find valid saturated connections use for measurement.\n", ) } break @@ -665,46 +323,47 @@ func main() { continue } - newTransport := http2.Transport{} - newTransport.TLSClientConfig = &tls.Config{} + unsaturatedMeasurementTransport := http2.Transport{} + unsaturatedMeasurementTransport.TLSClientConfig = &tls.Config{} if sslKeyFileConcurrentWriter != nil { - newTransport.TLSClientConfig.KeyLogWriter = sslKeyFileConcurrentWriter + unsaturatedMeasurementTransport.TLSClientConfig.KeyLogWriter = sslKeyFileConcurrentWriter } - newTransport.TLSClientConfig.InsecureSkipVerify = true - newClient := http.Client{Transport: &newTransport} + unsaturatedMeasurementTransport.TLSClientConfig.InsecureSkipVerify = true + newClient := http.Client{Transport: &unsaturatedMeasurementTransport} - newRTTProbe := rpm.NewProbe(&newClient, debugLevel) + unsaturatedMeasurementProbe := rpm.NewProbe(&newClient, debugLevel) - saturatedRTTProbe := rpm.NewProbe( - downloadSaturation.lgcs[randomlgcsIndex].Client(), + saturatedMeasurementProbe := rpm.NewProbe( + downloadSaturation.LGCs[randomLGCsIndex].Client(), debugLevel, ) select { case <-timeoutChannel: { - rttTimeout = true + measurementTimeout = true } - case sequentialRTTimes := <-rpm.CalculateProbeMeasurements(operatingCtx, *strictFlag, saturatedRTTProbe, newRTTProbe, config.Urls.SmallUrl, debugLevel): + case sequentialMeasurementTimes := <-rpm.CalculateProbeMeasurements(operatingCtx, *strictFlag, saturatedMeasurementProbe, unsaturatedMeasurementProbe, config.Urls.SmallUrl, debugLevel): { - if sequentialRTTimes.Err != nil { + if sequentialMeasurementTimes.Err != nil { fmt.Printf( - "Failed to calculate a time for sequential RTTs: %v\n", - sequentialRTTimes.Err, + "Failed to calculate a time for sequential measurements: %v\n", + sequentialMeasurementTimes.Err, ) continue } if debug.IsDebug(debugLevel) { - fmt.Printf("rttProbe: %v\n", newRTTProbe) + fmt.Printf("unsaturatedMeasurementProbe: %v\n", unsaturatedMeasurementProbe) } - // We know that we have a good Sequential RTT. - totalRTsCount += uint64(sequentialRTTimes.MeasurementCount) - totalRTTimes += sequentialRTTimes.Delay.Seconds() + // We know that we have a good Sequential measurement. + totalMeasurements += uint64(sequentialMeasurementTimes.MeasurementCount) + totalMeasurementTimes += sequentialMeasurementTimes.Delay.Seconds() if debug.IsDebug(debugLevel) { fmt.Printf( - "sequentialRTTsTime: %v\n", - sequentialRTTimes.Delay.Seconds(), + "most-recent sequential measurement time: %v; most-recent sequential measurement count: %v\n", + sequentialMeasurementTimes.Delay.Seconds(), + sequentialMeasurementTimes.MeasurementCount, ) } } @@ -715,31 +374,31 @@ func main() { "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", utilities.ToMbps(downloadSaturation.RateBps), utilities.ToMBps(downloadSaturation.RateBps), - len(downloadSaturation.lgcs), + len(downloadSaturation.LGCs), ) fmt.Printf( "Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", utilities.ToMbps(uploadSaturation.RateBps), utilities.ToMBps(uploadSaturation.RateBps), - len(uploadSaturation.lgcs), + len(uploadSaturation.LGCs), ) - if totalRTsCount != 0 { + if totalMeasurements != 0 { // "... it sums the five time values for each probe, and divides by the // total // number of probes to compute an average probe duration. The // reciprocal of this, normalized to 60 seconds, gives the Round-trips // Per Minute (RPM)." - // "average probe duration" = totalRTTimes / totalRTsCount. - // The reciprocol of this = 1 / (totalRTTimes / totalRTsCount) <- + // "average probe duration" = totalMeasurementTimes / totalMeasurements. + // The reciprocol of this = 1 / (totalMeasurementTimes / totalMeasurements) <- // semantically the probes-per-second. // Normalized to 60 seconds: 60 * (1 - // / (totalRTTimes / totalRTsCount))) <- semantically the number of + // / ((totalMeasurementTimes / totalMeasurements)))) <- semantically the number of // probes per minute. rpm := float64( time.Minute.Seconds(), - ) / (totalRTTimes / (float64(totalRTsCount))) - fmt.Printf("Total RTTs measured: %d\n", totalRTsCount) + ) / (totalMeasurementTimes / (float64(totalMeasurements))) + fmt.Printf("Total measurements: %d\n", totalMeasurements) fmt.Printf("RPM: %5.0f\n", rpm) } else { fmt.Printf("Error occurred calculating RPM -- no probe measurements received.\n") @@ -7,14 +7,235 @@ import ( "io" "net/http" "net/http/httptrace" + "os" "time" + "github.com/network-quality/goresponsiveness/constants" "github.com/network-quality/goresponsiveness/debug" + "github.com/network-quality/goresponsiveness/lgc" + "github.com/network-quality/goresponsiveness/ma" "github.com/network-quality/goresponsiveness/stats" "github.com/network-quality/goresponsiveness/traceable" "github.com/network-quality/goresponsiveness/utilities" ) +func addFlows( + ctx context.Context, + toAdd uint64, + lgcs *[]lgc.LoadGeneratingConnection, + lgcsPreviousTransferred *[]uint64, + lgcGenerator func() lgc.LoadGeneratingConnection, + debug debug.DebugLevel, +) { + for i := uint64(0); i < toAdd; i++ { + *lgcs = append(*lgcs, lgcGenerator()) + *lgcsPreviousTransferred = append(*lgcsPreviousTransferred, 0) + if !(*lgcs)[len(*lgcs)-1].Start(ctx, debug) { + fmt.Printf( + "Error starting lgc with id %d!\n", + (*lgcs)[len(*lgcs)-1].ClientId(), + ) + return + } + } +} + +type SaturationResult struct { + RateBps float64 + LGCs []lgc.LoadGeneratingConnection +} + +func Saturate( + saturationCtx context.Context, + operatingCtx context.Context, + lgcGenerator func() lgc.LoadGeneratingConnection, + debugging *debug.DebugWithPrefix, +) (saturated chan SaturationResult) { + saturated = make(chan SaturationResult) + go func() { + + lgcs := make([]lgc.LoadGeneratingConnection, 0) + lgcsPreviousTransferred := make([]uint64, 0) + + addFlows( + saturationCtx, + constants.StartingNumberOfLoadGeneratingConnections, + &lgcs, + &lgcsPreviousTransferred, + lgcGenerator, + debugging.Level, + ) + + previousFlowIncreaseIteration := uint64(0) + previousMovingAverage := float64(0) + movingAverage := ma.NewMovingAverage( + constants.MovingAverageIntervalCount, + ) + movingAverageAverage := ma.NewMovingAverage( + constants.MovingAverageIntervalCount, + ) + + nextSampleStartTime := time.Now().Add(time.Second) + + for currentIteration := uint64(0); true; currentIteration++ { + + // When the program stops operating, then stop. + if saturationCtx.Err() != nil { + return + } + + // We may be asked to stop trying to saturate the + // network and return our current status. + if saturationCtx.Err() != nil { + //break + } + + now := time.Now() + // At each 1-second interval + if nextSampleStartTime.Sub(now) > 0 { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Sleeping until %v\n", + debugging, + nextSampleStartTime, + ) + } + time.Sleep(nextSampleStartTime.Sub(now)) + } else { + fmt.Fprintf(os.Stderr, "Warning: Missed a one-second deadline.\n") + } + nextSampleStartTime = time.Now().Add(time.Second) + + // Compute "instantaneous aggregate" goodput which is the number of + // bytes transferred within the last second. + totalTransfer := uint64(0) + allInvalid := true + for i := range lgcs { + if !lgcs[i].IsValid() { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Load-generating connection with id %d is invalid ... skipping.\n", + debugging, + lgcs[i].ClientId(), + ) + } + continue + } + allInvalid = false + previousTransferred := lgcsPreviousTransferred[i] + currentTransferred := lgcs[i].Transferred() + totalTransfer += (currentTransferred - previousTransferred) + lgcsPreviousTransferred[i] = currentTransferred + } + + // For some reason, all the lgcs are invalid. This likely means that + // the network/server went away. + if allInvalid { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: All lgcs were invalid. Assuming that network/server went away.\n", + debugging, + ) + } + break + } + + // Compute a moving average of the last + // constants.MovingAverageIntervalCount "instantaneous aggregate + // goodput" measurements + movingAverage.AddMeasurement(float64(totalTransfer)) + currentMovingAverage := movingAverage.CalculateAverage() + movingAverageAverage.AddMeasurement(currentMovingAverage) + movingAverageDelta := utilities.SignedPercentDifference( + currentMovingAverage, + previousMovingAverage, + ) + + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Instantaneous goodput: %f MB.\n", + debugging, + utilities.ToMBps(float64(totalTransfer)), + ) + fmt.Printf( + "%v: Previous moving average: %f MB.\n", + debugging, + utilities.ToMBps(previousMovingAverage), + ) + fmt.Printf( + "%v: Current moving average: %f MB.\n", + debugging, + utilities.ToMBps(currentMovingAverage), + ) + fmt.Printf( + "%v: Moving average delta: %f.\n", + debugging, + movingAverageDelta, + ) + } + + previousMovingAverage = currentMovingAverage + + // Special case: We won't make any adjustments on the first + // iteration. + if currentIteration == 0 { + continue + } + + // If moving average > "previous" moving average + InstabilityDelta: + if movingAverageDelta > constants.InstabilityDelta { + // Network did not yet reach saturation. If no flows added + // within the last 4 seconds, add 4 more flows + if (currentIteration - previousFlowIncreaseIteration) > uint64( + constants.MovingAverageStabilitySpan, + ) { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Adding flows because we are unsaturated and waited a while.\n", + debugging, + ) + } + addFlows( + saturationCtx, + constants.AdditiveNumberOfLoadGeneratingConnections, + &lgcs, + &lgcsPreviousTransferred, + lgcGenerator, + debugging.Level, + ) + previousFlowIncreaseIteration = currentIteration + } else { + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debugging) + } + } + } else { // Else, network reached saturation for the current flow count. + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: Network reached saturation with current flow count.\n", debugging) + } + // If new flows added and for 4 seconds the moving average + // throughput did not change: network reached stable saturation + if (currentIteration-previousFlowIncreaseIteration) < uint64(constants.MovingAverageStabilitySpan) && movingAverageAverage.AllSequentialIncreasesLessThan(float64(5)) { + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging) + } + break + } else { + // Else, add four more flows + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging) + } + addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, &lgcsPreviousTransferred, lgcGenerator, debugging.Level) + previousFlowIncreaseIteration = currentIteration + } + } + + } + saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs} + }() + return +} + type Probe struct { client *http.Client stats *stats.TraceStats diff --git a/utilities/utilities.go b/utilities/utilities.go index 4b114ba..a143d31 100644 --- a/utilities/utilities.go +++ b/utilities/utilities.go @@ -17,6 +17,7 @@ package utilities import ( "fmt" "math" + "math/rand" "os" "reflect" "sync/atomic" @@ -117,3 +118,7 @@ func (optional Optional[S]) String() string { return "None" } } + +func RandBetween(max int) int { + return rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % max +} |
