diff options
Diffstat (limited to 'networkQuality.go')
| -rw-r--r-- | networkQuality.go | 924 |
1 files changed, 474 insertions, 450 deletions
diff --git a/networkQuality.go b/networkQuality.go index f97b27c..776c0e7 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -22,6 +22,7 @@ import ( "net/url" "os" "runtime/pprof" + "strings" "time" "github.com/network-quality/goresponsiveness/ccw" @@ -29,6 +30,7 @@ import ( "github.com/network-quality/goresponsiveness/constants" "github.com/network-quality/goresponsiveness/datalogger" "github.com/network-quality/goresponsiveness/debug" + "github.com/network-quality/goresponsiveness/direction" "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" "github.com/network-quality/goresponsiveness/ms" @@ -68,10 +70,46 @@ var ( "Enable debugging.", ) rpmtimeout = flag.Int( - "rpmtimeout", - constants.RPMCalculationTime, - "Maximum time to spend calculating RPM (i.e., total test time.).", + "rpm.timeout", + constants.DefaultTestTime, + "Maximum time (in seconds) to spend calculating RPM (i.e., total test time.).", ) + rpmmad = flag.Int( + "rpm.mad", + constants.SpecParameterCliOptionsDefaults.Mad, + "Moving average distance -- number of intervals considered during stability calculations.", + ) + rpmid = flag.Int( + "rpm.id", + constants.SpecParameterCliOptionsDefaults.Id, + "Duration of the interval between re-evaluating the network conditions (in seconds).", + ) + rpmtmp = flag.Uint( + "rpm.tmp", + constants.SpecParameterCliOptionsDefaults.Tmp, + "Percent of measurements to trim when calculating statistics about network conditions (between 0 and 100).", + ) + rpmsdt = flag.Float64( + "rpm.sdt", + constants.SpecParameterCliOptionsDefaults.Sdt, + "Cutoff in the standard deviation of measured values about network conditions between unstable and stable.", + ) + rpmmnp = flag.Int( + "rpm.mnp", + constants.SpecParameterCliOptionsDefaults.Mnp, + "Maximimum number of parallel connections to establish when attempting to reach working conditions.", + ) + rpmmps = flag.Int( + "rpm.mps", + constants.SpecParameterCliOptionsDefaults.Mps, + "Maximimum number of probes to send per second.", + ) + rpmptc = flag.Float64( + "rpm.ptc", + constants.SpecParameterCliOptionsDefaults.Ptc, + "Percentage of the (discovered) total network capacity that probes are allowed to consume.", + ) + sslKeyFileName = flag.String( "ssl-key-file", "", @@ -97,11 +135,6 @@ var ( "", "Store granular information about tests results in files with this basename. Time and information type will be appended (before the first .) to create separate log files. Disabled by default.", ) - probeIntervalTime = flag.Uint( - "probe-interval-time", - 100, - "Time (in ms) between probes (foreign and self).", - ) connectToAddr = flag.String( "connect-to", "", @@ -132,8 +165,26 @@ func main() { os.Exit(0) } - timeoutDuration := time.Second * time.Duration(*rpmtimeout) - timeoutAbsoluteTime := time.Now().Add(timeoutDuration) + var debugLevel debug.DebugLevel = debug.Error + + if *debugCliFlag { + debugLevel = debug.Debug + } + + specParameters, err := rpm.SpecParametersFromArguments(*rpmtimeout, *rpmmad, *rpmid, + *rpmtmp, *rpmsdt, *rpmmnp, *rpmmps, *rpmptc) + if err != nil { + fmt.Fprintf( + os.Stderr, + "Error: There was an error configuring the test with user-supplied parameters: %v\n", + err, + ) + os.Exit(1) + } + + if debug.IsDebug(debugLevel) { + fmt.Printf("Running the test according to the following spec parameters:\n%v\n", specParameters.ToString()) + } var configHostPort string @@ -158,30 +209,14 @@ func main() { // the others. operatingCtx, operatingCtxCancel := context.WithCancel(context.Background()) - // The operator contexts. These contexts control the processes that manage - // network activity but do not control network activity. - - uploadLoadGeneratorOperatorCtx, uploadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx) - downloadLoadGeneratorOperatorCtx, downloadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx) - proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx) - - // This context is used to control the network activity (i.e., it controls all - // the connections that are open to do load generation and probing). Cancelling this context will close - // all the network connections that are responsible for generating the load. - networkActivityCtx, networkActivityCtxCancel := context.WithCancel(operatingCtx) - config := &config.Config{ ConnectToAddr: *connectToAddr, } - var debugLevel debug.DebugLevel = debug.Error - - if *debugCliFlag { - debugLevel = debug.Debug - } if *calculateExtendedStats && !extendedstats.ExtendedStatsAvailable() { *calculateExtendedStats = false - fmt.Printf( + fmt.Fprintf( + os.Stderr, "Warning: Calculation of extended statistics was requested but is not supported on this platform.\n", ) } @@ -223,53 +258,14 @@ func main() { fmt.Printf("Configuration: %s\n", config) } - timeoutChannel := timeoutat.TimeoutAt( - operatingCtx, - timeoutAbsoluteTime, - debugLevel, - ) - if debug.IsDebug(debugLevel) { - fmt.Printf("Test will end no later than %v\n", timeoutAbsoluteTime) - } - - // print the banner - dt := time.Now().UTC() - fmt.Printf( - "%s UTC Go Responsiveness to %s...\n", - dt.Format("01-02-2006 15:04:05"), - configHostPort, - ) - - if len(*profile) != 0 { - f, err := os.Create(*profile) - if err != nil { - fmt.Fprintf( - os.Stderr, - "Error: Profiling requested but could not open the log file ( %s ) for writing: %v\n", - *profile, - err, - ) - os.Exit(1) - } - pprof.StartCPUProfile(f) - defer pprof.StopCPUProfile() - } - var selfProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] = nil - var foreignProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] = nil - var downloadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil - var uploadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil - var granularThroughputDataLogger datalogger.DataLogger[rpm.GranularThroughputDataPoint] = nil + downloadDirection := direction.Direction{} + uploadDirection := direction.Direction{} // User wants to log data if *dataLoggerBaseFileName != "" { var err error = nil unique := time.Now().UTC().Format("01-02-2006-15-04-05") - dataLoggerSelfFilename := utilities.FilenameAppend(*dataLoggerBaseFileName, "-self-"+unique) - dataLoggerForeignFilename := utilities.FilenameAppend( - *dataLoggerBaseFileName, - "-foreign-"+unique, - ) dataLoggerDownloadThroughputFilename := utilities.FilenameAppend( *dataLoggerBaseFileName, "-throughput-download-"+unique, @@ -278,100 +274,158 @@ func main() { *dataLoggerBaseFileName, "-throughput-upload-"+unique, ) - dataLoggerGranularThroughputFilename := utilities.FilenameAppend( + + dataLoggerDownloadGranularThroughputFilename := utilities.FilenameAppend( *dataLoggerBaseFileName, - "-throughput-granular-"+unique, + "-throughput-download-granular-"+unique, ) - selfProbeDataLogger, err = datalogger.CreateCSVDataLogger[probe.ProbeDataPoint]( + dataLoggerUploadGranularThroughputFilename := utilities.FilenameAppend( + *dataLoggerBaseFileName, + "-throughput-upload-granular-"+unique, + ) + + dataLoggerSelfFilename := utilities.FilenameAppend(*dataLoggerBaseFileName, "-self-"+unique) + dataLoggerForeignFilename := utilities.FilenameAppend( + *dataLoggerBaseFileName, + "-foreign-"+unique, + ) + + selfProbeDataLogger, err := datalogger.CreateCSVDataLogger[probe.ProbeDataPoint]( dataLoggerSelfFilename, ) if err != nil { - fmt.Printf( + fmt.Fprintf( + os.Stderr, "Warning: Could not create the file for storing self probe results (%s). Disabling functionality.\n", dataLoggerSelfFilename, ) selfProbeDataLogger = nil } + uploadDirection.SelfProbeDataLogger = selfProbeDataLogger + downloadDirection.SelfProbeDataLogger = selfProbeDataLogger - foreignProbeDataLogger, err = datalogger.CreateCSVDataLogger[probe.ProbeDataPoint]( + foreignProbeDataLogger, err := datalogger.CreateCSVDataLogger[probe.ProbeDataPoint]( dataLoggerForeignFilename, ) if err != nil { - fmt.Printf( + fmt.Fprintf( + os.Stderr, "Warning: Could not create the file for storing foreign probe results (%s). Disabling functionality.\n", dataLoggerForeignFilename, ) foreignProbeDataLogger = nil } + uploadDirection.ForeignProbeDataLogger = selfProbeDataLogger + downloadDirection.ForeignProbeDataLogger = foreignProbeDataLogger - downloadThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint]( + downloadDirection.ThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint]( dataLoggerDownloadThroughputFilename, ) if err != nil { - fmt.Printf( + fmt.Fprintf( + os.Stderr, "Warning: Could not create the file for storing download throughput results (%s). Disabling functionality.\n", dataLoggerDownloadThroughputFilename, ) - downloadThroughputDataLogger = nil + downloadDirection.ThroughputDataLogger = nil } - - uploadThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint]( + uploadDirection.ThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint]( dataLoggerUploadThroughputFilename, ) if err != nil { - fmt.Printf( + fmt.Fprintf( + os.Stderr, "Warning: Could not create the file for storing upload throughput results (%s). Disabling functionality.\n", dataLoggerUploadThroughputFilename, ) - uploadThroughputDataLogger = nil + uploadDirection.ThroughputDataLogger = nil } - granularThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.GranularThroughputDataPoint]( - dataLoggerGranularThroughputFilename, + downloadDirection.GranularThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.GranularThroughputDataPoint]( + dataLoggerDownloadGranularThroughputFilename, ) if err != nil { - fmt.Printf( - "Warning: Could not create the file for storing granular throughput results (%s). Disabling functionality.\n", - dataLoggerGranularThroughputFilename, + fmt.Fprintf( + os.Stderr, + "Warning: Could not create the file for storing download granular throughput results (%s). Disabling functionality.\n", + dataLoggerDownloadGranularThroughputFilename, + ) + downloadDirection.GranularThroughputDataLogger = nil + } + uploadDirection.GranularThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.GranularThroughputDataPoint]( + dataLoggerUploadGranularThroughputFilename, + ) + if err != nil { + fmt.Fprintf( + os.Stderr, + "Warning: Could not create the file for storing upload granular throughput results (%s). Disabling functionality.\n", + dataLoggerUploadGranularThroughputFilename, ) - granularThroughputDataLogger = nil + uploadDirection.GranularThroughputDataLogger = nil } + } // If, for some reason, the data loggers are nil, make them Null Data Loggers so that we don't have conditional // code later. - if selfProbeDataLogger == nil { - selfProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]() + if downloadDirection.SelfProbeDataLogger == nil { + downloadDirection.SelfProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]() + } + if uploadDirection.SelfProbeDataLogger == nil { + uploadDirection.SelfProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]() } - if foreignProbeDataLogger == nil { - foreignProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]() + + if downloadDirection.ForeignProbeDataLogger == nil { + downloadDirection.ForeignProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]() } - if downloadThroughputDataLogger == nil { - downloadThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]() + if uploadDirection.ForeignProbeDataLogger == nil { + uploadDirection.ForeignProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]() } - if uploadThroughputDataLogger == nil { - uploadThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]() + + if downloadDirection.ThroughputDataLogger == nil { + downloadDirection.ThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]() + } + if uploadDirection.ThroughputDataLogger == nil { + uploadDirection.ThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]() + } + + if downloadDirection.GranularThroughputDataLogger == nil { + downloadDirection.GranularThroughputDataLogger = + datalogger.CreateNullDataLogger[rpm.GranularThroughputDataPoint]() } - if granularThroughputDataLogger == nil { - granularThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.GranularThroughputDataPoint]() + if uploadDirection.GranularThroughputDataLogger == nil { + uploadDirection.GranularThroughputDataLogger = + datalogger.CreateNullDataLogger[rpm.GranularThroughputDataPoint]() } /* * Create (and then, ironically, name) two anonymous functions that, when invoked, * will create load-generating connections for upload/download */ - generateLgdc := func() lgc.LoadGeneratingConnection { + downloadDirection.CreateLgdc = func() lgc.LoadGeneratingConnection { lgd := lgc.NewLoadGeneratingConnectionDownload(config.Urls.LargeUrl, sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify) return &lgd } - - generateLguc := func() lgc.LoadGeneratingConnection { + uploadDirection.CreateLgdc = func() lgc.LoadGeneratingConnection { lgu := lgc.NewLoadGeneratingConnectionUpload(config.Urls.UploadUrl, sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify) return &lgu } + downloadDirection.DirectionDebugging = debug.NewDebugWithPrefix(debugLevel, "download") + downloadDirection.ProbeDebugging = debug.NewDebugWithPrefix(debugLevel, "download probe") + + uploadDirection.DirectionDebugging = debug.NewDebugWithPrefix(debugLevel, "upload") + uploadDirection.ProbeDebugging = debug.NewDebugWithPrefix(debugLevel, "upload probe") + + downloadDirection.Lgcc = lgc.NewLoadGeneratingConnectionCollection() + uploadDirection.Lgcc = lgc.NewLoadGeneratingConnectionCollection() + + // We do not do tracing on upload connections so there are no extended stats for those connections! + uploadDirection.ExtendedStatsEligible = false + downloadDirection.ExtendedStatsEligible = true + generateSelfProbeConfiguration := func() probe.ProbeConfiguration { return probe.ProbeConfiguration{ URL: config.Urls.SmallUrl, @@ -388,421 +442,391 @@ func main() { } } - var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download") - var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload") - var combinedProbeDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "combined probe") - - downloadLoadGeneratingConnectionCollection := lgc.NewLoadGeneratingConnectionCollection() - uploadLoadGeneratingConnectionCollection := lgc.NewLoadGeneratingConnectionCollection() + downloadDirection.DirectionLabel = "Download" + uploadDirection.DirectionLabel = "Upload" - // TODO: Separate contexts for load generation and data collection. If we do that, if either of the two - // data collection go routines stops well before the other, they will continue to send probes and we can - // generate additional information! + directions := []*direction.Direction{&downloadDirection, &uploadDirection} - selfDownProbeConnectionCommunicationChannel, downloadThroughputChannel := rpm.LoadGenerator( - networkActivityCtx, - downloadLoadGeneratorOperatorCtx, - time.Second, - generateLgdc, - &downloadLoadGeneratingConnectionCollection, - *calculateExtendedStats, - downloadDebugging, - ) - selfUpProbeConnectionCommunicationChannel, uploadThroughputChannel := rpm.LoadGenerator( - networkActivityCtx, - uploadLoadGeneratorOperatorCtx, - time.Second, - generateLguc, - &uploadLoadGeneratingConnectionCollection, - *calculateExtendedStats, - uploadDebugging, + // print the banner + dt := time.Now().UTC() + fmt.Printf( + "%s UTC Go Responsiveness to %s...\n", + dt.Format("01-02-2006 15:04:05"), + configHostPort, ) - // Handles for the first connection that the load-generating go routines (both up and - // download) open are passed back on the self[Down|Up]ProbeConnectionCommunicationChannel - // so that we can then start probes on those connections. - selfDownProbeConnection := <-selfDownProbeConnectionCommunicationChannel - selfUpProbeConnection := <-selfUpProbeConnectionCommunicationChannel + if len(*profile) != 0 { + f, err := os.Create(*profile) + if err != nil { + fmt.Fprintf( + os.Stderr, + "Error: Profiling requested but could not open the log file ( %s ) for writing: %v\n", + *profile, + err, + ) + os.Exit(1) + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } + + // All tests will accumulate data to these series because it will all matter for RPM calculation! + selfRtts := ms.NewInfiniteMathematicalSeries[float64]() + foreignRtts := ms.NewInfiniteMathematicalSeries[float64]() - // The combined prober will handle launching, monitoring, etc of *both* the self and foreign - // probes. - probeDataPointsChannel := rpm.CombinedProber( - proberOperatorCtx, - networkActivityCtx, - generateForeignProbeConfiguration, - generateSelfProbeConfiguration, - selfDownProbeConnection, - selfUpProbeConnection, - time.Millisecond*(time.Duration(*probeIntervalTime)), - sslKeyFileConcurrentWriter, - *calculateExtendedStats, - combinedProbeDebugging, - ) + var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil + if *printQualityAttenuation { + selfRttsQualityAttenuation = qualityattenuation.NewSimpleQualityAttenuation() + } - responsivenessIsStable := false - downloadThroughputIsStable := false - uploadThroughputIsStable := false + for _, direction := range directions { - // Test parameters: - // 1. I: The number of previous instantaneous measurements to consider when generating - // the so-called instantaneous moving averages. - // 2. K: The number of instantaneous moving averages to consider when determining stability. - // 3: S: The standard deviation cutoff used to determine stability among the K preceding - // moving averages of a measurement. - // See + timeoutDuration := specParameters.TestTimeout + timeoutAbsoluteTime := time.Now().Add(timeoutDuration) - throughputI := constants.InstantaneousThroughputMeasurementCount - probeI := constants.InstantaneousProbeMeasurementCount - K := constants.InstantaneousMovingAverageStabilityCount - S := constants.StabilityStandardDeviation + timeoutChannel := timeoutat.TimeoutAt( + operatingCtx, + timeoutAbsoluteTime, + debugLevel, + ) + if debug.IsDebug(debugLevel) { + fmt.Printf("%s Test will end no later than %v\n", direction.DirectionLabel, timeoutAbsoluteTime) + } - downloadThroughputStabilizerDebugConfig := - debug.NewDebugWithPrefix(debug.Debug, "Download Throughput Stabilizer") - downloadThroughputStabilizerDebugLevel := debug.Error - if *debugCliFlag { - downloadThroughputStabilizerDebugLevel = debug.Debug - } - downloadThroughputStabilizer := stabilizer.NewThroughputStabilizer(throughputI, K, S, - downloadThroughputStabilizerDebugLevel, downloadThroughputStabilizerDebugConfig) + throughputCtx, throughputCtxCancel := context.WithCancel(operatingCtx) + proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx) - uploadThroughputStabilizerDebugConfig := - debug.NewDebugWithPrefix(debug.Debug, "Upload Throughput Stabilizer") - uploadThroughputStabilizerDebugLevel := debug.Error - if *debugCliFlag { - uploadThroughputStabilizerDebugLevel = debug.Debug - } - uploadThroughputStabilizer := stabilizer.NewThroughputStabilizer(throughputI, K, S, - uploadThroughputStabilizerDebugLevel, uploadThroughputStabilizerDebugConfig) + // This context is used to control the network activity (i.e., it controls all + // the connections that are open to do load generation and probing). Cancelling this context will close + // all the network connections that are responsible for generating the load. + networkActivityCtx, networkActivityCtxCancel := context.WithCancel(operatingCtx) - probeStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, "Probe Stabilizer") - probeStabilizerDebugLevel := debug.Error - if *debugCliFlag { - probeStabilizerDebugLevel = debug.Debug - } - probeStabilizer := stabilizer.NewProbeStabilizer(probeI, K, S, probeStabilizerDebugLevel, probeStabilizerDebugConfig) + throughputGeneratorCtx, throughputGeneratorCtxCancel := context.WithCancel(throughputCtx) - selfRtts := ms.NewInfiniteMathematicalSeries[float64]() - selfRttsQualityAttenuation := qualityattenuation.NewSimpleQualityAttenuation() - foreignRtts := ms.NewInfiniteMathematicalSeries[float64]() + lgStabilizationCommunicationChannel := rpm.LoadGenerator( + throughputCtx, + networkActivityCtx, + throughputGeneratorCtx, + specParameters.EvalInterval, + direction.CreateLgdc, + &direction.Lgcc, + specParameters.MaxParallelConns, + *calculateExtendedStats, + direction.DirectionDebugging, + ) - // For later debugging output, record the last throughputs on load-generating connectings - // and the number of open connections. - lastUploadThroughputRate := float64(0) - lastUploadThroughputOpenConnectionCount := int(0) - lastDownloadThroughputRate := float64(0) - lastDownloadThroughputOpenConnectionCount := int(0) + throughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, + fmt.Sprintf("%v Throughput Stabilizer", direction.DirectionLabel)) + downloadThroughputStabilizerDebugLevel := debug.Error + if *debugCliFlag { + downloadThroughputStabilizerDebugLevel = debug.Debug + } + throughputStabilizer := stabilizer.NewStabilizer[float64]( + uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, 0, "bytes", + downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig) - // Every time that there is a new measurement, the possibility exists that the measurements become unstable. - // This allows us to continue pushing until *everything* is stable at the same time. -timeout: - for !(responsivenessIsStable && downloadThroughputIsStable && uploadThroughputIsStable) { - select { + responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, + fmt.Sprintf("%v Responsiveness Stabilizer", direction.DirectionLabel)) + responsivenessStabilizerDebugLevel := debug.Error + if *debugCliFlag { + responsivenessStabilizerDebugLevel = debug.Debug + } + responsivenessStabilizer := stabilizer.NewStabilizer[int64]( + uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, + specParameters.TrimmedMeanPct, "milliseconds", + responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig) - case downloadThroughputMeasurement := <-downloadThroughputChannel: - { - downloadThroughputStabilizer.AddMeasurement(downloadThroughputMeasurement) - downloadThroughputIsStable = downloadThroughputStabilizer.IsStable() - if *debugCliFlag { - fmt.Printf( - "################# Download is instantaneously %s.\n", - utilities.Conditional(downloadThroughputIsStable, "stable", "unstable")) - } - downloadThroughputDataLogger.LogRecord(downloadThroughputMeasurement) - for i := range downloadThroughputMeasurement.GranularThroughputDataPoints { - datapoint := downloadThroughputMeasurement.GranularThroughputDataPoints[i] - datapoint.Direction = "Download" - granularThroughputDataLogger.LogRecord(datapoint) - } + // For later debugging output, record the last throughputs on load-generating connectings + // and the number of open connections. + lastThroughputRate := float64(0) + lastThroughputOpenConnectionCount := int(0) - lastDownloadThroughputRate = downloadThroughputMeasurement.Throughput - lastDownloadThroughputOpenConnectionCount = - downloadThroughputMeasurement.Connections - } + lg_timeout: + for !direction.StableThroughput { + select { + case throughputMeasurement := <-lgStabilizationCommunicationChannel: + { + throughputStabilizer.AddMeasurement( + throughputMeasurement.Throughput) + direction.StableThroughput = throughputStabilizer.IsStable() + if *debugCliFlag { + fmt.Printf( + "################# %v is instantaneously %s.\n", direction.DirectionLabel, + utilities.Conditional(direction.StableThroughput, "stable", "unstable")) + } + direction.ThroughputDataLogger.LogRecord(throughputMeasurement) + for i := range throughputMeasurement.GranularThroughputDataPoints { + datapoint := throughputMeasurement.GranularThroughputDataPoints[i] + datapoint.Direction = "Download" + direction.GranularThroughputDataLogger.LogRecord(datapoint) + } + + lastThroughputRate = throughputMeasurement.Throughput + lastThroughputOpenConnectionCount = throughputMeasurement.Connections - case uploadThroughputMeasurement := <-uploadThroughputChannel: - { - uploadThroughputStabilizer.AddMeasurement(uploadThroughputMeasurement) - uploadThroughputIsStable = uploadThroughputStabilizer.IsStable() - if *debugCliFlag { - fmt.Printf( - "################# Upload is instantaneously %s.\n", - utilities.Conditional(uploadThroughputIsStable, "stable", "unstable")) + if direction.StableThroughput { + throughputGeneratorCtxCancel() + } } - uploadThroughputDataLogger.LogRecord(uploadThroughputMeasurement) - for i := range uploadThroughputMeasurement.GranularThroughputDataPoints { - datapoint := uploadThroughputMeasurement.GranularThroughputDataPoints[i] - datapoint.Direction = "Upload" - granularThroughputDataLogger.LogRecord(datapoint) + case <-timeoutChannel: + { + break lg_timeout } + } + } - lastUploadThroughputRate = uploadThroughputMeasurement.Throughput - lastUploadThroughputOpenConnectionCount = uploadThroughputMeasurement.Connections + if direction.StableThroughput { + if *debugCliFlag { + fmt.Printf("################# Throughput is stable; beginning responsiveness testing.\n") } - case probeMeasurement := <-probeDataPointsChannel: - { - probeStabilizer.AddMeasurement(probeMeasurement) + } else { + fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Adding 15 seconds to calculate speculative RPM results.\n") + speculativeTimeoutDuration := time.Second * 15 + speculativeAbsoluteTimeoutTime := time.Now().Add(speculativeTimeoutDuration) + timeoutChannel = timeoutat.TimeoutAt( + operatingCtx, + speculativeAbsoluteTimeoutTime, + debugLevel, + ) + } - // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy - // is *actually* important, but it can't hurt? - responsivenessIsStable = probeStabilizer.IsStable() + perDirectionSelfRtts := ms.NewInfiniteMathematicalSeries[float64]() + perDirectionForeignRtts := ms.NewInfiniteMathematicalSeries[float64]() - if *debugCliFlag { - fmt.Printf( - "################# Responsiveness is instantaneously %s.\n", - utilities.Conditional(responsivenessIsStable, "stable", "unstable")) - } - if probeMeasurement.Type == probe.Foreign { + responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber( + proberOperatorCtx, + networkActivityCtx, + generateForeignProbeConfiguration, + generateSelfProbeConfiguration, + &direction.Lgcc, + direction.CreateLgdc().Direction(), // TODO: This could be better! + specParameters.ProbeInterval, + sslKeyFileConcurrentWriter, + *calculateExtendedStats, + direction.ProbeDebugging, + ) + + responsiveness_timeout: + for !direction.StableResponsiveness { + select { + case probeMeasurement := <-responsivenessStabilizationCommunicationChannel: + { + foreignDataPoint := probeMeasurement.First + selfDataPoint := probeMeasurement.Second + + responsivenessStabilizer.AddMeasurement( + (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) + + // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy + // is *actually* important, but it can't hurt? + direction.StableResponsiveness = responsivenessStabilizer.IsStable() + + if *debugCliFlag { + fmt.Printf( + "################# Responsiveness is instantaneously %s.\n", + utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) + } // There may be more than one round trip accumulated together. If that is the case, // we will blow them apart in to three separate measurements and each one will just // be 1 / measurement.RoundTripCount of the total length. - for range utilities.Iota(0, int(probeMeasurement.RoundTripCount)) { - foreignRtts.AddElement(probeMeasurement.Duration.Seconds() / - float64(probeMeasurement.RoundTripCount)) + for range utilities.Iota(0, int(foreignDataPoint.RoundTripCount)) { + foreignRtts.AddElement(foreignDataPoint.Duration.Seconds() / + float64(foreignDataPoint.RoundTripCount)) + perDirectionForeignRtts.AddElement(foreignDataPoint.Duration.Seconds() / + float64(foreignDataPoint.RoundTripCount)) } - } else if probeMeasurement.Type == probe.SelfDown || probeMeasurement.Type == probe.SelfUp { - selfRtts.AddElement(probeMeasurement.Duration.Seconds()) - if *printQualityAttenuation { - selfRttsQualityAttenuation.AddSample(probeMeasurement.Duration.Seconds()) + selfRtts.AddElement(selfDataPoint.Duration.Seconds()) + perDirectionSelfRtts.AddElement(selfDataPoint.Duration.Seconds()) + + if selfRttsQualityAttenuation != nil { + selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) } + + direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) + direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) } + case throughputMeasurement := <-lgStabilizationCommunicationChannel: + { + if *debugCliFlag { + fmt.Printf("Adding a throughput measurement.\n") + } + // There may be more than one round trip accumulated together. If that is the case, + direction.ThroughputDataLogger.LogRecord(throughputMeasurement) + for i := range throughputMeasurement.GranularThroughputDataPoints { + datapoint := throughputMeasurement.GranularThroughputDataPoints[i] + datapoint.Direction = direction.DirectionLabel + direction.GranularThroughputDataLogger.LogRecord(datapoint) + } + + lastThroughputRate = throughputMeasurement.Throughput + lastThroughputOpenConnectionCount = throughputMeasurement.Connections - if probeMeasurement.Type == probe.Foreign { - foreignProbeDataLogger.LogRecord(probeMeasurement) - } else if probeMeasurement.Type == probe.SelfDown || probeMeasurement.Type == probe.SelfUp { - selfProbeDataLogger.LogRecord(probeMeasurement) } - } - case <-timeoutChannel: - { - break timeout + case <-timeoutChannel: + { + break responsiveness_timeout + } } } - } - - // TODO: Reset timeout to RPM timeout stat? - // Did the test run to stability? - testRanToStability := (downloadThroughputIsStable && uploadThroughputIsStable && responsivenessIsStable) + // Did the test run to stability? + testRanToStability := direction.StableThroughput && direction.StableResponsiveness - if *debugCliFlag { - fmt.Printf("Stopping all the load generating data generators (stability: %s).\n", - utilities.Conditional(testRanToStability, "success", "failure")) - } - - /* At this point there are - 1. Load generators running - -- uploadLoadGeneratorOperatorCtx - -- downloadLoadGeneratorOperatorCtx - 2. Network connections opened by those load generators: - -- lgNetworkActivityCtx - 3. Probes - -- proberCtx - */ + if *debugCliFlag { + fmt.Printf("Stopping all the load generating data generators (stability: %s).\n", + utilities.Conditional(testRanToStability, "success", "failure")) + } - // First, stop the load generator and the probe operators (but *not* the network activity) - proberOperatorCtxCancel() - downloadLoadGeneratorOperatorCtxCancel() - uploadLoadGeneratorOperatorCtxCancel() + /* At this point there are + 1. Load generators running + -- uploadLoadGeneratorOperatorCtx + -- downloadLoadGeneratorOperatorCtx + 2. Network connections opened by those load generators: + -- lgNetworkActivityCtx + 3. Probes + -- proberCtx + */ - // Second, calculate the extended stats (if the user requested) + // First, stop the load generator and the probe operators (but *not* the network activity) + proberOperatorCtxCancel() + throughputCtxCancel() - extendedStats := extendedstats.AggregateExtendedStats{} - if *calculateExtendedStats { - if extendedstats.ExtendedStatsAvailable() { - func() { - // Put inside an IIFE so that we can use a defer! - downloadLoadGeneratingConnectionCollection.Lock.Lock() - defer downloadLoadGeneratingConnectionCollection.Lock.Unlock() + // Second, calculate the extended stats (if the user requested and they are available for the direction) + extendedStats := extendedstats.AggregateExtendedStats{} + if *calculateExtendedStats && direction.ExtendedStatsEligible { + if extendedstats.ExtendedStatsAvailable() { + func() { + // Put inside an IIFE so that we can use a defer! + direction.Lgcc.Lock.Lock() + defer direction.Lgcc.Lock.Unlock() - // Note: We do not trace upload connections! - for i := 0; i < downloadLoadGeneratingConnectionCollection.Len(); i++ { - // Assume that extended statistics are available -- the check was done explicitly at - // program startup if the calculateExtendedStats flag was set by the user on the command line. - currentLgc, _ := downloadLoadGeneratingConnectionCollection.Get(i) - if err := extendedStats.IncorporateConnectionStats( - (*currentLgc).Stats().ConnInfo.Conn); err != nil { + // Note: We do not trace upload connections! + downloadLgcCount, err := direction.Lgcc.Len() + if err != nil { fmt.Fprintf( os.Stderr, - "Warning: Could not add extended stats for the connection: %v\n", - err, + "Warning: Could not calculate the number of download load-generating connections; aborting extended stats preparation.\n", ) + return } - } - }() - } else { - // TODO: Should we just log here? - panic("Extended stats are not available but the user requested their calculation.") + for i := 0; i < downloadLgcCount; i++ { + // Assume that extended statistics are available -- the check was done explicitly at + // program startup if the calculateExtendedStats flag was set by the user on the command line. + currentLgc, _ := direction.Lgcc.Get(i) + if err := extendedStats.IncorporateConnectionStats( + (*currentLgc).Stats().ConnInfo.Conn); err != nil { + fmt.Fprintf( + os.Stderr, + "Warning: Could not add extended stats for the connection: %v\n", + err, + ) + } + } + }() + } else { + // TODO: Should we just log here? + panic("Extended stats are not available but the user requested their calculation.") + } } - } - // Third, stop the network connections opened by the load generators and probers. - networkActivityCtxCancel() + // Third, stop the network connections opened by the load generators and probers. + networkActivityCtxCancel() - // Finally, stop the world. - operatingCtxCancel() - - // Calculate the RPM - - // First, let's do a double-sided trim of the top/bottom 10% of our measurements. - selfRttsTotalCount := selfRtts.Len() - foreignRttsTotalCount := foreignRtts.Len() - - selfRttsTrimmed := selfRtts.DoubleSidedTrim(10) - foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(10) - - selfRttsTrimmedCount := selfRttsTrimmed.Len() - foreignRttsTrimmedCount := foreignRttsTrimmed.Len() - - // Then, let's take the mean of those ... - selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage() - foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage() + fmt.Printf( + "%v: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", + direction.DirectionLabel, + utilities.ToMbps(lastThroughputRate), + utilities.ToMBps(lastThroughputRate), + lastThroughputOpenConnectionCount, + ) - // Second, let's do the P90 calculations. - selfProbeRoundTripTimeP90 := selfRtts.Percentile(90) - foreignProbeRoundTripTimeP90 := foreignRtts.Percentile(90) + if *calculateExtendedStats { + fmt.Println(extendedStats.Repr()) + } + directionResult := rpm.CalculateRpm(perDirectionSelfRtts, perDirectionForeignRtts, specParameters.TrimmedMeanPct, 90) + if *debugCliFlag { + fmt.Printf("(%s RPM Calculation stats): %v\n", direction.DirectionLabel, directionResult.ToString()) + } - // Note: The specification indicates that we want to calculate the foreign probes as such: - // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign - // where tcp_foreign, tls_foreign, http_foreign are the P90 RTTs for the connection - // of the tcp, tls and http connections, respectively. However, we cannot break out - // the individual RTTs so we assume that they are roughly equal. + if !testRanToStability { + fmt.Printf("Test did not run to stability, these results are estimates:\n") + } - // This is 60 because we measure in seconds not ms - p90Rpm := 60.0 / (float64(selfProbeRoundTripTimeP90+foreignProbeRoundTripTimeP90) / 2.0) - meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0) + fmt.Printf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, directionResult.PNRpm, 90) + fmt.Printf("%s RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, + directionResult.MeanRpm, specParameters.TrimmedMeanPct) - if *debugCliFlag { - fmt.Printf( - `Total Self Probes: %d -Total Foreign Probes: %d -Trimmed Self Probes Count: %d -Trimmed Foreign Probes Count: %d -P90 Self RTT: %f -P90 Foreign RTT: %f -Trimmed Mean Self RTT: %f -Trimmed Mean Foreign RTT: %f -`, - selfRttsTotalCount, - foreignRttsTotalCount, - selfRttsTrimmedCount, - foreignRttsTrimmedCount, - selfProbeRoundTripTimeP90, - foreignProbeRoundTripTimeP90, - selfProbeRoundTripTimeMean, - foreignProbeRoundTripTimeMean, - ) - } + if len(*prometheusStatsFilename) > 0 { + var testStable int + if testRanToStability { + testStable = 1 + } + var buffer bytes.Buffer + buffer.WriteString(fmt.Sprintf("networkquality_%v_test_stable %d\n", + strings.ToLower(direction.DirectionLabel), testStable)) + buffer.WriteString(fmt.Sprintf("networkquality_%v_p90_rpm_value %d\n", + strings.ToLower(direction.DirectionLabel), int64(directionResult.PNRpm))) + buffer.WriteString(fmt.Sprintf("networkquality_%v_trimmed_rpm_value %d\n", + strings.ToLower(direction.DirectionLabel), + int64(directionResult.MeanRpm))) - if *printQualityAttenuation { - fmt.Println("Quality Attenuation Statistics:") - fmt.Printf( - `Number of losses: %d -Number of samples: %d -Loss: %f -Min: %.6f -Max: %.6f -Mean: %.6f -Variance: %.6f -Standard Deviation: %.6f -PDV(90): %.6f -PDV(99): %.6f -P(90): %.6f -P(99): %.6f -`, selfRttsQualityAttenuation.GetNumberOfLosses(), - selfRttsQualityAttenuation.GetNumberOfSamples(), - selfRttsQualityAttenuation.GetLossPercentage(), - selfRttsQualityAttenuation.GetMinimum(), - selfRttsQualityAttenuation.GetMaximum(), - selfRttsQualityAttenuation.GetAverage(), - selfRttsQualityAttenuation.GetVariance(), - selfRttsQualityAttenuation.GetStandardDeviation(), - selfRttsQualityAttenuation.GetPDV(90), - selfRttsQualityAttenuation.GetPDV(99), - selfRttsQualityAttenuation.GetPercentile(90), - selfRttsQualityAttenuation.GetPercentile(99)) - } + buffer.WriteString(fmt.Sprintf("networkquality_%v_bits_per_second %d\n", + strings.ToLower(direction.DirectionLabel), int64(lastThroughputRate))) + buffer.WriteString(fmt.Sprintf("networkquality_%v_connections %d\n", + strings.ToLower(direction.DirectionLabel), + int64(lastThroughputOpenConnectionCount))) - if !testRanToStability { - fmt.Printf("Test did not run to stability, these results are estimates:\n") - } + if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil { + fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err) + os.Exit(1) + } + } - fmt.Printf("RPM: %5.0f (P90)\n", p90Rpm) - fmt.Printf("RPM: %5.0f (Double-Sided 10%% Trimmed Mean)\n", meanRpm) + direction.ThroughputDataLogger.Export() + if *debugCliFlag { + fmt.Printf("Closing the %v throughput data logger.\n", direction.DirectionLabel) + } + direction.ThroughputDataLogger.Close() - fmt.Printf( - "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", - utilities.ToMbps(lastDownloadThroughputRate), - utilities.ToMBps(lastDownloadThroughputRate), - lastDownloadThroughputOpenConnectionCount, - ) - fmt.Printf( - "Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", - utilities.ToMbps(lastUploadThroughputRate), - utilities.ToMBps(lastUploadThroughputRate), - lastUploadThroughputOpenConnectionCount, - ) + direction.GranularThroughputDataLogger.Export() + if *debugCliFlag { + fmt.Printf("Closing the %v granular throughput data logger.\n", direction.DirectionLabel) + } + direction.GranularThroughputDataLogger.Close() - if *calculateExtendedStats { - fmt.Println(extendedStats.Repr()) + if *debugCliFlag { + fmt.Printf("In debugging mode, we will cool down between tests.\n") + time.Sleep(constants.CooldownPeriod) + fmt.Printf("Done cooling down.\n") + } } - selfProbeDataLogger.Export() - if *debugCliFlag { - fmt.Printf("Closing the self data logger.\n") - } - selfProbeDataLogger.Close() + result := rpm.CalculateRpm(selfRtts, foreignRtts, specParameters.TrimmedMeanPct, 90) - foreignProbeDataLogger.Export() if *debugCliFlag { - fmt.Printf("Closing the foreign data logger.\n") + fmt.Printf("(Final RPM Calculation stats): %v\n", result.ToString()) } - foreignProbeDataLogger.Close() - downloadThroughputDataLogger.Export() - if *debugCliFlag { - fmt.Printf("Closing the download throughput data logger.\n") - } - downloadThroughputDataLogger.Close() + fmt.Printf("Final RPM: %5.0f (P%d)\n", result.PNRpm, 90) + fmt.Printf("Final RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", + result.MeanRpm, specParameters.TrimmedMeanPct) - uploadThroughputDataLogger.Export() - if *debugCliFlag { - fmt.Printf("Closing the upload throughput data logger.\n") - } - uploadThroughputDataLogger.Close() + // Stop the world. + operatingCtxCancel() - granularThroughputDataLogger.Export() + // Note: We do *not* have to export/close the upload *and* download + // sides of the self/foreign probe data loggers because they both + // refer to the same logger. Closing/exporting one will close/export + // the other. + uploadDirection.SelfProbeDataLogger.Export() if *debugCliFlag { - fmt.Printf("Closing the granular throughput data logger.\n") + fmt.Printf("Closing the self data loggers.\n") } - granularThroughputDataLogger.Close() + uploadDirection.SelfProbeDataLogger.Close() + uploadDirection.ForeignProbeDataLogger.Export() if *debugCliFlag { - fmt.Printf("In debugging mode, we will cool down.\n") - time.Sleep(constants.CooldownPeriod) - fmt.Printf("Done cooling down.\n") - } - - if len(*prometheusStatsFilename) > 0 { - var testStable int - if testRanToStability { - testStable = 1 - } - var buffer bytes.Buffer - buffer.WriteString(fmt.Sprintf("networkquality_test_stable %d\n", testStable)) - buffer.WriteString(fmt.Sprintf("networkquality_rpm_value %d\n", int64(p90Rpm))) - buffer.WriteString(fmt.Sprintf("networkquality_trimmed_rpm_value %d\n", - int64(meanRpm))) // utilities.ToMbps(lastDownloadThroughputRate), - - buffer.WriteString(fmt.Sprintf("networkquality_download_bits_per_second %d\n", int64(lastDownloadThroughputRate))) - buffer.WriteString(fmt.Sprintf("networkquality_download_connections %d\n", - int64(lastDownloadThroughputOpenConnectionCount))) - buffer.WriteString(fmt.Sprintf("networkquality_upload_bits_per_second %d\n", int64(lastUploadThroughputRate))) - buffer.WriteString(fmt.Sprintf("networkquality_upload_connections %d\n", - lastUploadThroughputOpenConnectionCount)) - - if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil { - fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err) - os.Exit(1) - } + fmt.Printf("Closing the foreign data loggers.\n") } + uploadDirection.SelfProbeDataLogger.Close() } |
