diff options
Diffstat (limited to 'networkQuality.go')
| -rw-r--r-- | networkQuality.go | 443 |
1 files changed, 51 insertions, 392 deletions
diff --git a/networkQuality.go b/networkQuality.go index 2aba054..ab9517e 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -17,25 +17,20 @@ package main import ( "context" "crypto/tls" - "encoding/json" "flag" "fmt" _ "io" - "io/ioutil" _ "log" - "math/rand" "net/http" - "net/url" "os" "runtime/pprof" - "strings" "time" "github.com/network-quality/goresponsiveness/ccw" + "github.com/network-quality/goresponsiveness/config" "github.com/network-quality/goresponsiveness/constants" "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/lgc" - "github.com/network-quality/goresponsiveness/ma" "github.com/network-quality/goresponsiveness/rpm" "github.com/network-quality/goresponsiveness/timeoutat" "github.com/network-quality/goresponsiveness/utilities" @@ -86,343 +81,6 @@ var ( ) ) -type ConfigUrls struct { - SmallUrl string `json:"small_https_download_url"` - LargeUrl string `json:"large_https_download_url"` - UploadUrl string `json:"https_upload_url"` -} - -type Config struct { - Version int - Urls ConfigUrls `json:"urls"` - Source string - Test_Endpoint string -} - -func (c *Config) Get(configHost string, configPath string) error { - configTransport := http2.Transport{} - configTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} - configClient := &http.Client{Transport: &configTransport} - // Extraneous /s in URLs is normally okay, but the Apple CDN does not - // like them. Make sure that we put exactly one (1) / between the host - // and the path. - if !strings.HasPrefix(configPath, "/") { - configPath = "/" + configPath - } - c.Source = fmt.Sprintf("https://%s%s", configHost, configPath) - resp, err := configClient.Get(c.Source) - if err != nil { - return fmt.Errorf( - "Error: Could not connect to configuration host %s: %v\n", - configHost, - err, - ) - } - - jsonConfig, err := ioutil.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf( - "Error: Could not read configuration content downloaded from %s: %v\n", - c.Source, - err, - ) - } - - err = json.Unmarshal(jsonConfig, c) - if err != nil { - return fmt.Errorf( - "Error: Could not parse configuration returned from %s: %v\n", - c.Source, - err, - ) - } - - //if len(c.Test_Endpoint) != 0 { - if false { - tempUrl, err := url.Parse(c.Urls.LargeUrl) - if err != nil { - return fmt.Errorf("Error parsing large_https_download_url: %v", err) - } - c.Urls.LargeUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path - tempUrl, err = url.Parse(c.Urls.SmallUrl) - if err != nil { - return fmt.Errorf("Error parsing small_https_download_url: %v", err) - } - c.Urls.SmallUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path - tempUrl, err = url.Parse(c.Urls.UploadUrl) - if err != nil { - return fmt.Errorf("Error parsing https_upload_url: %v", err) - } - c.Urls.UploadUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path - } - return nil -} - -func (c *Config) String() string { - return fmt.Sprintf( - "Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s\nEndpoint: %s\n", - c.Version, - c.Urls.SmallUrl, - c.Urls.LargeUrl, - c.Urls.UploadUrl, - c.Test_Endpoint, - ) -} - -func (c *Config) IsValid() error { - if parsedUrl, err := url.ParseRequestURI(c.Urls.LargeUrl); err != nil || - parsedUrl.Scheme != "https" { - return fmt.Errorf( - "Configuration url large_https_download_url is invalid: %s", - utilities.Conditional( - len(c.Urls.LargeUrl) != 0, - c.Urls.LargeUrl, - "Missing", - ), - ) - } - if parsedUrl, err := url.ParseRequestURI(c.Urls.SmallUrl); err != nil || - parsedUrl.Scheme != "https" { - return fmt.Errorf( - "Configuration url small_https_download_url is invalid: %s", - utilities.Conditional( - len(c.Urls.SmallUrl) != 0, - c.Urls.SmallUrl, - "Missing", - ), - ) - } - if parsedUrl, err := url.ParseRequestURI(c.Urls.UploadUrl); err != nil || - parsedUrl.Scheme != "https" { - return fmt.Errorf( - "Configuration url https_upload_url is invalid: %s", - utilities.Conditional( - len(c.Urls.UploadUrl) != 0, - c.Urls.UploadUrl, - "Missing", - ), - ) - } - return nil -} - -func addFlows( - ctx context.Context, - toAdd uint64, - lgcs *[]lgc.LoadGeneratingConnection, - lgcsPreviousTransferred *[]uint64, - lgcGenerator func() lgc.LoadGeneratingConnection, - debug debug.DebugLevel, -) { - for i := uint64(0); i < toAdd; i++ { - *lgcs = append(*lgcs, lgcGenerator()) - *lgcsPreviousTransferred = append(*lgcsPreviousTransferred, 0) - if !(*lgcs)[len(*lgcs)-1].Start(ctx, debug) { - fmt.Printf( - "Error starting lgc with id %d!\n", - (*lgcs)[len(*lgcs)-1].ClientId(), - ) - return - } - } -} - -type SaturationResult struct { - RateBps float64 - lgcs []lgc.LoadGeneratingConnection -} - -func saturate( - saturationCtx context.Context, - operatingCtx context.Context, - lgcGenerator func() lgc.LoadGeneratingConnection, - debugging *debug.DebugWithPrefix, -) (saturated chan SaturationResult) { - saturated = make(chan SaturationResult) - go func() { - - lgcs := make([]lgc.LoadGeneratingConnection, 0) - lgcsPreviousTransferred := make([]uint64, 0) - - addFlows( - saturationCtx, - constants.StartingNumberOfLoadGeneratingConnections, - &lgcs, - &lgcsPreviousTransferred, - lgcGenerator, - debugging.Level, - ) - - previousFlowIncreaseIteration := uint64(0) - previousMovingAverage := float64(0) - movingAverage := ma.NewMovingAverage( - constants.MovingAverageIntervalCount, - ) - movingAverageAverage := ma.NewMovingAverage( - constants.MovingAverageIntervalCount, - ) - - nextSampleStartTime := time.Now().Add(time.Second) - - for currentIteration := uint64(0); true; currentIteration++ { - - // When the program stops operating, then stop. - if saturationCtx.Err() != nil { - return - } - - // We may be asked to stop trying to saturate the - // network and return our current status. - if saturationCtx.Err() != nil { - //break - } - - now := time.Now() - // At each 1-second interval - if nextSampleStartTime.Sub(now) > 0 { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Sleeping until %v\n", - debugging, - nextSampleStartTime, - ) - } - time.Sleep(nextSampleStartTime.Sub(now)) - } else { - fmt.Fprintf(os.Stderr, "Warning: Missed a one-second deadline.\n") - } - nextSampleStartTime = time.Now().Add(time.Second) - - // Compute "instantaneous aggregate" goodput which is the number of - // bytes transferred within the last second. - totalTransfer := uint64(0) - allInvalid := true - for i := range lgcs { - if !lgcs[i].IsValid() { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Load-generating connection with id %d is invalid ... skipping.\n", - debugging, - lgcs[i].ClientId(), - ) - } - continue - } - allInvalid = false - previousTransferred := lgcsPreviousTransferred[i] - currentTransferred := lgcs[i].Transferred() - totalTransfer += (currentTransferred - previousTransferred) - lgcsPreviousTransferred[i] = currentTransferred - } - - // For some reason, all the lgcs are invalid. This likely means that - // the network/server went away. - if allInvalid { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: All lgcs were invalid. Assuming that network/server went away.\n", - debugging, - ) - } - break - } - - // Compute a moving average of the last - // constants.MovingAverageIntervalCount "instantaneous aggregate - // goodput" measurements - movingAverage.AddMeasurement(float64(totalTransfer)) - currentMovingAverage := movingAverage.CalculateAverage() - movingAverageAverage.AddMeasurement(currentMovingAverage) - movingAverageDelta := utilities.SignedPercentDifference( - currentMovingAverage, - previousMovingAverage, - ) - - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Instantaneous goodput: %f MB.\n", - debugging, - utilities.ToMBps(float64(totalTransfer)), - ) - fmt.Printf( - "%v: Previous moving average: %f MB.\n", - debugging, - utilities.ToMBps(previousMovingAverage), - ) - fmt.Printf( - "%v: Current moving average: %f MB.\n", - debugging, - utilities.ToMBps(currentMovingAverage), - ) - fmt.Printf( - "%v: Moving average delta: %f.\n", - debugging, - movingAverageDelta, - ) - } - - previousMovingAverage = currentMovingAverage - - // Special case: We won't make any adjustments on the first - // iteration. - if currentIteration == 0 { - continue - } - - // If moving average > "previous" moving average + InstabilityDelta: - if movingAverageDelta > constants.InstabilityDelta { - // Network did not yet reach saturation. If no flows added - // within the last 4 seconds, add 4 more flows - if (currentIteration - previousFlowIncreaseIteration) > uint64( - constants.MovingAverageStabilitySpan, - ) { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Adding flows because we are unsaturated and waited a while.\n", - debugging, - ) - } - addFlows( - saturationCtx, - constants.AdditiveNumberOfLoadGeneratingConnections, - &lgcs, - &lgcsPreviousTransferred, - lgcGenerator, - debugging.Level, - ) - previousFlowIncreaseIteration = currentIteration - } else { - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debugging) - } - } - } else { // Else, network reached saturation for the current flow count. - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: Network reached saturation with current flow count.\n", debugging) - } - // If new flows added and for 4 seconds the moving average - // throughput did not change: network reached stable saturation - if (currentIteration-previousFlowIncreaseIteration) < uint64(constants.MovingAverageStabilitySpan) && movingAverageAverage.AllSequentialIncreasesLessThan(float64(5)) { - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging) - } - break - } else { - // Else, add four more flows - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging) - } - addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, &lgcsPreviousTransferred, lgcGenerator, debugging.Level) - previousFlowIncreaseIteration = currentIteration - } - } - - } - saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), lgcs: lgcs} - }() - return -} - func main() { flag.Parse() @@ -433,7 +91,7 @@ func main() { saturationCtx, cancelSaturationCtx := context.WithCancel( context.Background(), ) - config := &Config{} + config := &config.Config{} var debugLevel debug.DebugLevel = debug.Error if *debugCliFlag { @@ -508,6 +166,10 @@ func main() { } } + /* + * Create (and then, ironically, name) two anonymous functions that, when invoked, + * will create load-generating connections for upload/download/ + */ generate_lbd := func() lgc.LoadGeneratingConnection { return &lgc.LoadGeneratingConnectionDownload{ Path: config.Urls.LargeUrl, @@ -524,13 +186,13 @@ func main() { var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download") var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload") - downloadSaturationChannel := saturate( + downloadSaturationChannel := rpm.Saturate( saturationCtx, operatingCtx, generate_lbd, downloadDebugging, ) - uploadSaturationChannel := saturate( + uploadSaturationChannel := rpm.Saturate( saturationCtx, operatingCtx, generate_lbu, @@ -540,8 +202,8 @@ func main() { saturationTimeout := false uploadSaturated := false downloadSaturated := false - downloadSaturation := SaturationResult{} - uploadSaturation := SaturationResult{} + downloadSaturation := rpm.SaturationResult{} + uploadSaturation := rpm.SaturationResult{} for !(uploadSaturated && downloadSaturated) { select { @@ -557,7 +219,7 @@ func main() { "", ), utilities.ToMBps(downloadSaturation.RateBps), - len(downloadSaturation.lgcs), + len(downloadSaturation.LGCs), ) } } @@ -573,7 +235,7 @@ func main() { "", ), utilities.ToMBps(uploadSaturation.RateBps), - len(uploadSaturation.lgcs), + len(uploadSaturation.LGCs), ) } } @@ -585,7 +247,7 @@ func main() { // will exit! fmt.Fprint( os.Stderr, - "Error: Saturation could not be completed in time and no provisional rates could be accessed. Test failed.\n", + "Error: Saturation could not be completed in time and no provisional rates could be assessed. Test failed.\n", ) cancelOperatingCtx() if *debugCliFlag { @@ -629,25 +291,21 @@ func main() { ) } - totalRTsCount := uint64(0) - totalRTTimes := float64(0) - rttTimeout := false + totalMeasurements := uint64(0) + totalMeasurementTimes := float64(0) + measurementTimeout := false - for i := 0; i < constants.RPMProbeCount && !rttTimeout; i++ { - if len(downloadSaturation.lgcs) == 0 { + for i := 0; i < constants.MeasurementProbeCount && !measurementTimeout; i++ { + if len(downloadSaturation.LGCs) == 0 { continue } - randomlgcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))). - Int() % - len( - downloadSaturation.lgcs, - ) - if !downloadSaturation.lgcs[randomlgcsIndex].IsValid() { + randomLGCsIndex := utilities.RandBetween(len(downloadSaturation.LGCs)) + if !downloadSaturation.LGCs[randomLGCsIndex].IsValid() { if *debugCliFlag { fmt.Printf( - "%v: The randomly selected download lgc (with id %d) was invalid. Skipping.\n", + "%v: The randomly selected saturated connection (with id %d) was invalid. Skipping.\n", debugCliFlag, - downloadSaturation.lgcs[randomlgcsIndex].ClientId(), + downloadSaturation.LGCs[randomLGCsIndex].ClientId(), ) } @@ -657,7 +315,7 @@ func main() { if time.Since(timeoutAbsoluteTime) > 0 { if *debugCliFlag { fmt.Printf( - "Pathologically could not find valid lgcs to use for measurement.\n", + "Pathologically could not find valid saturated connections use for measurement.\n", ) } break @@ -665,46 +323,47 @@ func main() { continue } - newTransport := http2.Transport{} - newTransport.TLSClientConfig = &tls.Config{} + unsaturatedMeasurementTransport := http2.Transport{} + unsaturatedMeasurementTransport.TLSClientConfig = &tls.Config{} if sslKeyFileConcurrentWriter != nil { - newTransport.TLSClientConfig.KeyLogWriter = sslKeyFileConcurrentWriter + unsaturatedMeasurementTransport.TLSClientConfig.KeyLogWriter = sslKeyFileConcurrentWriter } - newTransport.TLSClientConfig.InsecureSkipVerify = true - newClient := http.Client{Transport: &newTransport} + unsaturatedMeasurementTransport.TLSClientConfig.InsecureSkipVerify = true + newClient := http.Client{Transport: &unsaturatedMeasurementTransport} - newRTTProbe := rpm.NewProbe(&newClient, debugLevel) + unsaturatedMeasurementProbe := rpm.NewProbe(&newClient, debugLevel) - saturatedRTTProbe := rpm.NewProbe( - downloadSaturation.lgcs[randomlgcsIndex].Client(), + saturatedMeasurementProbe := rpm.NewProbe( + downloadSaturation.LGCs[randomLGCsIndex].Client(), debugLevel, ) select { case <-timeoutChannel: { - rttTimeout = true + measurementTimeout = true } - case sequentialRTTimes := <-rpm.CalculateProbeMeasurements(operatingCtx, *strictFlag, saturatedRTTProbe, newRTTProbe, config.Urls.SmallUrl, debugLevel): + case sequentialMeasurementTimes := <-rpm.CalculateProbeMeasurements(operatingCtx, *strictFlag, saturatedMeasurementProbe, unsaturatedMeasurementProbe, config.Urls.SmallUrl, debugLevel): { - if sequentialRTTimes.Err != nil { + if sequentialMeasurementTimes.Err != nil { fmt.Printf( - "Failed to calculate a time for sequential RTTs: %v\n", - sequentialRTTimes.Err, + "Failed to calculate a time for sequential measurements: %v\n", + sequentialMeasurementTimes.Err, ) continue } if debug.IsDebug(debugLevel) { - fmt.Printf("rttProbe: %v\n", newRTTProbe) + fmt.Printf("unsaturatedMeasurementProbe: %v\n", unsaturatedMeasurementProbe) } - // We know that we have a good Sequential RTT. - totalRTsCount += uint64(sequentialRTTimes.MeasurementCount) - totalRTTimes += sequentialRTTimes.Delay.Seconds() + // We know that we have a good Sequential measurement. + totalMeasurements += uint64(sequentialMeasurementTimes.MeasurementCount) + totalMeasurementTimes += sequentialMeasurementTimes.Delay.Seconds() if debug.IsDebug(debugLevel) { fmt.Printf( - "sequentialRTTsTime: %v\n", - sequentialRTTimes.Delay.Seconds(), + "most-recent sequential measurement time: %v; most-recent sequential measurement count: %v\n", + sequentialMeasurementTimes.Delay.Seconds(), + sequentialMeasurementTimes.MeasurementCount, ) } } @@ -715,31 +374,31 @@ func main() { "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", utilities.ToMbps(downloadSaturation.RateBps), utilities.ToMBps(downloadSaturation.RateBps), - len(downloadSaturation.lgcs), + len(downloadSaturation.LGCs), ) fmt.Printf( "Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", utilities.ToMbps(uploadSaturation.RateBps), utilities.ToMBps(uploadSaturation.RateBps), - len(uploadSaturation.lgcs), + len(uploadSaturation.LGCs), ) - if totalRTsCount != 0 { + if totalMeasurements != 0 { // "... it sums the five time values for each probe, and divides by the // total // number of probes to compute an average probe duration. The // reciprocal of this, normalized to 60 seconds, gives the Round-trips // Per Minute (RPM)." - // "average probe duration" = totalRTTimes / totalRTsCount. - // The reciprocol of this = 1 / (totalRTTimes / totalRTsCount) <- + // "average probe duration" = totalMeasurementTimes / totalMeasurements. + // The reciprocol of this = 1 / (totalMeasurementTimes / totalMeasurements) <- // semantically the probes-per-second. // Normalized to 60 seconds: 60 * (1 - // / (totalRTTimes / totalRTsCount))) <- semantically the number of + // / ((totalMeasurementTimes / totalMeasurements)))) <- semantically the number of // probes per minute. rpm := float64( time.Minute.Seconds(), - ) / (totalRTTimes / (float64(totalRTsCount))) - fmt.Printf("Total RTTs measured: %d\n", totalRTsCount) + ) / (totalMeasurementTimes / (float64(totalMeasurements))) + fmt.Printf("Total measurements: %d\n", totalMeasurements) fmt.Printf("RPM: %5.0f\n", rpm) } else { fmt.Printf("Error occurred calculating RPM -- no probe measurements received.\n") |
