From fdfe96b53ef8f4532e5b6f65f86ba39fe242cc5b Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Wed, 11 May 2022 16:49:09 -0400 Subject: [Refactor/Bugfix] Refactor RPM calculation and fix calculation algorithm As it turns out, I was misreading the algorithm for calculating the RPM based upon the measurements taken during execution. This patch fixes that mistake and also (starts) renames "RTT" as "measurement" (those are technically a better nomenclature according to the spec.) --- networkQuality.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'networkQuality.go') diff --git a/networkQuality.go b/networkQuality.go index 6b80a7e..2aba054 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -64,6 +64,11 @@ var ( constants.DefaultDebug, "Enable debugging.", ) + strictFlag = flag.Bool( + "strict", + constants.DefaultStrict, + "Whether to run the test in strict mode (measure HTTP get time on load-generating connection)", + ) timeout = flag.Int( "timeout", constants.DefaultTestTime, @@ -680,7 +685,7 @@ func main() { { rttTimeout = true } - case sequentialRTTimes := <-rpm.CalculateSequentialRTTsTime(operatingCtx, saturatedRTTProbe, newRTTProbe, config.Urls.SmallUrl, debugLevel): + case sequentialRTTimes := <-rpm.CalculateProbeMeasurements(operatingCtx, *strictFlag, saturatedRTTProbe, newRTTProbe, config.Urls.SmallUrl, debugLevel): { if sequentialRTTimes.Err != nil { fmt.Printf( @@ -694,7 +699,7 @@ func main() { fmt.Printf("rttProbe: %v\n", newRTTProbe) } // We know that we have a good Sequential RTT. - totalRTsCount += uint64(sequentialRTTimes.RoundTripCount) + totalRTsCount += uint64(sequentialRTTimes.MeasurementCount) totalRTTimes += sequentialRTTimes.Delay.Seconds() if debug.IsDebug(debugLevel) { fmt.Printf( -- cgit v1.2.3 From 89f26501e59095e1e6ac59cf158f6305e4e93389 Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Fri, 13 May 2022 11:02:09 -0400 Subject: [Refactor] Create config package and package-ify saturate() 1. Create a separate package to handle the config information. 2. Move the saturate functionality into the rpm package. 3. Do general renaming/refactoring so that we are consistently saying measurement and not RTT (this nomenclature is more consistent with the standard). --- config/config.go | 134 +++++++++++++++ constants/constants.go | 2 +- networkQuality.go | 443 ++++++------------------------------------------- rpm/rpm.go | 221 ++++++++++++++++++++++++ utilities/utilities.go | 5 + 5 files changed, 412 insertions(+), 393 deletions(-) create mode 100644 config/config.go (limited to 'networkQuality.go') diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..05ec61d --- /dev/null +++ b/config/config.go @@ -0,0 +1,134 @@ +package config + +import ( + "crypto/tls" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strings" + + "github.com/network-quality/goresponsiveness/utilities" + "golang.org/x/net/http2" +) + +type ConfigUrls struct { + SmallUrl string `json:"small_https_download_url"` + LargeUrl string `json:"large_https_download_url"` + UploadUrl string `json:"https_upload_url"` +} + +type Config struct { + Version int + Urls ConfigUrls `json:"urls"` + Source string + Test_Endpoint string +} + +func (c *Config) Get(configHost string, configPath string) error { + configTransport := http2.Transport{} + configTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + configClient := &http.Client{Transport: &configTransport} + // Extraneous /s in URLs is normally okay, but the Apple CDN does not + // like them. Make sure that we put exactly one (1) / between the host + // and the path. + if !strings.HasPrefix(configPath, "/") { + configPath = "/" + configPath + } + c.Source = fmt.Sprintf("https://%s%s", configHost, configPath) + resp, err := configClient.Get(c.Source) + if err != nil { + return fmt.Errorf( + "Error: Could not connect to configuration host %s: %v\n", + configHost, + err, + ) + } + + jsonConfig, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf( + "Error: Could not read configuration content downloaded from %s: %v\n", + c.Source, + err, + ) + } + + err = json.Unmarshal(jsonConfig, c) + if err != nil { + return fmt.Errorf( + "Error: Could not parse configuration returned from %s: %v\n", + c.Source, + err, + ) + } + + //if len(c.Test_Endpoint) != 0 { + if false { + tempUrl, err := url.Parse(c.Urls.LargeUrl) + if err != nil { + return fmt.Errorf("Error parsing large_https_download_url: %v", err) + } + c.Urls.LargeUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path + tempUrl, err = url.Parse(c.Urls.SmallUrl) + if err != nil { + return fmt.Errorf("Error parsing small_https_download_url: %v", err) + } + c.Urls.SmallUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path + tempUrl, err = url.Parse(c.Urls.UploadUrl) + if err != nil { + return fmt.Errorf("Error parsing https_upload_url: %v", err) + } + c.Urls.UploadUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path + } + return nil +} + +func (c *Config) String() string { + return fmt.Sprintf( + "Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s\nEndpoint: %s\n", + c.Version, + c.Urls.SmallUrl, + c.Urls.LargeUrl, + c.Urls.UploadUrl, + c.Test_Endpoint, + ) +} + +func (c *Config) IsValid() error { + if parsedUrl, err := url.ParseRequestURI(c.Urls.LargeUrl); err != nil || + parsedUrl.Scheme != "https" { + return fmt.Errorf( + "Configuration url large_https_download_url is invalid: %s", + utilities.Conditional( + len(c.Urls.LargeUrl) != 0, + c.Urls.LargeUrl, + "Missing", + ), + ) + } + if parsedUrl, err := url.ParseRequestURI(c.Urls.SmallUrl); err != nil || + parsedUrl.Scheme != "https" { + return fmt.Errorf( + "Configuration url small_https_download_url is invalid: %s", + utilities.Conditional( + len(c.Urls.SmallUrl) != 0, + c.Urls.SmallUrl, + "Missing", + ), + ) + } + if parsedUrl, err := url.ParseRequestURI(c.Urls.UploadUrl); err != nil || + parsedUrl.Scheme != "https" { + return fmt.Errorf( + "Configuration url https_upload_url is invalid: %s", + utilities.Conditional( + len(c.Urls.UploadUrl) != 0, + c.Urls.UploadUrl, + "Missing", + ), + ) + } + return nil +} diff --git a/constants/constants.go b/constants/constants.go index 2f906b6..147b643 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -18,7 +18,7 @@ var ( // The amount of time that the client will cooldown if it is in debug mode. CooldownPeriod time.Duration = 4 * time.Second // The number of probes to send when calculating RTT. - RPMProbeCount int = 5 + MeasurementProbeCount int = 5 // The amount of time that we give ourselves to calculate the RPM. RPMCalculationTime time.Duration = 10 * time.Second diff --git a/networkQuality.go b/networkQuality.go index 2aba054..ab9517e 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -17,25 +17,20 @@ package main import ( "context" "crypto/tls" - "encoding/json" "flag" "fmt" _ "io" - "io/ioutil" _ "log" - "math/rand" "net/http" - "net/url" "os" "runtime/pprof" - "strings" "time" "github.com/network-quality/goresponsiveness/ccw" + "github.com/network-quality/goresponsiveness/config" "github.com/network-quality/goresponsiveness/constants" "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/lgc" - "github.com/network-quality/goresponsiveness/ma" "github.com/network-quality/goresponsiveness/rpm" "github.com/network-quality/goresponsiveness/timeoutat" "github.com/network-quality/goresponsiveness/utilities" @@ -86,343 +81,6 @@ var ( ) ) -type ConfigUrls struct { - SmallUrl string `json:"small_https_download_url"` - LargeUrl string `json:"large_https_download_url"` - UploadUrl string `json:"https_upload_url"` -} - -type Config struct { - Version int - Urls ConfigUrls `json:"urls"` - Source string - Test_Endpoint string -} - -func (c *Config) Get(configHost string, configPath string) error { - configTransport := http2.Transport{} - configTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} - configClient := &http.Client{Transport: &configTransport} - // Extraneous /s in URLs is normally okay, but the Apple CDN does not - // like them. Make sure that we put exactly one (1) / between the host - // and the path. - if !strings.HasPrefix(configPath, "/") { - configPath = "/" + configPath - } - c.Source = fmt.Sprintf("https://%s%s", configHost, configPath) - resp, err := configClient.Get(c.Source) - if err != nil { - return fmt.Errorf( - "Error: Could not connect to configuration host %s: %v\n", - configHost, - err, - ) - } - - jsonConfig, err := ioutil.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf( - "Error: Could not read configuration content downloaded from %s: %v\n", - c.Source, - err, - ) - } - - err = json.Unmarshal(jsonConfig, c) - if err != nil { - return fmt.Errorf( - "Error: Could not parse configuration returned from %s: %v\n", - c.Source, - err, - ) - } - - //if len(c.Test_Endpoint) != 0 { - if false { - tempUrl, err := url.Parse(c.Urls.LargeUrl) - if err != nil { - return fmt.Errorf("Error parsing large_https_download_url: %v", err) - } - c.Urls.LargeUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path - tempUrl, err = url.Parse(c.Urls.SmallUrl) - if err != nil { - return fmt.Errorf("Error parsing small_https_download_url: %v", err) - } - c.Urls.SmallUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path - tempUrl, err = url.Parse(c.Urls.UploadUrl) - if err != nil { - return fmt.Errorf("Error parsing https_upload_url: %v", err) - } - c.Urls.UploadUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path - } - return nil -} - -func (c *Config) String() string { - return fmt.Sprintf( - "Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s\nEndpoint: %s\n", - c.Version, - c.Urls.SmallUrl, - c.Urls.LargeUrl, - c.Urls.UploadUrl, - c.Test_Endpoint, - ) -} - -func (c *Config) IsValid() error { - if parsedUrl, err := url.ParseRequestURI(c.Urls.LargeUrl); err != nil || - parsedUrl.Scheme != "https" { - return fmt.Errorf( - "Configuration url large_https_download_url is invalid: %s", - utilities.Conditional( - len(c.Urls.LargeUrl) != 0, - c.Urls.LargeUrl, - "Missing", - ), - ) - } - if parsedUrl, err := url.ParseRequestURI(c.Urls.SmallUrl); err != nil || - parsedUrl.Scheme != "https" { - return fmt.Errorf( - "Configuration url small_https_download_url is invalid: %s", - utilities.Conditional( - len(c.Urls.SmallUrl) != 0, - c.Urls.SmallUrl, - "Missing", - ), - ) - } - if parsedUrl, err := url.ParseRequestURI(c.Urls.UploadUrl); err != nil || - parsedUrl.Scheme != "https" { - return fmt.Errorf( - "Configuration url https_upload_url is invalid: %s", - utilities.Conditional( - len(c.Urls.UploadUrl) != 0, - c.Urls.UploadUrl, - "Missing", - ), - ) - } - return nil -} - -func addFlows( - ctx context.Context, - toAdd uint64, - lgcs *[]lgc.LoadGeneratingConnection, - lgcsPreviousTransferred *[]uint64, - lgcGenerator func() lgc.LoadGeneratingConnection, - debug debug.DebugLevel, -) { - for i := uint64(0); i < toAdd; i++ { - *lgcs = append(*lgcs, lgcGenerator()) - *lgcsPreviousTransferred = append(*lgcsPreviousTransferred, 0) - if !(*lgcs)[len(*lgcs)-1].Start(ctx, debug) { - fmt.Printf( - "Error starting lgc with id %d!\n", - (*lgcs)[len(*lgcs)-1].ClientId(), - ) - return - } - } -} - -type SaturationResult struct { - RateBps float64 - lgcs []lgc.LoadGeneratingConnection -} - -func saturate( - saturationCtx context.Context, - operatingCtx context.Context, - lgcGenerator func() lgc.LoadGeneratingConnection, - debugging *debug.DebugWithPrefix, -) (saturated chan SaturationResult) { - saturated = make(chan SaturationResult) - go func() { - - lgcs := make([]lgc.LoadGeneratingConnection, 0) - lgcsPreviousTransferred := make([]uint64, 0) - - addFlows( - saturationCtx, - constants.StartingNumberOfLoadGeneratingConnections, - &lgcs, - &lgcsPreviousTransferred, - lgcGenerator, - debugging.Level, - ) - - previousFlowIncreaseIteration := uint64(0) - previousMovingAverage := float64(0) - movingAverage := ma.NewMovingAverage( - constants.MovingAverageIntervalCount, - ) - movingAverageAverage := ma.NewMovingAverage( - constants.MovingAverageIntervalCount, - ) - - nextSampleStartTime := time.Now().Add(time.Second) - - for currentIteration := uint64(0); true; currentIteration++ { - - // When the program stops operating, then stop. - if saturationCtx.Err() != nil { - return - } - - // We may be asked to stop trying to saturate the - // network and return our current status. - if saturationCtx.Err() != nil { - //break - } - - now := time.Now() - // At each 1-second interval - if nextSampleStartTime.Sub(now) > 0 { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Sleeping until %v\n", - debugging, - nextSampleStartTime, - ) - } - time.Sleep(nextSampleStartTime.Sub(now)) - } else { - fmt.Fprintf(os.Stderr, "Warning: Missed a one-second deadline.\n") - } - nextSampleStartTime = time.Now().Add(time.Second) - - // Compute "instantaneous aggregate" goodput which is the number of - // bytes transferred within the last second. - totalTransfer := uint64(0) - allInvalid := true - for i := range lgcs { - if !lgcs[i].IsValid() { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Load-generating connection with id %d is invalid ... skipping.\n", - debugging, - lgcs[i].ClientId(), - ) - } - continue - } - allInvalid = false - previousTransferred := lgcsPreviousTransferred[i] - currentTransferred := lgcs[i].Transferred() - totalTransfer += (currentTransferred - previousTransferred) - lgcsPreviousTransferred[i] = currentTransferred - } - - // For some reason, all the lgcs are invalid. This likely means that - // the network/server went away. - if allInvalid { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: All lgcs were invalid. Assuming that network/server went away.\n", - debugging, - ) - } - break - } - - // Compute a moving average of the last - // constants.MovingAverageIntervalCount "instantaneous aggregate - // goodput" measurements - movingAverage.AddMeasurement(float64(totalTransfer)) - currentMovingAverage := movingAverage.CalculateAverage() - movingAverageAverage.AddMeasurement(currentMovingAverage) - movingAverageDelta := utilities.SignedPercentDifference( - currentMovingAverage, - previousMovingAverage, - ) - - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Instantaneous goodput: %f MB.\n", - debugging, - utilities.ToMBps(float64(totalTransfer)), - ) - fmt.Printf( - "%v: Previous moving average: %f MB.\n", - debugging, - utilities.ToMBps(previousMovingAverage), - ) - fmt.Printf( - "%v: Current moving average: %f MB.\n", - debugging, - utilities.ToMBps(currentMovingAverage), - ) - fmt.Printf( - "%v: Moving average delta: %f.\n", - debugging, - movingAverageDelta, - ) - } - - previousMovingAverage = currentMovingAverage - - // Special case: We won't make any adjustments on the first - // iteration. - if currentIteration == 0 { - continue - } - - // If moving average > "previous" moving average + InstabilityDelta: - if movingAverageDelta > constants.InstabilityDelta { - // Network did not yet reach saturation. If no flows added - // within the last 4 seconds, add 4 more flows - if (currentIteration - previousFlowIncreaseIteration) > uint64( - constants.MovingAverageStabilitySpan, - ) { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Adding flows because we are unsaturated and waited a while.\n", - debugging, - ) - } - addFlows( - saturationCtx, - constants.AdditiveNumberOfLoadGeneratingConnections, - &lgcs, - &lgcsPreviousTransferred, - lgcGenerator, - debugging.Level, - ) - previousFlowIncreaseIteration = currentIteration - } else { - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debugging) - } - } - } else { // Else, network reached saturation for the current flow count. - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: Network reached saturation with current flow count.\n", debugging) - } - // If new flows added and for 4 seconds the moving average - // throughput did not change: network reached stable saturation - if (currentIteration-previousFlowIncreaseIteration) < uint64(constants.MovingAverageStabilitySpan) && movingAverageAverage.AllSequentialIncreasesLessThan(float64(5)) { - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging) - } - break - } else { - // Else, add four more flows - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging) - } - addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, &lgcsPreviousTransferred, lgcGenerator, debugging.Level) - previousFlowIncreaseIteration = currentIteration - } - } - - } - saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), lgcs: lgcs} - }() - return -} - func main() { flag.Parse() @@ -433,7 +91,7 @@ func main() { saturationCtx, cancelSaturationCtx := context.WithCancel( context.Background(), ) - config := &Config{} + config := &config.Config{} var debugLevel debug.DebugLevel = debug.Error if *debugCliFlag { @@ -508,6 +166,10 @@ func main() { } } + /* + * Create (and then, ironically, name) two anonymous functions that, when invoked, + * will create load-generating connections for upload/download/ + */ generate_lbd := func() lgc.LoadGeneratingConnection { return &lgc.LoadGeneratingConnectionDownload{ Path: config.Urls.LargeUrl, @@ -524,13 +186,13 @@ func main() { var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download") var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload") - downloadSaturationChannel := saturate( + downloadSaturationChannel := rpm.Saturate( saturationCtx, operatingCtx, generate_lbd, downloadDebugging, ) - uploadSaturationChannel := saturate( + uploadSaturationChannel := rpm.Saturate( saturationCtx, operatingCtx, generate_lbu, @@ -540,8 +202,8 @@ func main() { saturationTimeout := false uploadSaturated := false downloadSaturated := false - downloadSaturation := SaturationResult{} - uploadSaturation := SaturationResult{} + downloadSaturation := rpm.SaturationResult{} + uploadSaturation := rpm.SaturationResult{} for !(uploadSaturated && downloadSaturated) { select { @@ -557,7 +219,7 @@ func main() { "", ), utilities.ToMBps(downloadSaturation.RateBps), - len(downloadSaturation.lgcs), + len(downloadSaturation.LGCs), ) } } @@ -573,7 +235,7 @@ func main() { "", ), utilities.ToMBps(uploadSaturation.RateBps), - len(uploadSaturation.lgcs), + len(uploadSaturation.LGCs), ) } } @@ -585,7 +247,7 @@ func main() { // will exit! fmt.Fprint( os.Stderr, - "Error: Saturation could not be completed in time and no provisional rates could be accessed. Test failed.\n", + "Error: Saturation could not be completed in time and no provisional rates could be assessed. Test failed.\n", ) cancelOperatingCtx() if *debugCliFlag { @@ -629,25 +291,21 @@ func main() { ) } - totalRTsCount := uint64(0) - totalRTTimes := float64(0) - rttTimeout := false + totalMeasurements := uint64(0) + totalMeasurementTimes := float64(0) + measurementTimeout := false - for i := 0; i < constants.RPMProbeCount && !rttTimeout; i++ { - if len(downloadSaturation.lgcs) == 0 { + for i := 0; i < constants.MeasurementProbeCount && !measurementTimeout; i++ { + if len(downloadSaturation.LGCs) == 0 { continue } - randomlgcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))). - Int() % - len( - downloadSaturation.lgcs, - ) - if !downloadSaturation.lgcs[randomlgcsIndex].IsValid() { + randomLGCsIndex := utilities.RandBetween(len(downloadSaturation.LGCs)) + if !downloadSaturation.LGCs[randomLGCsIndex].IsValid() { if *debugCliFlag { fmt.Printf( - "%v: The randomly selected download lgc (with id %d) was invalid. Skipping.\n", + "%v: The randomly selected saturated connection (with id %d) was invalid. Skipping.\n", debugCliFlag, - downloadSaturation.lgcs[randomlgcsIndex].ClientId(), + downloadSaturation.LGCs[randomLGCsIndex].ClientId(), ) } @@ -657,7 +315,7 @@ func main() { if time.Since(timeoutAbsoluteTime) > 0 { if *debugCliFlag { fmt.Printf( - "Pathologically could not find valid lgcs to use for measurement.\n", + "Pathologically could not find valid saturated connections use for measurement.\n", ) } break @@ -665,46 +323,47 @@ func main() { continue } - newTransport := http2.Transport{} - newTransport.TLSClientConfig = &tls.Config{} + unsaturatedMeasurementTransport := http2.Transport{} + unsaturatedMeasurementTransport.TLSClientConfig = &tls.Config{} if sslKeyFileConcurrentWriter != nil { - newTransport.TLSClientConfig.KeyLogWriter = sslKeyFileConcurrentWriter + unsaturatedMeasurementTransport.TLSClientConfig.KeyLogWriter = sslKeyFileConcurrentWriter } - newTransport.TLSClientConfig.InsecureSkipVerify = true - newClient := http.Client{Transport: &newTransport} + unsaturatedMeasurementTransport.TLSClientConfig.InsecureSkipVerify = true + newClient := http.Client{Transport: &unsaturatedMeasurementTransport} - newRTTProbe := rpm.NewProbe(&newClient, debugLevel) + unsaturatedMeasurementProbe := rpm.NewProbe(&newClient, debugLevel) - saturatedRTTProbe := rpm.NewProbe( - downloadSaturation.lgcs[randomlgcsIndex].Client(), + saturatedMeasurementProbe := rpm.NewProbe( + downloadSaturation.LGCs[randomLGCsIndex].Client(), debugLevel, ) select { case <-timeoutChannel: { - rttTimeout = true + measurementTimeout = true } - case sequentialRTTimes := <-rpm.CalculateProbeMeasurements(operatingCtx, *strictFlag, saturatedRTTProbe, newRTTProbe, config.Urls.SmallUrl, debugLevel): + case sequentialMeasurementTimes := <-rpm.CalculateProbeMeasurements(operatingCtx, *strictFlag, saturatedMeasurementProbe, unsaturatedMeasurementProbe, config.Urls.SmallUrl, debugLevel): { - if sequentialRTTimes.Err != nil { + if sequentialMeasurementTimes.Err != nil { fmt.Printf( - "Failed to calculate a time for sequential RTTs: %v\n", - sequentialRTTimes.Err, + "Failed to calculate a time for sequential measurements: %v\n", + sequentialMeasurementTimes.Err, ) continue } if debug.IsDebug(debugLevel) { - fmt.Printf("rttProbe: %v\n", newRTTProbe) + fmt.Printf("unsaturatedMeasurementProbe: %v\n", unsaturatedMeasurementProbe) } - // We know that we have a good Sequential RTT. - totalRTsCount += uint64(sequentialRTTimes.MeasurementCount) - totalRTTimes += sequentialRTTimes.Delay.Seconds() + // We know that we have a good Sequential measurement. + totalMeasurements += uint64(sequentialMeasurementTimes.MeasurementCount) + totalMeasurementTimes += sequentialMeasurementTimes.Delay.Seconds() if debug.IsDebug(debugLevel) { fmt.Printf( - "sequentialRTTsTime: %v\n", - sequentialRTTimes.Delay.Seconds(), + "most-recent sequential measurement time: %v; most-recent sequential measurement count: %v\n", + sequentialMeasurementTimes.Delay.Seconds(), + sequentialMeasurementTimes.MeasurementCount, ) } } @@ -715,31 +374,31 @@ func main() { "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", utilities.ToMbps(downloadSaturation.RateBps), utilities.ToMBps(downloadSaturation.RateBps), - len(downloadSaturation.lgcs), + len(downloadSaturation.LGCs), ) fmt.Printf( "Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", utilities.ToMbps(uploadSaturation.RateBps), utilities.ToMBps(uploadSaturation.RateBps), - len(uploadSaturation.lgcs), + len(uploadSaturation.LGCs), ) - if totalRTsCount != 0 { + if totalMeasurements != 0 { // "... it sums the five time values for each probe, and divides by the // total // number of probes to compute an average probe duration. The // reciprocal of this, normalized to 60 seconds, gives the Round-trips // Per Minute (RPM)." - // "average probe duration" = totalRTTimes / totalRTsCount. - // The reciprocol of this = 1 / (totalRTTimes / totalRTsCount) <- + // "average probe duration" = totalMeasurementTimes / totalMeasurements. + // The reciprocol of this = 1 / (totalMeasurementTimes / totalMeasurements) <- // semantically the probes-per-second. // Normalized to 60 seconds: 60 * (1 - // / (totalRTTimes / totalRTsCount))) <- semantically the number of + // / ((totalMeasurementTimes / totalMeasurements)))) <- semantically the number of // probes per minute. rpm := float64( time.Minute.Seconds(), - ) / (totalRTTimes / (float64(totalRTsCount))) - fmt.Printf("Total RTTs measured: %d\n", totalRTsCount) + ) / (totalMeasurementTimes / (float64(totalMeasurements))) + fmt.Printf("Total measurements: %d\n", totalMeasurements) fmt.Printf("RPM: %5.0f\n", rpm) } else { fmt.Printf("Error occurred calculating RPM -- no probe measurements received.\n") diff --git a/rpm/rpm.go b/rpm/rpm.go index a349cee..8f431b6 100644 --- a/rpm/rpm.go +++ b/rpm/rpm.go @@ -7,14 +7,235 @@ import ( "io" "net/http" "net/http/httptrace" + "os" "time" + "github.com/network-quality/goresponsiveness/constants" "github.com/network-quality/goresponsiveness/debug" + "github.com/network-quality/goresponsiveness/lgc" + "github.com/network-quality/goresponsiveness/ma" "github.com/network-quality/goresponsiveness/stats" "github.com/network-quality/goresponsiveness/traceable" "github.com/network-quality/goresponsiveness/utilities" ) +func addFlows( + ctx context.Context, + toAdd uint64, + lgcs *[]lgc.LoadGeneratingConnection, + lgcsPreviousTransferred *[]uint64, + lgcGenerator func() lgc.LoadGeneratingConnection, + debug debug.DebugLevel, +) { + for i := uint64(0); i < toAdd; i++ { + *lgcs = append(*lgcs, lgcGenerator()) + *lgcsPreviousTransferred = append(*lgcsPreviousTransferred, 0) + if !(*lgcs)[len(*lgcs)-1].Start(ctx, debug) { + fmt.Printf( + "Error starting lgc with id %d!\n", + (*lgcs)[len(*lgcs)-1].ClientId(), + ) + return + } + } +} + +type SaturationResult struct { + RateBps float64 + LGCs []lgc.LoadGeneratingConnection +} + +func Saturate( + saturationCtx context.Context, + operatingCtx context.Context, + lgcGenerator func() lgc.LoadGeneratingConnection, + debugging *debug.DebugWithPrefix, +) (saturated chan SaturationResult) { + saturated = make(chan SaturationResult) + go func() { + + lgcs := make([]lgc.LoadGeneratingConnection, 0) + lgcsPreviousTransferred := make([]uint64, 0) + + addFlows( + saturationCtx, + constants.StartingNumberOfLoadGeneratingConnections, + &lgcs, + &lgcsPreviousTransferred, + lgcGenerator, + debugging.Level, + ) + + previousFlowIncreaseIteration := uint64(0) + previousMovingAverage := float64(0) + movingAverage := ma.NewMovingAverage( + constants.MovingAverageIntervalCount, + ) + movingAverageAverage := ma.NewMovingAverage( + constants.MovingAverageIntervalCount, + ) + + nextSampleStartTime := time.Now().Add(time.Second) + + for currentIteration := uint64(0); true; currentIteration++ { + + // When the program stops operating, then stop. + if saturationCtx.Err() != nil { + return + } + + // We may be asked to stop trying to saturate the + // network and return our current status. + if saturationCtx.Err() != nil { + //break + } + + now := time.Now() + // At each 1-second interval + if nextSampleStartTime.Sub(now) > 0 { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Sleeping until %v\n", + debugging, + nextSampleStartTime, + ) + } + time.Sleep(nextSampleStartTime.Sub(now)) + } else { + fmt.Fprintf(os.Stderr, "Warning: Missed a one-second deadline.\n") + } + nextSampleStartTime = time.Now().Add(time.Second) + + // Compute "instantaneous aggregate" goodput which is the number of + // bytes transferred within the last second. + totalTransfer := uint64(0) + allInvalid := true + for i := range lgcs { + if !lgcs[i].IsValid() { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Load-generating connection with id %d is invalid ... skipping.\n", + debugging, + lgcs[i].ClientId(), + ) + } + continue + } + allInvalid = false + previousTransferred := lgcsPreviousTransferred[i] + currentTransferred := lgcs[i].Transferred() + totalTransfer += (currentTransferred - previousTransferred) + lgcsPreviousTransferred[i] = currentTransferred + } + + // For some reason, all the lgcs are invalid. This likely means that + // the network/server went away. + if allInvalid { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: All lgcs were invalid. Assuming that network/server went away.\n", + debugging, + ) + } + break + } + + // Compute a moving average of the last + // constants.MovingAverageIntervalCount "instantaneous aggregate + // goodput" measurements + movingAverage.AddMeasurement(float64(totalTransfer)) + currentMovingAverage := movingAverage.CalculateAverage() + movingAverageAverage.AddMeasurement(currentMovingAverage) + movingAverageDelta := utilities.SignedPercentDifference( + currentMovingAverage, + previousMovingAverage, + ) + + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Instantaneous goodput: %f MB.\n", + debugging, + utilities.ToMBps(float64(totalTransfer)), + ) + fmt.Printf( + "%v: Previous moving average: %f MB.\n", + debugging, + utilities.ToMBps(previousMovingAverage), + ) + fmt.Printf( + "%v: Current moving average: %f MB.\n", + debugging, + utilities.ToMBps(currentMovingAverage), + ) + fmt.Printf( + "%v: Moving average delta: %f.\n", + debugging, + movingAverageDelta, + ) + } + + previousMovingAverage = currentMovingAverage + + // Special case: We won't make any adjustments on the first + // iteration. + if currentIteration == 0 { + continue + } + + // If moving average > "previous" moving average + InstabilityDelta: + if movingAverageDelta > constants.InstabilityDelta { + // Network did not yet reach saturation. If no flows added + // within the last 4 seconds, add 4 more flows + if (currentIteration - previousFlowIncreaseIteration) > uint64( + constants.MovingAverageStabilitySpan, + ) { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Adding flows because we are unsaturated and waited a while.\n", + debugging, + ) + } + addFlows( + saturationCtx, + constants.AdditiveNumberOfLoadGeneratingConnections, + &lgcs, + &lgcsPreviousTransferred, + lgcGenerator, + debugging.Level, + ) + previousFlowIncreaseIteration = currentIteration + } else { + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debugging) + } + } + } else { // Else, network reached saturation for the current flow count. + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: Network reached saturation with current flow count.\n", debugging) + } + // If new flows added and for 4 seconds the moving average + // throughput did not change: network reached stable saturation + if (currentIteration-previousFlowIncreaseIteration) < uint64(constants.MovingAverageStabilitySpan) && movingAverageAverage.AllSequentialIncreasesLessThan(float64(5)) { + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging) + } + break + } else { + // Else, add four more flows + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging) + } + addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, &lgcsPreviousTransferred, lgcGenerator, debugging.Level) + previousFlowIncreaseIteration = currentIteration + } + } + + } + saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs} + }() + return +} + type Probe struct { client *http.Client stats *stats.TraceStats diff --git a/utilities/utilities.go b/utilities/utilities.go index 4b114ba..a143d31 100644 --- a/utilities/utilities.go +++ b/utilities/utilities.go @@ -17,6 +17,7 @@ package utilities import ( "fmt" "math" + "math/rand" "os" "reflect" "sync/atomic" @@ -117,3 +118,7 @@ func (optional Optional[S]) String() string { return "None" } } + +func RandBetween(max int) int { + return rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % max +} -- cgit v1.2.3 From de7a7bf994a020049eca89098aab9d13ff81f361 Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Sat, 4 Jun 2022 05:49:07 -0400 Subject: [Feature] Access TCP_INFO about underlying TCP connections This will only work on *nix systems. Code that displays information in this PR is only exemplary -- I am sure that there are better places in the code to display it! --- go.mod | 5 ++++- go.sum | 2 ++ lgc/lgc.go | 9 +++++++++ networkQuality.go | 19 ++++++++++++++++++ utilities/utilities.go | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 88 insertions(+), 1 deletion(-) (limited to 'networkQuality.go') diff --git a/go.mod b/go.mod index d60ef0a..39b9252 100644 --- a/go.mod +++ b/go.mod @@ -4,4 +4,7 @@ go 1.18 require golang.org/x/net v0.0.0-20220225172249-27dd8689420f -require golang.org/x/text v0.3.7 // indirect +require ( + golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect + golang.org/x/text v0.3.7 // indirect +) diff --git a/go.sum b/go.sum index 4832ce9..ce291cf 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= diff --git a/lgc/lgc.go b/lgc/lgc.go index 13f2c06..5b35fd1 100644 --- a/lgc/lgc.go +++ b/lgc/lgc.go @@ -38,6 +38,7 @@ type LoadGeneratingConnection interface { Client() *http.Client IsValid() bool ClientId() uint64 + Stats() *stats.TraceStats } // TODO: All 64-bit fields that are accessed atomically must @@ -284,6 +285,10 @@ func (lbd *LoadGeneratingConnectionDownload) IsValid() bool { return lbd.valid } +func (lbd *LoadGeneratingConnectionDownload) Stats() *stats.TraceStats { + return &lbd.stats +} + func (lbd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) { var request *http.Request = nil var get *http.Response = nil @@ -420,3 +425,7 @@ func (lgu *LoadGeneratingConnectionUpload) Start( go lgu.doUpload(ctx) return true } + +func (lbd *LoadGeneratingConnectionUpload) Stats() *stats.TraceStats { + return nil +} diff --git a/networkQuality.go b/networkQuality.go index ab9517e..7593714 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -21,6 +21,7 @@ import ( "fmt" _ "io" _ "log" + "net" "net/http" "os" "runtime/pprof" @@ -323,6 +324,24 @@ func main() { continue } + if *debugCliFlag { + // Note: This code is just an example of how to use utilities.GetTCPInfo. + rawConn := downloadSaturation.LGCs[randomLGCsIndex].Stats().ConnInfo.Conn + tlsConn, ok := rawConn.(*tls.Conn) + if !ok { + fmt.Printf("OOPS: Could not get the TCP info for the connection (not a TLS connection)!\n") + } + tcpConn, ok := tlsConn.NetConn().(*net.TCPConn) + if !ok { + fmt.Printf("OOPS: Could not get the TCP info for the connection (not a TCP connection)!\n") + } + if info, err := utilities.GetTCPInfo(tcpConn); err != nil { + fmt.Printf("OOPS: Could not get the TCP info for the connection: %v!\n", err) + } else { + utilities.PrintTCPInfo(info) + } + } + unsaturatedMeasurementTransport := http2.Transport{} unsaturatedMeasurementTransport.TLSClientConfig = &tls.Config{} if sslKeyFileConcurrentWriter != nil { diff --git a/utilities/utilities.go b/utilities/utilities.go index a143d31..e4c8a0d 100644 --- a/utilities/utilities.go +++ b/utilities/utilities.go @@ -18,10 +18,13 @@ import ( "fmt" "math" "math/rand" + "net" "os" "reflect" "sync/atomic" "time" + + "golang.org/x/sys/unix" ) func IsInterfaceNil(ifc interface{}) bool { @@ -122,3 +125,54 @@ func (optional Optional[S]) String() string { func RandBetween(max int) int { return rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % max } + +func GetTCPInfo(connection net.Conn) (*unix.TCPInfo, error) { + tcpConn, ok := connection.(*net.TCPConn) + if !ok { + return nil, fmt.Errorf("connection is not a net.TCPConn") + } + rawConn, err := tcpConn.SyscallConn() + if err != nil { + return nil, err + } + + var info *unix.TCPInfo = nil + rawConn.Control(func(fd uintptr) { + info, err = unix.GetsockoptTCPInfo(int(fd), unix.SOL_TCP, unix.TCP_INFO) + }) + return info, err +} + +func PrintTCPInfo(info *unix.TCPInfo) { + fmt.Printf("TCPInfo: \n") + fmt.Printf(" State: %v\n", info.State) + fmt.Printf(" Ca_state: %v\n", info.Ca_state) + fmt.Printf(" Retransmits: %v\n", info.Retransmits) + fmt.Printf(" Probes: %v\n", info.Probes) + fmt.Printf(" Backoff: %v\n", info.Backoff) + fmt.Printf(" Options: %v\n", info.Options) + fmt.Printf(" Rto: %v\n", info.Rto) + fmt.Printf(" Ato: %v\n", info.Ato) + fmt.Printf(" Snd_mss: %v\n", info.Snd_mss) + fmt.Printf(" Rcv_mss: %v\n", info.Rcv_mss) + fmt.Printf(" Unacked: %v\n", info.Unacked) + fmt.Printf(" Sacked: %v\n", info.Sacked) + fmt.Printf(" Lost: %v\n", info.Lost) + fmt.Printf(" Retrans: %v\n", info.Retrans) + fmt.Printf(" Fackets: %v\n", info.Fackets) + fmt.Printf(" Last_data_sent: %v\n", info.Last_data_sent) + fmt.Printf(" Last_ack_sent: %v\n", info.Last_ack_sent) + fmt.Printf(" Last_data_recv: %v\n", info.Last_data_recv) + fmt.Printf(" Last_ack_recv: %v\n", info.Last_ack_recv) + fmt.Printf(" Pmtu: %v\n", info.Pmtu) + fmt.Printf(" Rcv_ssthresh: %v\n", info.Rcv_ssthresh) + fmt.Printf(" Rtt: %v\n", info.Rtt) + fmt.Printf(" Rttvar: %v\n", info.Rttvar) + fmt.Printf(" Snd_ssthresh: %v\n", info.Snd_ssthresh) + fmt.Printf(" Snd_cwnd: %v\n", info.Snd_cwnd) + fmt.Printf(" Advmss: %v\n", info.Advmss) + fmt.Printf(" Reordering: %v\n", info.Reordering) + fmt.Printf(" Rcv_rtt: %v\n", info.Rcv_rtt) + fmt.Printf(" Rcv_space: %v\n", info.Rcv_space) + fmt.Printf(" Total_retrans: %v\n", info.Total_retrans) +} -- cgit v1.2.3 From 9be87fa5ec89c9e393c9c93b3cb36668c71593d6 Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Sat, 4 Jun 2022 21:24:19 -0400 Subject: [Feature] Add conditional compilation support for GetTCPInfo Now there is functionality for conditionally supporting GetTCPInfo depending on the platform. If the platform supports it, then the client can call utilities.GetTCPInfo. In the case that the platform does not support the GetTCPInfo function call, the result is an error of the type `NotImplemented`. --- lgc/lgc.go | 31 ++++++++++++----------- networkQuality.go | 7 +++++- utilities/tcpinfo_other.go | 18 ++++++++++++++ utilities/tcpinfo_unix.go | 62 ++++++++++++++++++++++++++++++++++++++++++++++ utilities/utilities.go | 56 ++++------------------------------------- 5 files changed, 107 insertions(+), 67 deletions(-) create mode 100644 utilities/tcpinfo_other.go create mode 100644 utilities/tcpinfo_unix.go (limited to 'networkQuality.go') diff --git a/lgc/lgc.go b/lgc/lgc.go index 5b35fd1..fb51ec1 100644 --- a/lgc/lgc.go +++ b/lgc/lgc.go @@ -281,40 +281,40 @@ func (lgd *LoadGeneratingConnectionDownload) Start( go lgd.doDownload(ctx) return true } -func (lbd *LoadGeneratingConnectionDownload) IsValid() bool { - return lbd.valid +func (lgd *LoadGeneratingConnectionDownload) IsValid() bool { + return lgd.valid } -func (lbd *LoadGeneratingConnectionDownload) Stats() *stats.TraceStats { - return &lbd.stats +func (lgd *LoadGeneratingConnectionDownload) Stats() *stats.TraceStats { + return &lgd.stats } -func (lbd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) { +func (lgd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) { var request *http.Request = nil var get *http.Response = nil var err error = nil if request, err = http.NewRequestWithContext( - httptrace.WithClientTrace(ctx, lbd.tracer), + httptrace.WithClientTrace(ctx, lgd.tracer), "GET", - lbd.Path, + lgd.Path, nil, ); err != nil { - lbd.valid = false + lgd.valid = false return } - lbd.downloadStartTime = time.Now() - lbd.lastIntervalEnd = 0 + lgd.downloadStartTime = time.Now() + lgd.lastIntervalEnd = 0 - if get, err = lbd.client.Do(request); err != nil { - lbd.valid = false + if get, err = lgd.client.Do(request); err != nil { + lgd.valid = false return } - cr := &countingReader{n: &lbd.downloaded, ctx: ctx, readable: get.Body} + cr := &countingReader{n: &lgd.downloaded, ctx: ctx, readable: get.Body} _, _ = io.Copy(ioutil.Discard, cr) get.Body.Close() - if debug.IsDebug(lbd.debug) { + if debug.IsDebug(lgd.debug) { fmt.Printf("Ending a load-generating download.\n") } } @@ -426,6 +426,7 @@ func (lgu *LoadGeneratingConnectionUpload) Start( return true } -func (lbd *LoadGeneratingConnectionUpload) Stats() *stats.TraceStats { +func (lgu *LoadGeneratingConnectionUpload) Stats() *stats.TraceStats { + // Get all your stats from the download side of the LGC. return nil } diff --git a/networkQuality.go b/networkQuality.go index 7593714..d9bdc94 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -336,7 +336,12 @@ func main() { fmt.Printf("OOPS: Could not get the TCP info for the connection (not a TCP connection)!\n") } if info, err := utilities.GetTCPInfo(tcpConn); err != nil { - fmt.Printf("OOPS: Could not get the TCP info for the connection: %v!\n", err) + switch err.(type) { + case *utilities.NotImplemented: + fmt.Printf("GetTCPInfo not implemented on this platform.\n") + default: + fmt.Printf("OOPS: Could not get the TCP info for the connection: %v!\n", err) + } } else { utilities.PrintTCPInfo(info) } diff --git a/utilities/tcpinfo_other.go b/utilities/tcpinfo_other.go new file mode 100644 index 0000000..8dd070b --- /dev/null +++ b/utilities/tcpinfo_other.go @@ -0,0 +1,18 @@ +//go:build !darwin && !dragonfly && !freebsd && !linux && !netbsd && !openbsd +// +build !darwin,!dragonfly,!freebsd,!linux,!netbsd,!openbsd + +package utilities + +import ( + "net" + + "golang.org/x/sys/unix" +) + +func GetTCPInfo(connection net.Conn) (*unix.TCPInfo, error) { + return nil, NotImplemented{Functionality: "GetTCPInfo"} +} + +func PrintTCPInfo(info *unix.TCPInfo) { + return +} diff --git a/utilities/tcpinfo_unix.go b/utilities/tcpinfo_unix.go new file mode 100644 index 0000000..b0b5252 --- /dev/null +++ b/utilities/tcpinfo_unix.go @@ -0,0 +1,62 @@ +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd + +package utilities + +import ( + "fmt" + "net" + + "golang.org/x/sys/unix" +) + +func GetTCPInfo(connection net.Conn) (*unix.TCPInfo, error) { + tcpConn, ok := connection.(*net.TCPConn) + if !ok { + return nil, fmt.Errorf("connection is not a net.TCPConn") + } + rawConn, err := tcpConn.SyscallConn() + if err != nil { + return nil, err + } + + var info *unix.TCPInfo = nil + rawConn.Control(func(fd uintptr) { + info, err = unix.GetsockoptTCPInfo(int(fd), unix.SOL_TCP, unix.TCP_INFO) + }) + return info, err +} + +func PrintTCPInfo(info *unix.TCPInfo) { + fmt.Printf("TCPInfo: \n") + fmt.Printf(" State: %v\n", info.State) + fmt.Printf(" Ca_state: %v\n", info.Ca_state) + fmt.Printf(" Retransmits: %v\n", info.Retransmits) + fmt.Printf(" Probes: %v\n", info.Probes) + fmt.Printf(" Backoff: %v\n", info.Backoff) + fmt.Printf(" Options: %v\n", info.Options) + fmt.Printf(" Rto: %v\n", info.Rto) + fmt.Printf(" Ato: %v\n", info.Ato) + fmt.Printf(" Snd_mss: %v\n", info.Snd_mss) + fmt.Printf(" Rcv_mss: %v\n", info.Rcv_mss) + fmt.Printf(" Unacked: %v\n", info.Unacked) + fmt.Printf(" Sacked: %v\n", info.Sacked) + fmt.Printf(" Lost: %v\n", info.Lost) + fmt.Printf(" Retrans: %v\n", info.Retrans) + fmt.Printf(" Fackets: %v\n", info.Fackets) + fmt.Printf(" Last_data_sent: %v\n", info.Last_data_sent) + fmt.Printf(" Last_ack_sent: %v\n", info.Last_ack_sent) + fmt.Printf(" Last_data_recv: %v\n", info.Last_data_recv) + fmt.Printf(" Last_ack_recv: %v\n", info.Last_ack_recv) + fmt.Printf(" Pmtu: %v\n", info.Pmtu) + fmt.Printf(" Rcv_ssthresh: %v\n", info.Rcv_ssthresh) + fmt.Printf(" Rtt: %v\n", info.Rtt) + fmt.Printf(" Rttvar: %v\n", info.Rttvar) + fmt.Printf(" Snd_ssthresh: %v\n", info.Snd_ssthresh) + fmt.Printf(" Snd_cwnd: %v\n", info.Snd_cwnd) + fmt.Printf(" Advmss: %v\n", info.Advmss) + fmt.Printf(" Reordering: %v\n", info.Reordering) + fmt.Printf(" Rcv_rtt: %v\n", info.Rcv_rtt) + fmt.Printf(" Rcv_space: %v\n", info.Rcv_space) + fmt.Printf(" Total_retrans: %v\n", info.Total_retrans) +} diff --git a/utilities/utilities.go b/utilities/utilities.go index e4c8a0d..7e26ab9 100644 --- a/utilities/utilities.go +++ b/utilities/utilities.go @@ -18,13 +18,10 @@ import ( "fmt" "math" "math/rand" - "net" "os" "reflect" "sync/atomic" "time" - - "golang.org/x/sys/unix" ) func IsInterfaceNil(ifc interface{}) bool { @@ -126,53 +123,10 @@ func RandBetween(max int) int { return rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % max } -func GetTCPInfo(connection net.Conn) (*unix.TCPInfo, error) { - tcpConn, ok := connection.(*net.TCPConn) - if !ok { - return nil, fmt.Errorf("connection is not a net.TCPConn") - } - rawConn, err := tcpConn.SyscallConn() - if err != nil { - return nil, err - } +type NotImplemented struct { + Functionality string +} - var info *unix.TCPInfo = nil - rawConn.Control(func(fd uintptr) { - info, err = unix.GetsockoptTCPInfo(int(fd), unix.SOL_TCP, unix.TCP_INFO) - }) - return info, err -} - -func PrintTCPInfo(info *unix.TCPInfo) { - fmt.Printf("TCPInfo: \n") - fmt.Printf(" State: %v\n", info.State) - fmt.Printf(" Ca_state: %v\n", info.Ca_state) - fmt.Printf(" Retransmits: %v\n", info.Retransmits) - fmt.Printf(" Probes: %v\n", info.Probes) - fmt.Printf(" Backoff: %v\n", info.Backoff) - fmt.Printf(" Options: %v\n", info.Options) - fmt.Printf(" Rto: %v\n", info.Rto) - fmt.Printf(" Ato: %v\n", info.Ato) - fmt.Printf(" Snd_mss: %v\n", info.Snd_mss) - fmt.Printf(" Rcv_mss: %v\n", info.Rcv_mss) - fmt.Printf(" Unacked: %v\n", info.Unacked) - fmt.Printf(" Sacked: %v\n", info.Sacked) - fmt.Printf(" Lost: %v\n", info.Lost) - fmt.Printf(" Retrans: %v\n", info.Retrans) - fmt.Printf(" Fackets: %v\n", info.Fackets) - fmt.Printf(" Last_data_sent: %v\n", info.Last_data_sent) - fmt.Printf(" Last_ack_sent: %v\n", info.Last_ack_sent) - fmt.Printf(" Last_data_recv: %v\n", info.Last_data_recv) - fmt.Printf(" Last_ack_recv: %v\n", info.Last_ack_recv) - fmt.Printf(" Pmtu: %v\n", info.Pmtu) - fmt.Printf(" Rcv_ssthresh: %v\n", info.Rcv_ssthresh) - fmt.Printf(" Rtt: %v\n", info.Rtt) - fmt.Printf(" Rttvar: %v\n", info.Rttvar) - fmt.Printf(" Snd_ssthresh: %v\n", info.Snd_ssthresh) - fmt.Printf(" Snd_cwnd: %v\n", info.Snd_cwnd) - fmt.Printf(" Advmss: %v\n", info.Advmss) - fmt.Printf(" Reordering: %v\n", info.Reordering) - fmt.Printf(" Rcv_rtt: %v\n", info.Rcv_rtt) - fmt.Printf(" Rcv_space: %v\n", info.Rcv_space) - fmt.Printf(" Total_retrans: %v\n", info.Total_retrans) +func (ni *NotImplemented) Error() string { + return fmt.Sprintf("%v not implemented.\n", ni.Functionality) } -- cgit v1.2.3 From d06438ff7414abfcc5a2a1bd13a935ee594f0842 Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Sun, 5 Jun 2022 03:32:17 -0400 Subject: [Feature] Add -extended-stats for *nix platforms On *nix platforms, there is now a `-extended-stats` option that will print out some extended statistics for the test. The statistics are based on information gleaned from the `TCP_INFO` of the underlying TCP connections. What is implemented so far is just a proof of concept of the extended stats that could be calculated. Depending on what users want, we can add additional extended statistics. --- extendedstats/other.go | 22 ++++++++++++++ extendedstats/unix.go | 72 ++++++++++++++++++++++++++++++++++++++++++++++ networkQuality.go | 54 +++++++++++++++++----------------- utilities/tcpinfo_other.go | 18 ------------ utilities/tcpinfo_unix.go | 62 --------------------------------------- utilities/utilities.go | 11 ++++--- 6 files changed, 127 insertions(+), 112 deletions(-) create mode 100644 extendedstats/other.go create mode 100644 extendedstats/unix.go delete mode 100644 utilities/tcpinfo_other.go delete mode 100644 utilities/tcpinfo_unix.go (limited to 'networkQuality.go') diff --git a/extendedstats/other.go b/extendedstats/other.go new file mode 100644 index 0000000..c0f89ed --- /dev/null +++ b/extendedstats/other.go @@ -0,0 +1,22 @@ +//go:build !darwin && !dragonfly && !freebsd && !linux && !netbsd && !openbsd +// +build !darwin,!dragonfly,!freebsd,!linux,!netbsd,!openbsd + +package extendedstats + +import ( + "net" + + "golang.org/x/sys/unix" +) + +type ExtendedStats struct{} + +func (es *ExtendedStats) IncorporateConnectionStats(conn net.Conn) {} + +func (es *ExtendedStats) Repr() string { + return "" +} + +func ExtendedStatsAvailable() bool { + return false +} diff --git a/extendedstats/unix.go b/extendedstats/unix.go new file mode 100644 index 0000000..e50d719 --- /dev/null +++ b/extendedstats/unix.go @@ -0,0 +1,72 @@ +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd +// +build darwin dragonfly freebsd linux netbsd openbsd + +package extendedstats + +import ( + "crypto/tls" + "fmt" + "net" + + "github.com/network-quality/goresponsiveness/utilities" + "golang.org/x/sys/unix" +) + +type ExtendedStats struct { + MaxPathMtu uint64 + TotalRetransmissions uint64 + AverageRtt float64 + rtt_measurements uint64 + total_rtt float64 +} + +func ExtendedStatsAvailable() bool { + return true +} + +func (es *ExtendedStats) IncorporateConnectionStats(rawConn net.Conn) { + tlsConn, ok := rawConn.(*tls.Conn) + if !ok { + fmt.Printf("OOPS: Could not get the TCP info for the connection (not a TLS connection)!\n") + } + tcpConn, ok := tlsConn.NetConn().(*net.TCPConn) + if !ok { + fmt.Printf("OOPS: Could not get the TCP info for the connection (not a TCP connection)!\n") + } + if info, err := getTCPInfo(tcpConn); err != nil { + fmt.Printf("OOPS: Could not get the TCP info for the connection: %v!\n", err) + } else { + es.MaxPathMtu = utilities.Max(es.MaxPathMtu, uint64(info.Pmtu)) + // https://lkml.iu.edu/hypermail/linux/kernel/1705.0/01790.html + es.TotalRetransmissions += uint64(info.Total_retrans) + es.total_rtt += float64(info.Rtt) + es.rtt_measurements += 1 + es.AverageRtt = es.total_rtt / float64(es.rtt_measurements) + + } +} + +func (es *ExtendedStats) Repr() string { + return fmt.Sprintf(`Extended Statistics: + Maximum Path MTU: %v + Total Retransmissions: %v + Average RTT: %v +`, es.MaxPathMtu, es.TotalRetransmissions, es.AverageRtt) +} + +func getTCPInfo(connection net.Conn) (*unix.TCPInfo, error) { + tcpConn, ok := connection.(*net.TCPConn) + if !ok { + return nil, fmt.Errorf("connection is not a net.TCPConn") + } + rawConn, err := tcpConn.SyscallConn() + if err != nil { + return nil, err + } + + var info *unix.TCPInfo = nil + rawConn.Control(func(fd uintptr) { + info, err = unix.GetsockoptTCPInfo(int(fd), unix.SOL_TCP, unix.TCP_INFO) + }) + return info, err +} diff --git a/networkQuality.go b/networkQuality.go index d9bdc94..217ee6a 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -19,9 +19,6 @@ import ( "crypto/tls" "flag" "fmt" - _ "io" - _ "log" - "net" "net/http" "os" "runtime/pprof" @@ -31,6 +28,7 @@ import ( "github.com/network-quality/goresponsiveness/config" "github.com/network-quality/goresponsiveness/constants" "github.com/network-quality/goresponsiveness/debug" + "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" "github.com/network-quality/goresponsiveness/rpm" "github.com/network-quality/goresponsiveness/timeoutat" @@ -80,6 +78,12 @@ var ( "", "Enable client runtime profiling and specify storage location. Disabled by default.", ) + + calculateExtendedStats = flag.Bool( + "extended-stats", + false, + "Enable the collection and display of extended statistics -- may not be available on certain platforms.", + ) ) func main() { @@ -99,6 +103,11 @@ func main() { debugLevel = debug.Debug } + if *calculateExtendedStats && !extendedstats.ExtendedStatsAvailable() { + *calculateExtendedStats = false + fmt.Printf("Warning: Calculation of extended statics was requested but they are not supported on this platform.\n") + } + if err := config.Get(configHostPort, *configPath); err != nil { fmt.Fprintf(os.Stderr, "%s\n", err) return @@ -295,6 +304,18 @@ func main() { totalMeasurements := uint64(0) totalMeasurementTimes := float64(0) measurementTimeout := false + extendedStats := extendedstats.ExtendedStats{} + + for i := 0; i < len(downloadSaturation.LGCs); i++ { + // Assume that extended statistics are available -- the check was done explicitly at + // program startup if the calculateExtendedStats flag was set by the user on the command line. + if *calculateExtendedStats { + if !extendedstats.ExtendedStatsAvailable() { + panic("Extended stats are not available but the user requested their calculation.") + } + extendedStats.IncorporateConnectionStats(downloadSaturation.LGCs[i].Stats().ConnInfo.Conn) + } + } for i := 0; i < constants.MeasurementProbeCount && !measurementTimeout; i++ { if len(downloadSaturation.LGCs) == 0 { @@ -324,29 +345,6 @@ func main() { continue } - if *debugCliFlag { - // Note: This code is just an example of how to use utilities.GetTCPInfo. - rawConn := downloadSaturation.LGCs[randomLGCsIndex].Stats().ConnInfo.Conn - tlsConn, ok := rawConn.(*tls.Conn) - if !ok { - fmt.Printf("OOPS: Could not get the TCP info for the connection (not a TLS connection)!\n") - } - tcpConn, ok := tlsConn.NetConn().(*net.TCPConn) - if !ok { - fmt.Printf("OOPS: Could not get the TCP info for the connection (not a TCP connection)!\n") - } - if info, err := utilities.GetTCPInfo(tcpConn); err != nil { - switch err.(type) { - case *utilities.NotImplemented: - fmt.Printf("GetTCPInfo not implemented on this platform.\n") - default: - fmt.Printf("OOPS: Could not get the TCP info for the connection: %v!\n", err) - } - } else { - utilities.PrintTCPInfo(info) - } - } - unsaturatedMeasurementTransport := http2.Transport{} unsaturatedMeasurementTransport.TLSClientConfig = &tls.Config{} if sslKeyFileConcurrentWriter != nil { @@ -428,6 +426,10 @@ func main() { fmt.Printf("Error occurred calculating RPM -- no probe measurements received.\n") } + if *calculateExtendedStats { + fmt.Printf(extendedStats.Repr()) + } + cancelOperatingCtx() if *debugCliFlag { fmt.Printf("In debugging mode, we will cool down.\n") diff --git a/utilities/tcpinfo_other.go b/utilities/tcpinfo_other.go deleted file mode 100644 index 8dd070b..0000000 --- a/utilities/tcpinfo_other.go +++ /dev/null @@ -1,18 +0,0 @@ -//go:build !darwin && !dragonfly && !freebsd && !linux && !netbsd && !openbsd -// +build !darwin,!dragonfly,!freebsd,!linux,!netbsd,!openbsd - -package utilities - -import ( - "net" - - "golang.org/x/sys/unix" -) - -func GetTCPInfo(connection net.Conn) (*unix.TCPInfo, error) { - return nil, NotImplemented{Functionality: "GetTCPInfo"} -} - -func PrintTCPInfo(info *unix.TCPInfo) { - return -} diff --git a/utilities/tcpinfo_unix.go b/utilities/tcpinfo_unix.go deleted file mode 100644 index b0b5252..0000000 --- a/utilities/tcpinfo_unix.go +++ /dev/null @@ -1,62 +0,0 @@ -//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd -// +build darwin dragonfly freebsd linux netbsd openbsd - -package utilities - -import ( - "fmt" - "net" - - "golang.org/x/sys/unix" -) - -func GetTCPInfo(connection net.Conn) (*unix.TCPInfo, error) { - tcpConn, ok := connection.(*net.TCPConn) - if !ok { - return nil, fmt.Errorf("connection is not a net.TCPConn") - } - rawConn, err := tcpConn.SyscallConn() - if err != nil { - return nil, err - } - - var info *unix.TCPInfo = nil - rawConn.Control(func(fd uintptr) { - info, err = unix.GetsockoptTCPInfo(int(fd), unix.SOL_TCP, unix.TCP_INFO) - }) - return info, err -} - -func PrintTCPInfo(info *unix.TCPInfo) { - fmt.Printf("TCPInfo: \n") - fmt.Printf(" State: %v\n", info.State) - fmt.Printf(" Ca_state: %v\n", info.Ca_state) - fmt.Printf(" Retransmits: %v\n", info.Retransmits) - fmt.Printf(" Probes: %v\n", info.Probes) - fmt.Printf(" Backoff: %v\n", info.Backoff) - fmt.Printf(" Options: %v\n", info.Options) - fmt.Printf(" Rto: %v\n", info.Rto) - fmt.Printf(" Ato: %v\n", info.Ato) - fmt.Printf(" Snd_mss: %v\n", info.Snd_mss) - fmt.Printf(" Rcv_mss: %v\n", info.Rcv_mss) - fmt.Printf(" Unacked: %v\n", info.Unacked) - fmt.Printf(" Sacked: %v\n", info.Sacked) - fmt.Printf(" Lost: %v\n", info.Lost) - fmt.Printf(" Retrans: %v\n", info.Retrans) - fmt.Printf(" Fackets: %v\n", info.Fackets) - fmt.Printf(" Last_data_sent: %v\n", info.Last_data_sent) - fmt.Printf(" Last_ack_sent: %v\n", info.Last_ack_sent) - fmt.Printf(" Last_data_recv: %v\n", info.Last_data_recv) - fmt.Printf(" Last_ack_recv: %v\n", info.Last_ack_recv) - fmt.Printf(" Pmtu: %v\n", info.Pmtu) - fmt.Printf(" Rcv_ssthresh: %v\n", info.Rcv_ssthresh) - fmt.Printf(" Rtt: %v\n", info.Rtt) - fmt.Printf(" Rttvar: %v\n", info.Rttvar) - fmt.Printf(" Snd_ssthresh: %v\n", info.Snd_ssthresh) - fmt.Printf(" Snd_cwnd: %v\n", info.Snd_cwnd) - fmt.Printf(" Advmss: %v\n", info.Advmss) - fmt.Printf(" Reordering: %v\n", info.Reordering) - fmt.Printf(" Rcv_rtt: %v\n", info.Rcv_rtt) - fmt.Printf(" Rcv_space: %v\n", info.Rcv_space) - fmt.Printf(" Total_retrans: %v\n", info.Total_retrans) -} diff --git a/utilities/utilities.go b/utilities/utilities.go index 7e26ab9..76acbd2 100644 --- a/utilities/utilities.go +++ b/utilities/utilities.go @@ -123,10 +123,9 @@ func RandBetween(max int) int { return rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % max } -type NotImplemented struct { - Functionality string -} - -func (ni *NotImplemented) Error() string { - return fmt.Sprintf("%v not implemented.\n", ni.Functionality) +func Max(x, y uint64) uint64 { + if x > y { + return x + } + return y } -- cgit v1.2.3