diff options
| author | Will Hawkins <[email protected]> | 2022-07-01 01:49:08 -0400 |
|---|---|---|
| committer | Will Hawkins <[email protected]> | 2022-07-01 01:49:08 -0400 |
| commit | 4fd7d42026f85da367afaaafeefd217b983f71ca (patch) | |
| tree | 928add298c86ba9ece5552dce56adc066103e84e /networkQuality.go | |
| parent | 631a845c807c0e9aca897f231763d3dcec76de20 (diff) | |
[Feature] Support spec v2
This is a WIP for supporting v2 of the RPM spec.
Diffstat (limited to 'networkQuality.go')
| -rw-r--r-- | networkQuality.go | 214 |
1 files changed, 54 insertions, 160 deletions
diff --git a/networkQuality.go b/networkQuality.go index 93c42e0..d5c3523 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -16,10 +16,8 @@ package main import ( "context" - "crypto/tls" "flag" "fmt" - "net/http" "os" "runtime/pprof" "time" @@ -33,7 +31,6 @@ import ( "github.com/network-quality/goresponsiveness/rpm" "github.com/network-quality/goresponsiveness/timeoutat" "github.com/network-quality/goresponsiveness/utilities" - "golang.org/x/net/http2" ) var ( @@ -98,7 +95,7 @@ func main() { timeoutAbsoluteTime := time.Now().Add(timeoutDuration) configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort) operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background()) - saturationCtx, cancelSaturationCtx := context.WithCancel( + lgDataCollectionCtx, cancelLGDataCollectionCtx := context.WithCancel( context.Background(), ) config := &config.Config{} @@ -187,84 +184,90 @@ func main() { * Create (and then, ironically, name) two anonymous functions that, when invoked, * will create load-generating connections for upload/download/ */ - generate_lbd := func() lgc.LoadGeneratingConnection { + generate_lgd := func() lgc.LoadGeneratingConnection { return &lgc.LoadGeneratingConnectionDownload{ Path: config.Urls.LargeUrl, KeyLogger: sslKeyFileConcurrentWriter, } } - generate_lbu := func() lgc.LoadGeneratingConnection { + generate_lgu := func() lgc.LoadGeneratingConnection { return &lgc.LoadGeneratingConnectionUpload{ Path: config.Urls.UploadUrl, KeyLogger: sslKeyFileConcurrentWriter, } } + generate_lg_probe_configuration := func() rpm.ProbeConfiguration { + return rpm.ProbeConfiguration{URL: config.Urls.SmallUrl} + } + var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download") var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload") - downloadSaturationChannel := rpm.Saturate( - saturationCtx, + downloadDataCollectionChannel := rpm.LGCollectData( + lgDataCollectionCtx, operatingCtx, - generate_lbd, + generate_lgd, + generate_lg_probe_configuration, downloadDebugging, ) - uploadSaturationChannel := rpm.Saturate( - saturationCtx, + uploadDataCollectionChannel := rpm.LGCollectData( + lgDataCollectionCtx, operatingCtx, - generate_lbu, + generate_lgu, + generate_lg_probe_configuration, uploadDebugging, ) - saturationTimeout := false - uploadSaturated := false - downloadSaturated := false - downloadSaturation := rpm.SaturationResult{} - uploadSaturation := rpm.SaturationResult{} + dataCollectionTimeout := false + uploadDataCollectionComplete := false + downloadDataCollectionComple := false + downloadDataCollectionResult := rpm.LGDataCollectionResult{} + uploadDataCollectionResult := rpm.LGDataCollectionResult{} - for !(uploadSaturated && downloadSaturated) { + for !(uploadDataCollectionComplete && downloadDataCollectionComple) { select { - case downloadSaturation = <-downloadSaturationChannel: + case downloadDataCollectionResult = <-downloadDataCollectionChannel: { - downloadSaturated = true + downloadDataCollectionComple = true if *debugCliFlag { fmt.Printf( - "################# download is %s saturated (%fMBps, %d flows)!\n", + "################# download load-generating data collection is %s complete (%fMBps, %d flows)!\n", utilities.Conditional( - saturationTimeout, + dataCollectionTimeout, "(provisionally)", "", ), - utilities.ToMBps(downloadSaturation.RateBps), - len(downloadSaturation.LGCs), + utilities.ToMBps(downloadDataCollectionResult.RateBps), + len(downloadDataCollectionResult.LGCs), ) } } - case uploadSaturation = <-uploadSaturationChannel: + case uploadDataCollectionResult = <-uploadDataCollectionChannel: { - uploadSaturated = true + uploadDataCollectionComplete = true if *debugCliFlag { fmt.Printf( - "################# upload is %s saturated (%fMBps, %d flows)!\n", + "################# upload load-generating data collection is %s complete (%fMBps, %d flows)!\n", utilities.Conditional( - saturationTimeout, + dataCollectionTimeout, "(provisionally)", "", ), - utilities.ToMBps(uploadSaturation.RateBps), - len(uploadSaturation.LGCs), + utilities.ToMBps(uploadDataCollectionResult.RateBps), + len(uploadDataCollectionResult.LGCs), ) } } case <-timeoutChannel: { - if saturationTimeout { - // We already timedout on saturation. This signal means that - // we are timedout on getting the provisional saturation. We + if dataCollectionTimeout { + // We already timedout on data collection. This signal means that + // we are timedout on getting the provisional data collection. We // will exit! fmt.Fprint( os.Stderr, - "Error: Saturation could not be completed in time and no provisional rates could be assessed. Test failed.\n", + "Error: Load-Generating data collection could not be completed in time and no provisional data could be gathered. Test failed.\n", ) cancelOperatingCtx() if *debugCliFlag { @@ -272,13 +275,13 @@ func main() { } return } - saturationTimeout = true + dataCollectionTimeout = true - // We timed out attempting to saturate the link. So, we will - // shut down all the saturation xfers - cancelSaturationCtx() + // We timed out attempting to collect data about the link. So, we will + // shut down all the collection xfers + cancelLGDataCollectionCtx() // and then we will give ourselves some additional time in order - // to calculate a provisional saturation. + // to complete provisional data collection. timeoutAbsoluteTime = time.Now(). Add(time.Second * time.Duration(*rpmtimeout)) timeoutChannel = timeoutat.TimeoutAt( @@ -288,39 +291,27 @@ func main() { ) if *debugCliFlag { fmt.Printf( - "################# timeout reaching saturation!\n", + "################# timeout collecting load-generating data!\n", ) } } } } - // Give ourselves no more than 15 seconds to complete the RPM calculation. - // This is conditional because (above) we may have already added the time. - // We did it up there so that we could also limit the amount of time waiting - // for a conditional saturation calculation. - if !saturationTimeout { - timeoutAbsoluteTime = time.Now().Add(time.Second * time.Duration(*rpmtimeout)) - timeoutChannel = timeoutat.TimeoutAt( - operatingCtx, - timeoutAbsoluteTime, - debugLevel, - ) - } + // In the new version we are no longer going to wait to send probes until after + // saturation. When we get here we are now only going to compute the results + // and/or extended statistics! - totalMeasurements := uint64(0) - totalMeasurementTimes := float64(0) - measurementTimeout := false extendedStats := extendedstats.ExtendedStats{} - for i := 0; i < len(downloadSaturation.LGCs); i++ { + for i := 0; i < len(downloadDataCollectionResult.LGCs); i++ { // Assume that extended statistics are available -- the check was done explicitly at // program startup if the calculateExtendedStats flag was set by the user on the command line. if *calculateExtendedStats { if !extendedstats.ExtendedStatsAvailable() { panic("Extended stats are not available but the user requested their calculation.") } - if err := extendedStats.IncorporateConnectionStats(downloadSaturation.LGCs[i].Stats().ConnInfo.Conn); err != nil { + if err := extendedStats.IncorporateConnectionStats(downloadDataCollectionResult.LGCs[i].Stats().ConnInfo.Conn); err != nil { fmt.Fprintf( os.Stderr, "Warning: Could not add extended stats for the connection: %v", @@ -329,116 +320,19 @@ func main() { } } } - - for i := 0; i < constants.MeasurementProbeCount && !measurementTimeout; i++ { - if len(downloadSaturation.LGCs) == 0 { - continue - } - randomLGCsIndex := utilities.RandBetween(len(downloadSaturation.LGCs)) - if !downloadSaturation.LGCs[randomLGCsIndex].IsValid() { - if *debugCliFlag { - fmt.Printf( - "%v: The randomly selected saturated connection (with id %d) was invalid. Skipping.\n", - debugCliFlag, - downloadSaturation.LGCs[randomLGCsIndex].ClientId(), - ) - } - - // Protect against pathological cases where we continuously select - // invalid connections and never - // do the select below - if time.Since(timeoutAbsoluteTime) > 0 { - if *debugCliFlag { - fmt.Printf( - "Pathologically could not find valid saturated connections use for measurement.\n", - ) - } - break - } - continue - } - - unsaturatedMeasurementTransport := http2.Transport{} - unsaturatedMeasurementTransport.TLSClientConfig = &tls.Config{} - if sslKeyFileConcurrentWriter != nil { - unsaturatedMeasurementTransport.TLSClientConfig.KeyLogWriter = sslKeyFileConcurrentWriter - } - unsaturatedMeasurementTransport.TLSClientConfig.InsecureSkipVerify = true - newClient := http.Client{Transport: &unsaturatedMeasurementTransport} - - unsaturatedMeasurementProbe := rpm.NewProbe(&newClient, debugLevel) - - saturatedMeasurementProbe := rpm.NewProbe( - downloadSaturation.LGCs[randomLGCsIndex].Client(), - debugLevel, - ) - - select { - case <-timeoutChannel: - { - measurementTimeout = true - } - case sequentialMeasurementTimes := <-rpm.CalculateProbeMeasurements(operatingCtx, *strictFlag, saturatedMeasurementProbe, unsaturatedMeasurementProbe, config.Urls.SmallUrl, debugLevel): - { - if sequentialMeasurementTimes.Err != nil { - fmt.Printf( - "Failed to calculate a time for sequential measurements: %v\n", - sequentialMeasurementTimes.Err, - ) - continue - } - - if debug.IsDebug(debugLevel) { - fmt.Printf("unsaturatedMeasurementProbe: %v\n", unsaturatedMeasurementProbe) - } - // We know that we have a good Sequential measurement. - totalMeasurements += uint64(sequentialMeasurementTimes.MeasurementCount) - totalMeasurementTimes += sequentialMeasurementTimes.Delay.Seconds() - if debug.IsDebug(debugLevel) { - fmt.Printf( - "most-recent sequential measurement time: %v; most-recent sequential measurement count: %v\n", - sequentialMeasurementTimes.Delay.Seconds(), - sequentialMeasurementTimes.MeasurementCount, - ) - } - } - } - } - fmt.Printf( "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", - utilities.ToMbps(downloadSaturation.RateBps), - utilities.ToMBps(downloadSaturation.RateBps), - len(downloadSaturation.LGCs), + utilities.ToMbps(downloadDataCollectionResult.RateBps), + utilities.ToMBps(downloadDataCollectionResult.RateBps), + len(downloadDataCollectionResult.LGCs), ) fmt.Printf( "Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", - utilities.ToMbps(uploadSaturation.RateBps), - utilities.ToMBps(uploadSaturation.RateBps), - len(uploadSaturation.LGCs), + utilities.ToMbps(uploadDataCollectionResult.RateBps), + utilities.ToMBps(uploadDataCollectionResult.RateBps), + len(uploadDataCollectionResult.LGCs), ) - if totalMeasurements != 0 { - // "... it sums the five time values for each probe, and divides by the - // total - // number of probes to compute an average probe duration. The - // reciprocal of this, normalized to 60 seconds, gives the Round-trips - // Per Minute (RPM)." - // "average probe duration" = totalMeasurementTimes / totalMeasurements. - // The reciprocol of this = 1 / (totalMeasurementTimes / totalMeasurements) <- - // semantically the probes-per-second. - // Normalized to 60 seconds: 60 * (1 - // / ((totalMeasurementTimes / totalMeasurements)))) <- semantically the number of - // probes per minute. - rpm := float64( - time.Minute.Seconds(), - ) / (totalMeasurementTimes / (float64(totalMeasurements))) - fmt.Printf("Total measurements: %d\n", totalMeasurements) - fmt.Printf("RPM: %5.0f\n", rpm) - } else { - fmt.Printf("Error occurred calculating RPM -- no probe measurements received.\n") - } - if *calculateExtendedStats { fmt.Println(extendedStats.Repr()) } |
