summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--networkQuality.go164
-rw-r--r--rpm/calculations.go4
-rw-r--r--rpm/rpm.go127
3 files changed, 233 insertions, 62 deletions
diff --git a/networkQuality.go b/networkQuality.go
index 6e1940d..ed9dcfa 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -160,6 +160,11 @@ var (
false,
"Show version.",
)
+ calculateRelativeRpm = flag.Bool(
+ "relative-rpm",
+ false,
+ "Calculate a relative RPM.",
+ )
)
func main() {
@@ -475,6 +480,150 @@ func main() {
defer pprof.StopCPUProfile()
}
+ globalNumericBucketGenerator := series.NewNumericBucketGenerator[uint64](0)
+
+ var baselineRpm *rpm.Rpm[float64] = nil
+
+ if *calculateRelativeRpm {
+ baselineForeignDownloadRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+ baselineFauxSelfDownloadRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
+ baselineStableResponsiveness := false
+ baselineProbeDebugging := debug.NewDebugWithPrefix(debugLevel, "Baseline RPM Calculation Probe")
+
+ timeoutDuration := specParameters.TestTimeout
+ timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
+
+ timeoutChannel := timeoutat.TimeoutAt(
+ operatingCtx,
+ timeoutAbsoluteTime,
+ debugLevel,
+ )
+ if debug.IsDebug(debugLevel) {
+ fmt.Printf("Baseline RPM calculation will end no later than %v\n", timeoutAbsoluteTime)
+ }
+
+ baselineProberOperatorCtx, baselineProberOperatorCtxCancel := context.WithCancel(operatingCtx)
+
+ // This context is used to control the network activity (i.e., it controls all
+ // the connections that are open to do load generation and probing).
+ baselineNetworkActivityCtx, baselineNetworkActivityCtxCancel := context.WithCancel(operatingCtx)
+
+ baselineResponsivenessStabilizerDebugConfig :=
+ debug.NewDebugWithPrefix(debug.Debug, "Baseline Responsiveness Stabilizer")
+ baselineResponsivenessStabilizerDebugLevel := debug.Error
+ if *debugCliFlag {
+ baselineResponsivenessStabilizerDebugLevel = debug.Debug
+ }
+ baselineResponsivenessStabilizer := stabilizer.NewStabilizer[int64, uint64](
+ specParameters.MovingAvgDist, specParameters.StdDevTolerance,
+ specParameters.TrimmedMeanPct, "milliseconds",
+ baselineResponsivenessStabilizerDebugLevel,
+ baselineResponsivenessStabilizerDebugConfig)
+
+ baselineStabilityCheckTime := time.Now().Add(specParameters.EvalInterval)
+ baselineStabilityCheckTimeChannel := timeoutat.TimeoutAt(
+ operatingCtx,
+ baselineStabilityCheckTime,
+ debugLevel,
+ )
+
+ responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber(
+ baselineProberOperatorCtx,
+ baselineNetworkActivityCtx,
+ generateForeignProbeConfiguration,
+ generateSelfProbeConfiguration,
+ nil,
+ &globalNumericBucketGenerator,
+ lgc.LGC_DOWN,
+ specParameters.ProbeInterval,
+ sslKeyFileConcurrentWriter,
+ *calculateExtendedStats,
+ baselineProbeDebugging,
+ )
+
+ baseline_responsiveness_timeout:
+ for !baselineStableResponsiveness {
+ select {
+ case probeMeasurement := <-responsivenessStabilizationCommunicationChannel:
+ {
+ switch probeMeasurement.Type {
+ case series.SeriesMessageReserve:
+ {
+ bucket := probeMeasurement.Bucket
+ if *debugCliFlag {
+ fmt.Printf("baseline: Reserving a responsiveness bucket with id %v.\n", bucket)
+ }
+ baselineResponsivenessStabilizer.Reserve(bucket)
+ baselineForeignDownloadRtts.Reserve(bucket)
+ baselineFauxSelfDownloadRtts.Reserve(bucket)
+ }
+ case series.SeriesMessageMeasure:
+ {
+ bucket := probeMeasurement.Bucket
+ measurement := utilities.GetSome(probeMeasurement.Measure)
+ foreignDataPoint := measurement.Foreign
+
+ if *debugCliFlag {
+ fmt.Printf(
+ "baseline: Filling a responsiveness bucket with id %v with value %v.\n", bucket, measurement)
+ }
+ baselineResponsivenessStabilizer.AddMeasurement(
+ bucket, foreignDataPoint.Duration.Milliseconds())
+
+ if err := baselineForeignDownloadRtts.Fill(
+ bucket, foreignDataPoint.Duration.Seconds()); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (baselineForeignDownloadRtts)\n", bucket)
+ }
+ if err := baselineFauxSelfDownloadRtts.Fill(
+ bucket, foreignDataPoint.Duration.Seconds()/3.0); err != nil {
+ fmt.Printf("Attempting to fill a bucket (id: %d) that does not exist (baselineFauxSelfDownloadRtts)\n", bucket)
+ }
+ }
+ }
+ }
+ case <-timeoutChannel:
+ {
+ break baseline_responsiveness_timeout
+ }
+ case <-baselineStabilityCheckTimeChannel:
+ {
+ if *debugCliFlag {
+ fmt.Printf("baseline responsiveness stability interval is complete.\n")
+ }
+
+ baselineStabilityCheckTime = time.Now().Add(specParameters.EvalInterval)
+ baselineStabilityCheckTimeChannel = timeoutat.TimeoutAt(
+ operatingCtx,
+ baselineStabilityCheckTime,
+ debugLevel,
+ )
+
+ // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy
+ // is *actually* important, but it can't hurt?
+ baselineStableResponsiveness = baselineResponsivenessStabilizer.IsStable()
+
+ if *debugCliFlag {
+ fmt.Printf(
+ "baseline responsiveness is instantaneously %s.\n",
+ utilities.Conditional(baselineStableResponsiveness, "stable", "unstable"))
+ }
+
+ baselineResponsivenessStabilizer.Interval()
+ }
+ }
+ }
+ baselineNetworkActivityCtxCancel()
+ baselineProberOperatorCtxCancel()
+
+ baselineRpm = rpm.CalculateRpm(baselineFauxSelfDownloadRtts,
+ baselineForeignDownloadRtts, specParameters.TrimmedMeanPct, specParameters.Percentile)
+
+ fmt.Printf("Baseline RPM: %5.0f (P%d)\n", baselineRpm.PNRpm, specParameters.Percentile)
+ fmt.Printf("Baseline RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n",
+ baselineRpm.MeanRpm, specParameters.TrimmedMeanPct)
+
+ }
+
// All tests will accumulate data to these series because it will all matter for RPM calculation!
selfRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
foreignRtts := series.NewWindowSeries[float64, uint64](series.Forever, 0)
@@ -484,8 +633,6 @@ func main() {
selfRttsQualityAttenuation = qualityattenuation.NewSimpleQualityAttenuation()
}
- globalNumericBucketGenerator := series.NewNumericBucketGenerator[uint64](0)
-
for _, direction := range directions {
timeoutDuration := specParameters.TestTimeout
@@ -953,6 +1100,19 @@ Gaming QoO: %.0f
fmt.Printf("Final RPM: %5.0f (Single-Sided %v%% Trimmed Mean)\n",
result.MeanRpm, specParameters.TrimmedMeanPct)
+ if *calculateRelativeRpm {
+ if baselineRpm == nil {
+ fmt.Printf("User requested relative RPM calculation but an unloaded RPM was not calculated.")
+ } else {
+ relativeRpmFactorP := utilities.AbsPercentDifference(result.PNRpm, baselineRpm.PNRpm)
+ relativeRpmFactorTM := utilities.AbsPercentDifference(result.MeanRpm, baselineRpm.MeanRpm)
+ fmt.Printf("Working Conditions RPM Effect: %5.0f%% (P%d)\n",
+ relativeRpmFactorP, specParameters.Percentile)
+ fmt.Printf("Working Conditions RPM Effect: %5.0f%% (Single-Sided %v%% Trimmed Mean)\n",
+ relativeRpmFactorTM, specParameters.TrimmedMeanPct)
+ }
+ }
+
// Stop the world.
operatingCtxCancel()
diff --git a/rpm/calculations.go b/rpm/calculations.go
index 8b3cf92..f3619dc 100644
--- a/rpm/calculations.go
+++ b/rpm/calculations.go
@@ -37,7 +37,7 @@ type Rpm[Data utilities.Number] struct {
func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered](
selfRtts series.WindowSeries[Data, Bucket], aggregatedForeignRtts series.WindowSeries[Data, Bucket], trimming uint, percentile uint,
-) Rpm[Data] {
+) *Rpm[Data] {
// There may be more than one round trip accumulated together. If that is the case,
// we will blow them apart in to three separate measurements and each one will just
// be 1 / 3.
@@ -82,7 +82,7 @@ func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered](
pnRpm := 60.0 / (float64(selfProbeRoundTripTimePN+foreignProbeRoundTripTimePN) / 2.0)
meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0)
- return Rpm[Data]{
+ return &Rpm[Data]{
SelfRttsTotal: selfRttsTotalCount, ForeignRttsTotal: foreignRttsTotalCount,
SelfRttsTrimmed: selfRttsTrimmedCount, ForeignRttsTrimmed: foreignRttsTrimmedCount,
SelfProbeRttPN: selfProbeRoundTripTimePN, ForeignProbeRttPN: foreignProbeRoundTripTimePN,
diff --git a/rpm/rpm.go b/rpm/rpm.go
index d1c07a5..3ebdefd 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -204,83 +204,95 @@ func ResponsivenessProber[BucketType utilities.Number](
}
var selfProbeConnection *lgc.LoadGeneratingConnection = nil
- func() {
- selfProbeConnectionCollection.Lock.Lock()
- defer selfProbeConnectionCollection.Lock.Unlock()
- selfProbeConnection, err = selfProbeConnectionCollection.GetRandom()
- if err != nil {
+ if selfProbeConnectionCollection != nil {
+ func() {
+ selfProbeConnectionCollection.Lock.Lock()
+ defer selfProbeConnectionCollection.Lock.Unlock()
+ selfProbeConnection, err = selfProbeConnectionCollection.GetRandom()
+ if err != nil {
+ if debug.IsWarn(debugging.Level) {
+ fmt.Printf(
+ "(%s) Failed to get a random %s load-generating connection on which to send a probe: %v.\n",
+ debugging.Prefix,
+ probeDirection,
+ err,
+ )
+ }
+ return
+ }
+ }()
+ }
+ if selfProbeConnectionCollection != nil && selfProbeConnection == nil {
+ return
+ }
+
+ var selfProbeDataPoint *probe.ProbeDataPoint = nil
+ if selfProbeConnection != nil {
+ // TODO: Make the following sanity check more than just a check.
+ // We only want to start a SelfUp probe on a connection that is
+ // in the RUNNING state.
+ if (*selfProbeConnection).Status() != lgc.LGC_STATUS_RUNNING {
if debug.IsWarn(debugging.Level) {
fmt.Printf(
- "(%s) Failed to get a random %s load-generating connection on which to send a probe: %v.\n",
+ "(%s) The selected random %s load-generating connection on which to send a probe was not running.\n",
debugging.Prefix,
probeDirection,
- err,
)
}
return
}
- }()
- if selfProbeConnection == nil {
- return
- }
- // TODO: Make the following sanity check more than just a check.
- // We only want to start a SelfUp probe on a connection that is
- // in the RUNNING state.
- if (*selfProbeConnection).Status() != lgc.LGC_STATUS_RUNNING {
- if debug.IsWarn(debugging.Level) {
+ if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) The selected random %s load-generating connection on which to send a probe was not running.\n",
+ "(%s) Selected %s load-generating connection with ID %d to send a self probe with Id %d.\n",
debugging.Prefix,
probeDirection,
+ (*selfProbeConnection).ClientId(),
+ probeCount,
)
}
- return
- }
-
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "(%s) Selected %s load-generating connection with ID %d to send a self probe with Id %d.\n",
- debugging.Prefix,
- probeDirection,
- (*selfProbeConnection).ClientId(),
+ selfProbeDataPoint, err = probe.Probe(
+ proberCtx,
+ (*selfProbeConnection).Client(),
+ selfProbeConfiguration.URL,
+ selfProbeConfiguration.Host,
+ utilities.Conditional(probeDirection == lgc.LGC_DOWN, probe.SelfDown, probe.SelfUp),
probeCount,
+ captureExtendedStats,
+ debugging,
)
- }
- selfProbeDataPoint, err := probe.Probe(
- proberCtx,
- (*selfProbeConnection).Client(),
- selfProbeConfiguration.URL,
- selfProbeConfiguration.Host,
- utilities.Conditional(probeDirection == lgc.LGC_DOWN, probe.SelfDown, probe.SelfUp),
- probeCount,
- captureExtendedStats,
- debugging,
- )
- if err != nil {
- // We may see an error here because the prober context was cancelled
- // and requests were attempting to be sent. This situation is not an
- // error (per se) so we will not log it as such.
+ if err != nil {
+ // We may see an error here because the prober context was cancelled
+ // and requests were attempting to be sent. This situation is not an
+ // error (per se) so we will not log it as such.
- if proberCtx.Err() != nil {
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "(%s) Failed to send a probe (id: %v) because the prober context was cancelled.\n",
- debugging.Prefix,
- probeCount,
- )
+ if proberCtx.Err() != nil {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) Failed to send a probe (id: %v) because the prober context was cancelled.\n",
+ debugging.Prefix,
+ probeCount,
+ )
+ }
+ return
}
+ fmt.Printf(
+ "(%s) There was an error sending a self probe with Id %d: %v\n",
+ debugging.Prefix,
+ probeCount,
+ err,
+ )
return
}
- fmt.Printf(
- "(%s) There was an error sending a self probe with Id %d: %v\n",
- debugging.Prefix,
- probeCount,
- err,
- )
- return
+ } else {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) Did not send a self probe at id %d of probes!\n",
+ debugging.Prefix,
+ probeCount,
+ )
+ }
}
-
if debug.IsDebug(debugging.Level) {
fmt.Printf(
"(%s) About to report results for round %d of probes!\n",
@@ -288,10 +300,9 @@ func ResponsivenessProber[BucketType utilities.Number](
probeCount,
)
}
-
dataPointsLock.Lock()
defer dataPointsLock.Unlock()
- // Now we have our four data points (three in the foreign probe data point and one in the self probe data point)
+ // Now we have our (maybe) four data points (three in the foreign probe data point and [maybe] one in the self probe data point)
if dataPoints != nil {
measurement := ResponsivenessProbeResult{
Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint,