diff options
| -rw-r--r-- | networkQuality.go | 164 | ||||
| -rw-r--r-- | rpm/calculations.go | 4 | ||||
| -rw-r--r-- | rpm/rpm.go | 127 |
3 files changed, 233 insertions, 62 deletions
diff --git a/networkQuality.go b/networkQuality.go index 6e1940d..ed9dcfa 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -160,6 +160,11 @@ var ( false, "Show version.", ) + calculateRelativeRpm = flag.Bool( + "relative-rpm", + false, + "Calculate a relative RPM.", + ) ) func main() { @@ -475,6 +480,150 @@ func main() { defer pprof.StopCPUProfile() } + globalNumericBucketGenerator := series.NewNumericBucketGenerator[uint64](0) + + var baselineRpm *rpm.Rpm[float64] = nil + + if *calculateRelativeRpm { + baselineForeignDownloadRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) + baselineFauxSelfDownloadRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) + baselineStableResponsiveness := false + baselineProbeDebugging := debug.NewDebugWithPrefix(debugLevel, "Baseline RPM Calculation Probe") + + timeoutDuration := specParameters.TestTimeout + timeoutAbsoluteTime := time.Now().Add(timeoutDuration) + + timeoutChannel := timeoutat.TimeoutAt( + operatingCtx, + timeoutAbsoluteTime, + debugLevel, + ) + if debug.IsDebug(debugLevel) { + fmt.Printf("Baseline RPM calculation will end no later than %v\n", timeoutAbsoluteTime) + } + + baselineProberOperatorCtx, baselineProberOperatorCtxCancel := context.WithCancel(operatingCtx) + + // This context is used to control the network activity (i.e., it controls all + // the connections that are open to do load generation and probing). + baselineNetworkActivityCtx, baselineNetworkActivityCtxCancel := context.WithCancel(operatingCtx) + + baselineResponsivenessStabilizerDebugConfig := + debug.NewDebugWithPrefix(debug.Debug, "Baseline Responsiveness Stabilizer") + baselineResponsivenessStabilizerDebugLevel := debug.Error + if *debugCliFlag { + baselineResponsivenessStabilizerDebugLevel = debug.Debug + } + baselineResponsivenessStabilizer := stabilizer.NewStabilizer[int64, uint64]( + specParameters.MovingAvgDist, specParameters.StdDevTolerance, + specParameters.TrimmedMeanPct, "milliseconds", + baselineResponsivenessStabilizerDebugLevel, + baselineResponsivenessStabilizerDebugConfig) + + baselineStabilityCheckTime := time.Now().Add(specParameters.EvalInterval) + baselineStabilityCheckTimeChannel := timeoutat.TimeoutAt( + operatingCtx, + baselineStabilityCheckTime, + debugLevel, + ) + + responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber( + baselineProberOperatorCtx, + baselineNetworkActivityCtx, + generateForeignProbeConfiguration, + generateSelfProbeConfiguration, + nil, + &globalNumericBucketGenerator, + lgc.LGC_DOWN, + specParameters.ProbeInterval, + sslKeyFileConcurrentWriter, + *calculateExtendedStats, + baselineProbeDebugging, + ) + + baseline_responsiveness_timeout: + for !baselineStableResponsiveness { + select { + case probeMeasurement := <-responsivenessStabilizationCommunicationChannel: + { + switch probeMeasurement.Type { + case series.SeriesMessageReserve: + { + bucket := probeMeasurement.Bucket + if *debugCliFlag { + fmt.Printf("baseline: Reserving a responsiveness bucket with id %v.\n", bucket) + } + baselineResponsivenessStabilizer.Reserve(bucket) + baselineForeignDownloadRtts.Reserve(bucket) + baselineFauxSelfDownloadRtts.Reserve(bucket) + } + case series.SeriesMessageMeasure: + { + bucket := probeMeasurement.Bucket + measurement := utilities.GetSome(probeMeasurement.Measure) + foreignDataPoint := measurement.Foreign + + if *debugCliFlag { + fmt.Printf( + "baseline: Filling a responsiveness bucket with id %v with value %v.\n", bucket, measurement) + } + baselineResponsivenessStabilizer.AddMeasurement( + bucket, foreignDataPoint.Duration.Milliseconds()) + + if err := baselineForeignDownloadRtts.Fill( + bucket, foreignDataPoint.Duration.Seconds()); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (baselineForeignDownloadRtts)\n", bucket) + } + if err := baselineFauxSelfDownloadRtts.Fill( + bucket, foreignDataPoint.Duration.Seconds()/3.0); err != nil { + fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (baselineFauxSelfDownloadRtts)\n", bucket) + } + } + } + } + case <-timeoutChannel: + { + break baseline_responsiveness_timeout + } + case <-baselineStabilityCheckTimeChannel: + { + if *debugCliFlag { + fmt.Printf("baseline responsiveness stability interval is complete.\n") + } + + baselineStabilityCheckTime = time.Now().Add(specParameters.EvalInterval) + baselineStabilityCheckTimeChannel = timeoutat.TimeoutAt( + operatingCtx, + baselineStabilityCheckTime, + debugLevel, + ) + + // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy + // is *actually* important, but it can't hurt? + baselineStableResponsiveness = baselineResponsivenessStabilizer.IsStable() + + if *debugCliFlag { + fmt.Printf( + "baseline responsiveness is instantaneously %s.\n", + utilities.Conditional(baselineStableResponsiveness, "stable", "unstable")) + } + + baselineResponsivenessStabilizer.Interval() + } + } + } + baselineNetworkActivityCtxCancel() + baselineProberOperatorCtxCancel() + + baselineRpm = rpm.CalculateRpm(baselineFauxSelfDownloadRtts, + baselineForeignDownloadRtts, specParameters.TrimmedMeanPct, specParameters.Percentile) + + fmt.Printf("Baseline RPM: %5.0f (P%d)\n", baselineRpm.PNRpm, specParameters.Percentile) + fmt.Printf("Baseline RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", + baselineRpm.MeanRpm, specParameters.TrimmedMeanPct) + + } + // All tests will accumulate data to these series because it will all matter for RPM calculation! selfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) foreignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0) @@ -484,8 +633,6 @@ func main() { selfRttsQualityAttenuation = qualityattenuation.NewSimpleQualityAttenuation() } - globalNumericBucketGenerator := series.NewNumericBucketGenerator[uint64](0) - for _, direction := range directions { timeoutDuration := specParameters.TestTimeout @@ -953,6 +1100,19 @@ Gaming QoO: %.0f fmt.Printf("Final RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n", result.MeanRpm, specParameters.TrimmedMeanPct) + if *calculateRelativeRpm { + if baselineRpm == nil { + fmt.Printf("User requested relative RPM calculation but an unloaded RPM was not calculated.") + } else { + relativeRpmFactorP := utilities.AbsPercentDifference(result.PNRpm, baselineRpm.PNRpm) + relativeRpmFactorTM := utilities.AbsPercentDifference(result.MeanRpm, baselineRpm.MeanRpm) + fmt.Printf("Working Conditions RPM Effect: %5.0f%% (P%d)\n", + relativeRpmFactorP, specParameters.Percentile) + fmt.Printf("Working Conditions RPM Effect: %5.0f%% (Single-Sided %v%% Trimmed Mean)\n", + relativeRpmFactorTM, specParameters.TrimmedMeanPct) + } + } + // Stop the world. operatingCtxCancel() diff --git a/rpm/calculations.go b/rpm/calculations.go index 8b3cf92..f3619dc 100644 --- a/rpm/calculations.go +++ b/rpm/calculations.go @@ -37,7 +37,7 @@ type Rpm[Data utilities.Number] struct { func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered]( selfRtts series.WindowSeries[Data, Bucket], aggregatedForeignRtts series.WindowSeries[Data, Bucket], trimming uint, percentile uint, -) Rpm[Data] { +) *Rpm[Data] { // There may be more than one round trip accumulated together. If that is the case, // we will blow them apart in to three separate measurements and each one will just // be 1 / 3. @@ -82,7 +82,7 @@ func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered]( pnRpm := 60.0 / (float64(selfProbeRoundTripTimePN+foreignProbeRoundTripTimePN) / 2.0) meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0) - return Rpm[Data]{ + return &Rpm[Data]{ SelfRttsTotal: selfRttsTotalCount, ForeignRttsTotal: foreignRttsTotalCount, SelfRttsTrimmed: selfRttsTrimmedCount, ForeignRttsTrimmed: foreignRttsTrimmedCount, SelfProbeRttPN: selfProbeRoundTripTimePN, ForeignProbeRttPN: foreignProbeRoundTripTimePN, @@ -204,83 +204,95 @@ func ResponsivenessProber[BucketType utilities.Number]( } var selfProbeConnection *lgc.LoadGeneratingConnection = nil - func() { - selfProbeConnectionCollection.Lock.Lock() - defer selfProbeConnectionCollection.Lock.Unlock() - selfProbeConnection, err = selfProbeConnectionCollection.GetRandom() - if err != nil { + if selfProbeConnectionCollection != nil { + func() { + selfProbeConnectionCollection.Lock.Lock() + defer selfProbeConnectionCollection.Lock.Unlock() + selfProbeConnection, err = selfProbeConnectionCollection.GetRandom() + if err != nil { + if debug.IsWarn(debugging.Level) { + fmt.Printf( + "(%s) Failed to get a random %s load-generating connection on which to send a probe: %v.\n", + debugging.Prefix, + probeDirection, + err, + ) + } + return + } + }() + } + if selfProbeConnectionCollection != nil && selfProbeConnection == nil { + return + } + + var selfProbeDataPoint *probe.ProbeDataPoint = nil + if selfProbeConnection != nil { + // TODO: Make the following sanity check more than just a check. + // We only want to start a SelfUp probe on a connection that is + // in the RUNNING state. + if (*selfProbeConnection).Status() != lgc.LGC_STATUS_RUNNING { if debug.IsWarn(debugging.Level) { fmt.Printf( - "(%s) Failed to get a random %s load-generating connection on which to send a probe: %v.\n", + "(%s) The selected random %s load-generating connection on which to send a probe was not running.\n", debugging.Prefix, probeDirection, - err, ) } return } - }() - if selfProbeConnection == nil { - return - } - // TODO: Make the following sanity check more than just a check. - // We only want to start a SelfUp probe on a connection that is - // in the RUNNING state. - if (*selfProbeConnection).Status() != lgc.LGC_STATUS_RUNNING { - if debug.IsWarn(debugging.Level) { + if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) The selected random %s load-generating connection on which to send a probe was not running.\n", + "(%s) Selected %s load-generating connection with ID %d to send a self probe with Id %d.\n", debugging.Prefix, probeDirection, + (*selfProbeConnection).ClientId(), + probeCount, ) } - return - } - - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "(%s) Selected %s load-generating connection with ID %d to send a self probe with Id %d.\n", - debugging.Prefix, - probeDirection, - (*selfProbeConnection).ClientId(), + selfProbeDataPoint, err = probe.Probe( + proberCtx, + (*selfProbeConnection).Client(), + selfProbeConfiguration.URL, + selfProbeConfiguration.Host, + utilities.Conditional(probeDirection == lgc.LGC_DOWN, probe.SelfDown, probe.SelfUp), probeCount, + captureExtendedStats, + debugging, ) - } - selfProbeDataPoint, err := probe.Probe( - proberCtx, - (*selfProbeConnection).Client(), - selfProbeConfiguration.URL, - selfProbeConfiguration.Host, - utilities.Conditional(probeDirection == lgc.LGC_DOWN, probe.SelfDown, probe.SelfUp), - probeCount, - captureExtendedStats, - debugging, - ) - if err != nil { - // We may see an error here because the prober context was cancelled - // and requests were attempting to be sent. This situation is not an - // error (per se) so we will not log it as such. + if err != nil { + // We may see an error here because the prober context was cancelled + // and requests were attempting to be sent. This situation is not an + // error (per se) so we will not log it as such. - if proberCtx.Err() != nil { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "(%s) Failed to send a probe (id: %v) because the prober context was cancelled.\n", - debugging.Prefix, - probeCount, - ) + if proberCtx.Err() != nil { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) Failed to send a probe (id: %v) because the prober context was cancelled.\n", + debugging.Prefix, + probeCount, + ) + } + return } + fmt.Printf( + "(%s) There was an error sending a self probe with Id %d: %v\n", + debugging.Prefix, + probeCount, + err, + ) return } - fmt.Printf( - "(%s) There was an error sending a self probe with Id %d: %v\n", - debugging.Prefix, - probeCount, - err, - ) - return + } else { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) Did not send a self probe at id %d of probes!\n", + debugging.Prefix, + probeCount, + ) + } } - if debug.IsDebug(debugging.Level) { fmt.Printf( "(%s) About to report results for round %d of probes!\n", @@ -288,10 +300,9 @@ func ResponsivenessProber[BucketType utilities.Number]( probeCount, ) } - dataPointsLock.Lock() defer dataPointsLock.Unlock() - // Now we have our four data points (three in the foreign probe data point and one in the self probe data point) + // Now we have our (maybe) four data points (three in the foreign probe data point and [maybe] one in the self probe data point) if dataPoints != nil { measurement := ResponsivenessProbeResult{ Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint, |
