summaryrefslogtreecommitdiff
path: root/networkQuality.go
diff options
context:
space:
mode:
authorWill Hawkins <[email protected]>2023-07-14 14:08:57 -0400
committerWill Hawkins <[email protected]>2023-07-14 15:26:30 -0400
commitcca033fe0d7389bfa647afa47a47de2a3f6af47d (patch)
treea114899b9b78cf775b920a303dfd98da76151c92 /networkQuality.go
parent123e75ac641721de9c19a652c9b1450b60bc7ef9 (diff)
[Feature] Relative RPM
With this feature, the user can use `--relative-rpm` to gather additional data: a relative RPM. The relative RPM score 1. Calculates an RPM before working conditions are achieved. 2. Achieves working conditions (upload and download) 3. Calculates an RPM under working conditions (upload and download) 4. Calculates the percent difference between the RPM calculated in (1) and the RPMs calculated in (3). Signed-off-by: Will Hawkins <[email protected]>
Diffstat (limited to 'networkQuality.go')
-rw-r--r--networkQuality.go164
1 files changed, 162 insertions, 2 deletions
diff --git a/networkQuality.go b/networkQuality.go
index 6e1940d..ed9dcfa 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -160,6 +160,11 @@ var (
false,
"Show version.",
)
+ calculateRelativeRpm = flag.Bool(
+ "relative-rpm",
+ false,
+ "Calculate a relative RPM.",
+ )
)
func main() {
@@ -475,6 +480,150 @@ func main() {
defer pprof.StopCPUProfile()
}
+ globalNumericBucketGenerator := series.NewNumericBucketGenerator[uint64](0)
+
+ var baselineRpm *rpm.Rpm[float64] = nil
+
+ if *calculateRelativeRpm {
+ baselineForeignDownloadRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+ baselineFauxSelfDownloadRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+ baselineStableResponsiveness := false
+ baselineProbeDebugging := debug.NewDebugWithPrefix(debugLevel, "Baseline RPM Calculation Probe")
+
+ timeoutDuration := specParameters.TestTimeout
+ timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
+
+ timeoutChannel := timeoutat.TimeoutAt(
+ operatingCtx,
+ timeoutAbsoluteTime,
+ debugLevel,
+ )
+ if debug.IsDebug(debugLevel) {
+ fmt.Printf("Baseline RPM calculation will end no later than %v\n", timeoutAbsoluteTime)
+ }
+
+ baselineProberOperatorCtx, baselineProberOperatorCtxCancel := context.WithCancel(operatingCtx)
+
+ // This context is used to control the network activity (i.e., it controls all
+ // the connections that are open to do load generation and probing).
+ baselineNetworkActivityCtx, baselineNetworkActivityCtxCancel := context.WithCancel(operatingCtx)
+
+ baselineResponsivenessStabilizerDebugConfig :=
+ debug.NewDebugWithPrefix(debug.Debug, "Baseline Responsiveness Stabilizer")
+ baselineResponsivenessStabilizerDebugLevel := debug.Error
+ if *debugCliFlag {
+ baselineResponsivenessStabilizerDebugLevel = debug.Debug
+ }
+ baselineResponsivenessStabilizer := stabilizer.NewStabilizer[int64, uint64](
+ specParameters.MovingAvgDist, specParameters.StdDevTolerance,
+ specParameters.TrimmedMeanPct, "milliseconds",
+ baselineResponsivenessStabilizerDebugLevel,
+ baselineResponsivenessStabilizerDebugConfig)
+
+ baselineStabilityCheckTime := time.Now().Add(specParameters.EvalInterval)
+ baselineStabilityCheckTimeChannel := timeoutat.TimeoutAt(
+ operatingCtx,
+ baselineStabilityCheckTime,
+ debugLevel,
+ )
+
+ responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber(
+ baselineProberOperatorCtx,
+ baselineNetworkActivityCtx,
+ generateForeignProbeConfiguration,
+ generateSelfProbeConfiguration,
+ nil,
+ &globalNumericBucketGenerator,
+ lgc.LGC_DOWN,
+ specParameters.ProbeInterval,
+ sslKeyFileConcurrentWriter,
+ *calculateExtendedStats,
+ baselineProbeDebugging,
+ )
+
+ baseline_responsiveness_timeout:
+ for !baselineStableResponsiveness {
+ select {
+ case probeMeasurement := <-responsivenessStabilizationCommunicationChannel:
+ {
+ switch probeMeasurement.Type {
+ case series.SeriesMessageReserve:
+ {
+ bucket := probeMeasurement.Bucket
+ if *debugCliFlag {
+ fmt.Printf("baseline: Reserving a responsiveness bucket with id %v.\n", bucket)
+ }
+ baselineResponsivenessStabilizer.Reserve(bucket)
+ baselineForeignDownloadRtts.Reserve(bucket)
+ baselineFauxSelfDownloadRtts.Reserve(bucket)
+ }
+ case series.SeriesMessageMeasure:
+ {
+ bucket := probeMeasurement.Bucket
+ measurement := utilities.GetSome(probeMeasurement.Measure)
+ foreignDataPoint := measurement.Foreign
+
+ if *debugCliFlag {
+ fmt.Printf(
+ "baseline: Filling a responsiveness bucket with id %v with value %v.\n", bucket, measurement)
+ }
+ baselineResponsivenessStabilizer.AddMeasurement(
+ bucket, foreignDataPoint.Duration.Milliseconds())
+
+ if err := baselineForeignDownloadRtts.Fill(
+ bucket, foreignDataPoint.Duration.Seconds()); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (baselineForeignDownloadRtts)\n", bucket)
+ }
+ if err := baselineFauxSelfDownloadRtts.Fill(
+ bucket, foreignDataPoint.Duration.Seconds()/3.0); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (baselineFauxSelfDownloadRtts)\n", bucket)
+ }
+ }
+ }
+ }
+ case <-timeoutChannel:
+ {
+ break baseline_responsiveness_timeout
+ }
+ case <-baselineStabilityCheckTimeChannel:
+ {
+ if *debugCliFlag {
+ fmt.Printf("baseline responsiveness stability interval is complete.\n")
+ }
+
+ baselineStabilityCheckTime = time.Now().Add(specParameters.EvalInterval)
+ baselineStabilityCheckTimeChannel = timeoutat.TimeoutAt(
+ operatingCtx,
+ baselineStabilityCheckTime,
+ debugLevel,
+ )
+
+ // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy
+ // is *actually* important, but it can't hurt?
+ baselineStableResponsiveness = baselineResponsivenessStabilizer.IsStable()
+
+ if *debugCliFlag {
+ fmt.Printf(
+ "baseline responsiveness is instantaneously %s.\n",
+ utilities.Conditional(baselineStableResponsiveness, "stable", "unstable"))
+ }
+
+ baselineResponsivenessStabilizer.Interval()
+ }
+ }
+ }
+ baselineNetworkActivityCtxCancel()
+ baselineProberOperatorCtxCancel()
+
+ baselineRpm = rpm.CalculateRpm(baselineFauxSelfDownloadRtts,
+ baselineForeignDownloadRtts, specParameters.TrimmedMeanPct, specParameters.Percentile)
+
+ fmt.Printf("Baseline RPM: %5.0f (P%d)\n", baselineRpm.PNRpm, specParameters.Percentile)
+ fmt.Printf("Baseline RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n",
+ baselineRpm.MeanRpm, specParameters.TrimmedMeanPct)
+
+ }
+
// All tests will accumulate data to these series because it will all matter for RPM calculation!
selfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
foreignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
@@ -484,8 +633,6 @@ func main() {
selfRttsQualityAttenuation = qualityattenuation.NewSimpleQualityAttenuation()
}
- globalNumericBucketGenerator := series.NewNumericBucketGenerator[uint64](0)
-
for _, direction := range directions {
timeoutDuration := specParameters.TestTimeout
@@ -953,6 +1100,19 @@ Gaming QoO: %.0f
fmt.Printf("Final RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n",
result.MeanRpm, specParameters.TrimmedMeanPct)
+ if *calculateRelativeRpm {
+ if baselineRpm == nil {
+ fmt.Printf("User requested relative RPM calculation but an unloaded RPM was not calculated.")
+ } else {
+ relativeRpmFactorP := utilities.AbsPercentDifference(result.PNRpm, baselineRpm.PNRpm)
+ relativeRpmFactorTM := utilities.AbsPercentDifference(result.MeanRpm, baselineRpm.MeanRpm)
+ fmt.Printf("Working Conditions RPM Effect: %5.0f%% (P%d)\n",
+ relativeRpmFactorP, specParameters.Percentile)
+ fmt.Printf("Working Conditions RPM Effect: %5.0f%% (Single-Sided %v%% Trimmed Mean)\n",
+ relativeRpmFactorTM, specParameters.TrimmedMeanPct)
+ }
+ }
+
// Stop the world.
operatingCtxCancel()