summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWill Hawkins <[email protected]>2023-07-14 14:08:57 -0400
committerWill Hawkins <[email protected]>2023-07-14 15:26:30 -0400
commitcca033fe0d7389bfa647afa47a47de2a3f6af47d (patch)
treea114899b9b78cf775b920a303dfd98da76151c92
parent123e75ac641721de9c19a652c9b1450b60bc7ef9 (diff)
[Feature] Relative RPM
With this feature, the user can use `--relative-rpm` to gather additional data: a relative RPM. The relative RPM score 1. Calculates an RPM before working conditions are achieved. 2. Achieves working conditions (upload and download) 3. Calculates an RPM under working conditions (upload and download) 4. Calculates the percent difference between the RPM calculated in (1) and the RPMs calculated in (3). Signed-off-by: Will Hawkins <[email protected]>
-rw-r--r--networkQuality.go164
-rw-r--r--rpm/calculations.go4
-rw-r--r--rpm/rpm.go127
3 files changed, 233 insertions, 62 deletions
diff --git a/networkQuality.go b/networkQuality.go
index 6e1940d..ed9dcfa 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -160,6 +160,11 @@ var (
false,
"Show version.",
)
+ calculateRelativeRpm = flag.Bool(
+ "relative-rpm",
+ false,
+ "Calculate a relative RPM.",
+ )
)
func main() {
@@ -475,6 +480,150 @@ func main() {
defer pprof.StopCPUProfile()
}
+ globalNumericBucketGenerator := series.NewNumericBucketGenerator[uint64](0)
+
+ var baselineRpm *rpm.Rpm[float64] = nil
+
+ if *calculateRelativeRpm {
+ baselineForeignDownloadRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+ baselineFauxSelfDownloadRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+ baselineStableResponsiveness := false
+ baselineProbeDebugging := debug.NewDebugWithPrefix(debugLevel, "Baseline RPM Calculation Probe")
+
+ timeoutDuration := specParameters.TestTimeout
+ timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
+
+ timeoutChannel := timeoutat.TimeoutAt(
+ operatingCtx,
+ timeoutAbsoluteTime,
+ debugLevel,
+ )
+ if debug.IsDebug(debugLevel) {
+ fmt.Printf("Baseline RPM calculation will end no later than %v\n", timeoutAbsoluteTime)
+ }
+
+ baselineProberOperatorCtx, baselineProberOperatorCtxCancel := context.WithCancel(operatingCtx)
+
+ // This context is used to control the network activity (i.e., it controls all
+ // the connections that are open to do load generation and probing).
+ baselineNetworkActivityCtx, baselineNetworkActivityCtxCancel := context.WithCancel(operatingCtx)
+
+ baselineResponsivenessStabilizerDebugConfig :=
+ debug.NewDebugWithPrefix(debug.Debug, "Baseline Responsiveness Stabilizer")
+ baselineResponsivenessStabilizerDebugLevel := debug.Error
+ if *debugCliFlag {
+ baselineResponsivenessStabilizerDebugLevel = debug.Debug
+ }
+ baselineResponsivenessStabilizer := stabilizer.NewStabilizer[int64, uint64](
+ specParameters.MovingAvgDist, specParameters.StdDevTolerance,
+ specParameters.TrimmedMeanPct, "milliseconds",
+ baselineResponsivenessStabilizerDebugLevel,
+ baselineResponsivenessStabilizerDebugConfig)
+
+ baselineStabilityCheckTime := time.Now().Add(specParameters.EvalInterval)
+ baselineStabilityCheckTimeChannel := timeoutat.TimeoutAt(
+ operatingCtx,
+ baselineStabilityCheckTime,
+ debugLevel,
+ )
+
+ responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber(
+ baselineProberOperatorCtx,
+ baselineNetworkActivityCtx,
+ generateForeignProbeConfiguration,
+ generateSelfProbeConfiguration,
+ nil,
+ &globalNumericBucketGenerator,
+ lgc.LGC_DOWN,
+ specParameters.ProbeInterval,
+ sslKeyFileConcurrentWriter,
+ *calculateExtendedStats,
+ baselineProbeDebugging,
+ )
+
+ baseline_responsiveness_timeout:
+ for !baselineStableResponsiveness {
+ select {
+ case probeMeasurement := <-responsivenessStabilizationCommunicationChannel:
+ {
+ switch probeMeasurement.Type {
+ case series.SeriesMessageReserve:
+ {
+ bucket := probeMeasurement.Bucket
+ if *debugCliFlag {
+ fmt.Printf("baseline: Reserving a responsiveness bucket with id %v.\n", bucket)
+ }
+ baselineResponsivenessStabilizer.Reserve(bucket)
+ baselineForeignDownloadRtts.Reserve(bucket)
+ baselineFauxSelfDownloadRtts.Reserve(bucket)
+ }
+ case series.SeriesMessageMeasure:
+ {
+ bucket := probeMeasurement.Bucket
+ measurement := utilities.GetSome(probeMeasurement.Measure)
+ foreignDataPoint := measurement.Foreign
+
+ if *debugCliFlag {
+ fmt.Printf(
+ "baseline: Filling a responsiveness bucket with id %v with value %v.\n", bucket, measurement)
+ }
+ baselineResponsivenessStabilizer.AddMeasurement(
+ bucket, foreignDataPoint.Duration.Milliseconds())
+
+ if err := baselineForeignDownloadRtts.Fill(
+ bucket, foreignDataPoint.Duration.Seconds()); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (baselineForeignDownloadRtts)\n", bucket)
+ }
+ if err := baselineFauxSelfDownloadRtts.Fill(
+ bucket, foreignDataPoint.Duration.Seconds()/3.0); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (baselineFauxSelfDownloadRtts)\n", bucket)
+ }
+ }
+ }
+ }
+ case <-timeoutChannel:
+ {
+ break baseline_responsiveness_timeout
+ }
+ case <-baselineStabilityCheckTimeChannel:
+ {
+ if *debugCliFlag {
+ fmt.Printf("baseline responsiveness stability interval is complete.\n")
+ }
+
+ baselineStabilityCheckTime = time.Now().Add(specParameters.EvalInterval)
+ baselineStabilityCheckTimeChannel = timeoutat.TimeoutAt(
+ operatingCtx,
+ baselineStabilityCheckTime,
+ debugLevel,
+ )
+
+ // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy
+ // is *actually* important, but it can't hurt?
+ baselineStableResponsiveness = baselineResponsivenessStabilizer.IsStable()
+
+ if *debugCliFlag {
+ fmt.Printf(
+ "baseline responsiveness is instantaneously %s.\n",
+ utilities.Conditional(baselineStableResponsiveness, "stable", "unstable"))
+ }
+
+ baselineResponsivenessStabilizer.Interval()
+ }
+ }
+ }
+ baselineNetworkActivityCtxCancel()
+ baselineProberOperatorCtxCancel()
+
+ baselineRpm = rpm.CalculateRpm(baselineFauxSelfDownloadRtts,
+ baselineForeignDownloadRtts, specParameters.TrimmedMeanPct, specParameters.Percentile)
+
+ fmt.Printf("Baseline RPM: %5.0f (P%d)\n", baselineRpm.PNRpm, specParameters.Percentile)
+ fmt.Printf("Baseline RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n",
+ baselineRpm.MeanRpm, specParameters.TrimmedMeanPct)
+
+ }
+
// All tests will accumulate data to these series because it will all matter for RPM calculation!
selfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
foreignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
@@ -484,8 +633,6 @@ func main() {
selfRttsQualityAttenuation = qualityattenuation.NewSimpleQualityAttenuation()
}
- globalNumericBucketGenerator := series.NewNumericBucketGenerator[uint64](0)
-
for _, direction := range directions {
timeoutDuration := specParameters.TestTimeout
@@ -953,6 +1100,19 @@ Gaming QoO: %.0f
fmt.Printf("Final RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n",
result.MeanRpm, specParameters.TrimmedMeanPct)
+ if *calculateRelativeRpm {
+ if baselineRpm == nil {
+ fmt.Printf("User requested relative RPM calculation but an unloaded RPM was not calculated.")
+ } else {
+ relativeRpmFactorP := utilities.AbsPercentDifference(result.PNRpm, baselineRpm.PNRpm)
+ relativeRpmFactorTM := utilities.AbsPercentDifference(result.MeanRpm, baselineRpm.MeanRpm)
+ fmt.Printf("Working Conditions RPM Effect: %5.0f%% (P%d)\n",
+ relativeRpmFactorP, specParameters.Percentile)
+ fmt.Printf("Working Conditions RPM Effect: %5.0f%% (Single-Sided %v%% Trimmed Mean)\n",
+ relativeRpmFactorTM, specParameters.TrimmedMeanPct)
+ }
+ }
+
// Stop the world.
operatingCtxCancel()
diff --git a/rpm/calculations.go b/rpm/calculations.go
index 8b3cf92..f3619dc 100644
--- a/rpm/calculations.go
+++ b/rpm/calculations.go
@@ -37,7 +37,7 @@ type Rpm[Data utilities.Number] struct {
func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered](
selfRtts series.WindowSeries[Data, Bucket], aggregatedForeignRtts series.WindowSeries[Data, Bucket], trimming uint, percentile uint,
-) Rpm[Data] {
+) *Rpm[Data] {
// There may be more than one round trip accumulated together. If that is the case,
// we will blow them apart in to three separate measurements and each one will just
// be 1 / 3.
@@ -82,7 +82,7 @@ func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered](
pnRpm := 60.0 / (float64(selfProbeRoundTripTimePN+foreignProbeRoundTripTimePN) / 2.0)
meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0)
- return Rpm[Data]{
+ return &Rpm[Data]{
SelfRttsTotal: selfRttsTotalCount, ForeignRttsTotal: foreignRttsTotalCount,
SelfRttsTrimmed: selfRttsTrimmedCount, ForeignRttsTrimmed: foreignRttsTrimmedCount,
SelfProbeRttPN: selfProbeRoundTripTimePN, ForeignProbeRttPN: foreignProbeRoundTripTimePN,
diff --git a/rpm/rpm.go b/rpm/rpm.go
index d1c07a5..3ebdefd 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -204,83 +204,95 @@ func ResponsivenessProber[BucketType utilities.Number](
}
var selfProbeConnection *lgc.LoadGeneratingConnection = nil
- func() {
- selfProbeConnectionCollection.Lock.Lock()
- defer selfProbeConnectionCollection.Lock.Unlock()
- selfProbeConnection, err = selfProbeConnectionCollection.GetRandom()
- if err != nil {
+ if selfProbeConnectionCollection != nil {
+ func() {
+ selfProbeConnectionCollection.Lock.Lock()
+ defer selfProbeConnectionCollection.Lock.Unlock()
+ selfProbeConnection, err = selfProbeConnectionCollection.GetRandom()
+ if err != nil {
+ if debug.IsWarn(debugging.Level) {
+ fmt.Printf(
+ "(%s) Failed to get a random %s load-generating connection on which to send a probe: %v.\n",
+ debugging.Prefix,
+ probeDirection,
+ err,
+ )
+ }
+ return
+ }
+ }()
+ }
+ if selfProbeConnectionCollection != nil && selfProbeConnection == nil {
+ return
+ }
+
+ var selfProbeDataPoint *probe.ProbeDataPoint = nil
+ if selfProbeConnection != nil {
+ // TODO: Make the following sanity check more than just a check.
+ // We only want to start a SelfUp probe on a connection that is
+ // in the RUNNING state.
+ if (*selfProbeConnection).Status() != lgc.LGC_STATUS_RUNNING {
if debug.IsWarn(debugging.Level) {
fmt.Printf(
- "(%s) Failed to get a random %s load-generating connection on which to send a probe: %v.\n",
+ "(%s) The selected random %s load-generating connection on which to send a probe was not running.\n",
debugging.Prefix,
probeDirection,
- err,
)
}
return
}
- }()
- if selfProbeConnection == nil {
- return
- }
- // TODO: Make the following sanity check more than just a check.
- // We only want to start a SelfUp probe on a connection that is
- // in the RUNNING state.
- if (*selfProbeConnection).Status() != lgc.LGC_STATUS_RUNNING {
- if debug.IsWarn(debugging.Level) {
+ if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) The selected random %s load-generating connection on which to send a probe was not running.\n",
+ "(%s) Selected %s load-generating connection with ID %d to send a self probe with Id %d.\n",
debugging.Prefix,
probeDirection,
+ (*selfProbeConnection).ClientId(),
+ probeCount,
)
}
- return
- }
-
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "(%s) Selected %s load-generating connection with ID %d to send a self probe with Id %d.\n",
- debugging.Prefix,
- probeDirection,
- (*selfProbeConnection).ClientId(),
+ selfProbeDataPoint, err = probe.Probe(
+ proberCtx,
+ (*selfProbeConnection).Client(),
+ selfProbeConfiguration.URL,
+ selfProbeConfiguration.Host,
+ utilities.Conditional(probeDirection == lgc.LGC_DOWN, probe.SelfDown, probe.SelfUp),
probeCount,
+ captureExtendedStats,
+ debugging,
)
- }
- selfProbeDataPoint, err := probe.Probe(
- proberCtx,
- (*selfProbeConnection).Client(),
- selfProbeConfiguration.URL,
- selfProbeConfiguration.Host,
- utilities.Conditional(probeDirection == lgc.LGC_DOWN, probe.SelfDown, probe.SelfUp),
- probeCount,
- captureExtendedStats,
- debugging,
- )
- if err != nil {
- // We may see an error here because the prober context was cancelled
- // and requests were attempting to be sent. This situation is not an
- // error (per se) so we will not log it as such.
+ if err != nil {
+ // We may see an error here because the prober context was cancelled
+ // and requests were attempting to be sent. This situation is not an
+ // error (per se) so we will not log it as such.
- if proberCtx.Err() != nil {
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "(%s) Failed to send a probe (id: %v) because the prober context was cancelled.\n",
- debugging.Prefix,
- probeCount,
- )
+ if proberCtx.Err() != nil {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) Failed to send a probe (id: %v) because the prober context was cancelled.\n",
+ debugging.Prefix,
+ probeCount,
+ )
+ }
+ return
}
+ fmt.Printf(
+ "(%s) There was an error sending a self probe with Id %d: %v\n",
+ debugging.Prefix,
+ probeCount,
+ err,
+ )
return
}
- fmt.Printf(
- "(%s) There was an error sending a self probe with Id %d: %v\n",
- debugging.Prefix,
- probeCount,
- err,
- )
- return
+ } else {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) Did not send a self probe at id %d of probes!\n",
+ debugging.Prefix,
+ probeCount,
+ )
+ }
}
-
if debug.IsDebug(debugging.Level) {
fmt.Printf(
"(%s) About to report results for round %d of probes!\n",
@@ -288,10 +300,9 @@ func ResponsivenessProber[BucketType utilities.Number](
probeCount,
)
}
-
dataPointsLock.Lock()
defer dataPointsLock.Unlock()
- // Now we have our four data points (three in the foreign probe data point and one in the self probe data point)
+ // Now we have our (maybe) four data points (three in the foreign probe data point and [maybe] one in the self probe data point)
if dataPoints != nil {
measurement := ResponsivenessProbeResult{
Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint,