summaryrefslogtreecommitdiff
path: root/networkQuality.go
diff options
context:
space:
mode:
Diffstat (limited to 'networkQuality.go')
-rw-r--r--networkQuality.go924
1 files changed, 474 insertions, 450 deletions
diff --git a/networkQuality.go b/networkQuality.go
index f97b27c..776c0e7 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -22,6 +22,7 @@ import (
"net/url"
"os"
"runtime/pprof"
+ "strings"
"time"
"github.com/network-quality/goresponsiveness/ccw"
@@ -29,6 +30,7 @@ import (
"github.com/network-quality/goresponsiveness/constants"
"github.com/network-quality/goresponsiveness/datalogger"
"github.com/network-quality/goresponsiveness/debug"
+ "github.com/network-quality/goresponsiveness/direction"
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
"github.com/network-quality/goresponsiveness/ms"
@@ -68,10 +70,46 @@ var (
"Enable debugging.",
)
rpmtimeout = flag.Int(
- "rpmtimeout",
- constants.RPMCalculationTime,
- "Maximum time to spend calculating RPM (i.e., total test time.).",
+ "rpm.timeout",
+ constants.DefaultTestTime,
+ "Maximum time (in seconds) to spend calculating RPM (i.e., total test time.).",
)
+ rpmmad = flag.Int(
+ "rpm.mad",
+ constants.SpecParameterCliOptionsDefaults.Mad,
+ "Moving average distance -- number of intervals considered during stability calculations.",
+ )
+ rpmid = flag.Int(
+ "rpm.id",
+ constants.SpecParameterCliOptionsDefaults.Id,
+ "Duration of the interval between re-evaluating the network conditions (in seconds).",
+ )
+ rpmtmp = flag.Uint(
+ "rpm.tmp",
+ constants.SpecParameterCliOptionsDefaults.Tmp,
+ "Percent of measurements to trim when calculating statistics about network conditions (between 0 and 100).",
+ )
+ rpmsdt = flag.Float64(
+ "rpm.sdt",
+ constants.SpecParameterCliOptionsDefaults.Sdt,
+ "Cutoff in the standard deviation of measured values about network conditions between unstable and stable.",
+ )
+ rpmmnp = flag.Int(
+ "rpm.mnp",
+ constants.SpecParameterCliOptionsDefaults.Mnp,
+ "Maximimum number of parallel connections to establish when attempting to reach working conditions.",
+ )
+ rpmmps = flag.Int(
+ "rpm.mps",
+ constants.SpecParameterCliOptionsDefaults.Mps,
+ "Maximimum number of probes to send per second.",
+ )
+ rpmptc = flag.Float64(
+ "rpm.ptc",
+ constants.SpecParameterCliOptionsDefaults.Ptc,
+ "Percentage of the (discovered) total network capacity that probes are allowed to consume.",
+ )
+
sslKeyFileName = flag.String(
"ssl-key-file",
"",
@@ -97,11 +135,6 @@ var (
"",
"Store granular information about tests results in files with this basename. Time and information type will be appended (before the first .) to create separate log files. Disabled by default.",
)
- probeIntervalTime = flag.Uint(
- "probe-interval-time",
- 100,
- "Time (in ms) between probes (foreign and self).",
- )
connectToAddr = flag.String(
"connect-to",
"",
@@ -132,8 +165,26 @@ func main() {
os.Exit(0)
}
- timeoutDuration := time.Second * time.Duration(*rpmtimeout)
- timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
+ var debugLevel debug.DebugLevel = debug.Error
+
+ if *debugCliFlag {
+ debugLevel = debug.Debug
+ }
+
+ specParameters, err := rpm.SpecParametersFromArguments(*rpmtimeout, *rpmmad, *rpmid,
+ *rpmtmp, *rpmsdt, *rpmmnp, *rpmmps, *rpmptc)
+ if err != nil {
+ fmt.Fprintf(
+ os.Stderr,
+ "Error: There was an error configuring the test with user-supplied parameters: %v\n",
+ err,
+ )
+ os.Exit(1)
+ }
+
+ if debug.IsDebug(debugLevel) {
+ fmt.Printf("Running the test according to the following spec parameters:\n%v\n", specParameters.ToString())
+ }
var configHostPort string
@@ -158,30 +209,14 @@ func main() {
// the others.
operatingCtx, operatingCtxCancel := context.WithCancel(context.Background())
- // The operator contexts. These contexts control the processes that manage
- // network activity but do not control network activity.
-
- uploadLoadGeneratorOperatorCtx, uploadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx)
- downloadLoadGeneratorOperatorCtx, downloadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx)
- proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx)
-
- // This context is used to control the network activity (i.e., it controls all
- // the connections that are open to do load generation and probing). Cancelling this context will close
- // all the network connections that are responsible for generating the load.
- networkActivityCtx, networkActivityCtxCancel := context.WithCancel(operatingCtx)
-
config := &config.Config{
ConnectToAddr: *connectToAddr,
}
- var debugLevel debug.DebugLevel = debug.Error
-
- if *debugCliFlag {
- debugLevel = debug.Debug
- }
if *calculateExtendedStats && !extendedstats.ExtendedStatsAvailable() {
*calculateExtendedStats = false
- fmt.Printf(
+ fmt.Fprintf(
+ os.Stderr,
"Warning: Calculation of extended statistics was requested but is not supported on this platform.\n",
)
}
@@ -223,53 +258,14 @@ func main() {
fmt.Printf("Configuration: %s\n", config)
}
- timeoutChannel := timeoutat.TimeoutAt(
- operatingCtx,
- timeoutAbsoluteTime,
- debugLevel,
- )
- if debug.IsDebug(debugLevel) {
- fmt.Printf("Test will end no later than %v\n", timeoutAbsoluteTime)
- }
-
- // print the banner
- dt := time.Now().UTC()
- fmt.Printf(
- "%s UTC Go Responsiveness to %s...\n",
- dt.Format("01-02-2006 15:04:05"),
- configHostPort,
- )
-
- if len(*profile) != 0 {
- f, err := os.Create(*profile)
- if err != nil {
- fmt.Fprintf(
- os.Stderr,
- "Error: Profiling requested but could not open the log file ( %s ) for writing: %v\n",
- *profile,
- err,
- )
- os.Exit(1)
- }
- pprof.StartCPUProfile(f)
- defer pprof.StopCPUProfile()
- }
- var selfProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] = nil
- var foreignProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] = nil
- var downloadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil
- var uploadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil
- var granularThroughputDataLogger datalogger.DataLogger[rpm.GranularThroughputDataPoint] = nil
+ downloadDirection := direction.Direction{}
+ uploadDirection := direction.Direction{}
// User wants to log data
if *dataLoggerBaseFileName != "" {
var err error = nil
unique := time.Now().UTC().Format("01-02-2006-15-04-05")
- dataLoggerSelfFilename := utilities.FilenameAppend(*dataLoggerBaseFileName, "-self-"+unique)
- dataLoggerForeignFilename := utilities.FilenameAppend(
- *dataLoggerBaseFileName,
- "-foreign-"+unique,
- )
dataLoggerDownloadThroughputFilename := utilities.FilenameAppend(
*dataLoggerBaseFileName,
"-throughput-download-"+unique,
@@ -278,100 +274,158 @@ func main() {
*dataLoggerBaseFileName,
"-throughput-upload-"+unique,
)
- dataLoggerGranularThroughputFilename := utilities.FilenameAppend(
+
+ dataLoggerDownloadGranularThroughputFilename := utilities.FilenameAppend(
*dataLoggerBaseFileName,
- "-throughput-granular-"+unique,
+ "-throughput-download-granular-"+unique,
)
- selfProbeDataLogger, err = datalogger.CreateCSVDataLogger[probe.ProbeDataPoint](
+ dataLoggerUploadGranularThroughputFilename := utilities.FilenameAppend(
+ *dataLoggerBaseFileName,
+ "-throughput-upload-granular-"+unique,
+ )
+
+ dataLoggerSelfFilename := utilities.FilenameAppend(*dataLoggerBaseFileName, "-self-"+unique)
+ dataLoggerForeignFilename := utilities.FilenameAppend(
+ *dataLoggerBaseFileName,
+ "-foreign-"+unique,
+ )
+
+ selfProbeDataLogger, err := datalogger.CreateCSVDataLogger[probe.ProbeDataPoint](
dataLoggerSelfFilename,
)
if err != nil {
- fmt.Printf(
+ fmt.Fprintf(
+ os.Stderr,
"Warning: Could not create the file for storing self probe results (%s). Disabling functionality.\n",
dataLoggerSelfFilename,
)
selfProbeDataLogger = nil
}
+ uploadDirection.SelfProbeDataLogger = selfProbeDataLogger
+ downloadDirection.SelfProbeDataLogger = selfProbeDataLogger
- foreignProbeDataLogger, err = datalogger.CreateCSVDataLogger[probe.ProbeDataPoint](
+ foreignProbeDataLogger, err := datalogger.CreateCSVDataLogger[probe.ProbeDataPoint](
dataLoggerForeignFilename,
)
if err != nil {
- fmt.Printf(
+ fmt.Fprintf(
+ os.Stderr,
"Warning: Could not create the file for storing foreign probe results (%s). Disabling functionality.\n",
dataLoggerForeignFilename,
)
foreignProbeDataLogger = nil
}
+ uploadDirection.ForeignProbeDataLogger = selfProbeDataLogger
+ downloadDirection.ForeignProbeDataLogger = foreignProbeDataLogger
- downloadThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint](
+ downloadDirection.ThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint](
dataLoggerDownloadThroughputFilename,
)
if err != nil {
- fmt.Printf(
+ fmt.Fprintf(
+ os.Stderr,
"Warning: Could not create the file for storing download throughput results (%s). Disabling functionality.\n",
dataLoggerDownloadThroughputFilename,
)
- downloadThroughputDataLogger = nil
+ downloadDirection.ThroughputDataLogger = nil
}
-
- uploadThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint](
+ uploadDirection.ThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint](
dataLoggerUploadThroughputFilename,
)
if err != nil {
- fmt.Printf(
+ fmt.Fprintf(
+ os.Stderr,
"Warning: Could not create the file for storing upload throughput results (%s). Disabling functionality.\n",
dataLoggerUploadThroughputFilename,
)
- uploadThroughputDataLogger = nil
+ uploadDirection.ThroughputDataLogger = nil
}
- granularThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.GranularThroughputDataPoint](
- dataLoggerGranularThroughputFilename,
+ downloadDirection.GranularThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.GranularThroughputDataPoint](
+ dataLoggerDownloadGranularThroughputFilename,
)
if err != nil {
- fmt.Printf(
- "Warning: Could not create the file for storing granular throughput results (%s). Disabling functionality.\n",
- dataLoggerGranularThroughputFilename,
+ fmt.Fprintf(
+ os.Stderr,
+ "Warning: Could not create the file for storing download granular throughput results (%s). Disabling functionality.\n",
+ dataLoggerDownloadGranularThroughputFilename,
+ )
+ downloadDirection.GranularThroughputDataLogger = nil
+ }
+ uploadDirection.GranularThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.GranularThroughputDataPoint](
+ dataLoggerUploadGranularThroughputFilename,
+ )
+ if err != nil {
+ fmt.Fprintf(
+ os.Stderr,
+ "Warning: Could not create the file for storing upload granular throughput results (%s). Disabling functionality.\n",
+ dataLoggerUploadGranularThroughputFilename,
)
- granularThroughputDataLogger = nil
+ uploadDirection.GranularThroughputDataLogger = nil
}
+
}
// If, for some reason, the data loggers are nil, make them Null Data Loggers so that we don't have conditional
// code later.
- if selfProbeDataLogger == nil {
- selfProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]()
+ if downloadDirection.SelfProbeDataLogger == nil {
+ downloadDirection.SelfProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]()
+ }
+ if uploadDirection.SelfProbeDataLogger == nil {
+ uploadDirection.SelfProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]()
}
- if foreignProbeDataLogger == nil {
- foreignProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]()
+
+ if downloadDirection.ForeignProbeDataLogger == nil {
+ downloadDirection.ForeignProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]()
}
- if downloadThroughputDataLogger == nil {
- downloadThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]()
+ if uploadDirection.ForeignProbeDataLogger == nil {
+ uploadDirection.ForeignProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]()
}
- if uploadThroughputDataLogger == nil {
- uploadThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]()
+
+ if downloadDirection.ThroughputDataLogger == nil {
+ downloadDirection.ThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]()
+ }
+ if uploadDirection.ThroughputDataLogger == nil {
+ uploadDirection.ThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]()
+ }
+
+ if downloadDirection.GranularThroughputDataLogger == nil {
+ downloadDirection.GranularThroughputDataLogger =
+ datalogger.CreateNullDataLogger[rpm.GranularThroughputDataPoint]()
}
- if granularThroughputDataLogger == nil {
- granularThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.GranularThroughputDataPoint]()
+ if uploadDirection.GranularThroughputDataLogger == nil {
+ uploadDirection.GranularThroughputDataLogger =
+ datalogger.CreateNullDataLogger[rpm.GranularThroughputDataPoint]()
}
/*
* Create (and then, ironically, name) two anonymous functions that, when invoked,
* will create load-generating connections for upload/download
*/
- generateLgdc := func() lgc.LoadGeneratingConnection {
+ downloadDirection.CreateLgdc = func() lgc.LoadGeneratingConnection {
lgd := lgc.NewLoadGeneratingConnectionDownload(config.Urls.LargeUrl,
sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify)
return &lgd
}
-
- generateLguc := func() lgc.LoadGeneratingConnection {
+ uploadDirection.CreateLgdc = func() lgc.LoadGeneratingConnection {
lgu := lgc.NewLoadGeneratingConnectionUpload(config.Urls.UploadUrl,
sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify)
return &lgu
}
+ downloadDirection.DirectionDebugging = debug.NewDebugWithPrefix(debugLevel, "download")
+ downloadDirection.ProbeDebugging = debug.NewDebugWithPrefix(debugLevel, "download probe")
+
+ uploadDirection.DirectionDebugging = debug.NewDebugWithPrefix(debugLevel, "upload")
+ uploadDirection.ProbeDebugging = debug.NewDebugWithPrefix(debugLevel, "upload probe")
+
+ downloadDirection.Lgcc = lgc.NewLoadGeneratingConnectionCollection()
+ uploadDirection.Lgcc = lgc.NewLoadGeneratingConnectionCollection()
+
+ // We do not do tracing on upload connections so there are no extended stats for those connections!
+ uploadDirection.ExtendedStatsEligible = false
+ downloadDirection.ExtendedStatsEligible = true
+
generateSelfProbeConfiguration := func() probe.ProbeConfiguration {
return probe.ProbeConfiguration{
URL: config.Urls.SmallUrl,
@@ -388,421 +442,391 @@ func main() {
}
}
- var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download")
- var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload")
- var combinedProbeDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "combined probe")
-
- downloadLoadGeneratingConnectionCollection := lgc.NewLoadGeneratingConnectionCollection()
- uploadLoadGeneratingConnectionCollection := lgc.NewLoadGeneratingConnectionCollection()
+ downloadDirection.DirectionLabel = "Download"
+ uploadDirection.DirectionLabel = "Upload"
- // TODO: Separate contexts for load generation and data collection. If we do that, if either of the two
- // data collection go routines stops well before the other, they will continue to send probes and we can
- // generate additional information!
+ directions := []*direction.Direction{&downloadDirection, &uploadDirection}
- selfDownProbeConnectionCommunicationChannel, downloadThroughputChannel := rpm.LoadGenerator(
- networkActivityCtx,
- downloadLoadGeneratorOperatorCtx,
- time.Second,
- generateLgdc,
- &downloadLoadGeneratingConnectionCollection,
- *calculateExtendedStats,
- downloadDebugging,
- )
- selfUpProbeConnectionCommunicationChannel, uploadThroughputChannel := rpm.LoadGenerator(
- networkActivityCtx,
- uploadLoadGeneratorOperatorCtx,
- time.Second,
- generateLguc,
- &uploadLoadGeneratingConnectionCollection,
- *calculateExtendedStats,
- uploadDebugging,
+ // print the banner
+ dt := time.Now().UTC()
+ fmt.Printf(
+ "%s UTC Go Responsiveness to %s...\n",
+ dt.Format("01-02-2006 15:04:05"),
+ configHostPort,
)
- // Handles for the first connection that the load-generating go routines (both up and
- // download) open are passed back on the self[Down|Up]ProbeConnectionCommunicationChannel
- // so that we can then start probes on those connections.
- selfDownProbeConnection := <-selfDownProbeConnectionCommunicationChannel
- selfUpProbeConnection := <-selfUpProbeConnectionCommunicationChannel
+ if len(*profile) != 0 {
+ f, err := os.Create(*profile)
+ if err != nil {
+ fmt.Fprintf(
+ os.Stderr,
+ "Error: Profiling requested but could not open the log file ( %s ) for writing: %v\n",
+ *profile,
+ err,
+ )
+ os.Exit(1)
+ }
+ pprof.StartCPUProfile(f)
+ defer pprof.StopCPUProfile()
+ }
+
+ // All tests will accumulate data to these series because it will all matter for RPM calculation!
+ selfRtts := ms.NewInfiniteMathematicalSeries[float64]()
+ foreignRtts := ms.NewInfiniteMathematicalSeries[float64]()
- // The combined prober will handle launching, monitoring, etc of *both* the self and foreign
- // probes.
- probeDataPointsChannel := rpm.CombinedProber(
- proberOperatorCtx,
- networkActivityCtx,
- generateForeignProbeConfiguration,
- generateSelfProbeConfiguration,
- selfDownProbeConnection,
- selfUpProbeConnection,
- time.Millisecond*(time.Duration(*probeIntervalTime)),
- sslKeyFileConcurrentWriter,
- *calculateExtendedStats,
- combinedProbeDebugging,
- )
+ var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil
+ if *printQualityAttenuation {
+ selfRttsQualityAttenuation = qualityattenuation.NewSimpleQualityAttenuation()
+ }
- responsivenessIsStable := false
- downloadThroughputIsStable := false
- uploadThroughputIsStable := false
+ for _, direction := range directions {
- // Test parameters:
- // 1. I: The number of previous instantaneous measurements to consider when generating
- // the so-called instantaneous moving averages.
- // 2. K: The number of instantaneous moving averages to consider when determining stability.
- // 3: S: The standard deviation cutoff used to determine stability among the K preceding
- // moving averages of a measurement.
- // See
+ timeoutDuration := specParameters.TestTimeout
+ timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
- throughputI := constants.InstantaneousThroughputMeasurementCount
- probeI := constants.InstantaneousProbeMeasurementCount
- K := constants.InstantaneousMovingAverageStabilityCount
- S := constants.StabilityStandardDeviation
+ timeoutChannel := timeoutat.TimeoutAt(
+ operatingCtx,
+ timeoutAbsoluteTime,
+ debugLevel,
+ )
+ if debug.IsDebug(debugLevel) {
+ fmt.Printf("%s Test will end no later than %v\n", direction.DirectionLabel, timeoutAbsoluteTime)
+ }
- downloadThroughputStabilizerDebugConfig :=
- debug.NewDebugWithPrefix(debug.Debug, "Download Throughput Stabilizer")
- downloadThroughputStabilizerDebugLevel := debug.Error
- if *debugCliFlag {
- downloadThroughputStabilizerDebugLevel = debug.Debug
- }
- downloadThroughputStabilizer := stabilizer.NewThroughputStabilizer(throughputI, K, S,
- downloadThroughputStabilizerDebugLevel, downloadThroughputStabilizerDebugConfig)
+ throughputCtx, throughputCtxCancel := context.WithCancel(operatingCtx)
+ proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx)
- uploadThroughputStabilizerDebugConfig :=
- debug.NewDebugWithPrefix(debug.Debug, "Upload Throughput Stabilizer")
- uploadThroughputStabilizerDebugLevel := debug.Error
- if *debugCliFlag {
- uploadThroughputStabilizerDebugLevel = debug.Debug
- }
- uploadThroughputStabilizer := stabilizer.NewThroughputStabilizer(throughputI, K, S,
- uploadThroughputStabilizerDebugLevel, uploadThroughputStabilizerDebugConfig)
+ // This context is used to control the network activity (i.e., it controls all
+ // the connections that are open to do load generation and probing). Cancelling this context will close
+ // all the network connections that are responsible for generating the load.
+ networkActivityCtx, networkActivityCtxCancel := context.WithCancel(operatingCtx)
- probeStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, "Probe Stabilizer")
- probeStabilizerDebugLevel := debug.Error
- if *debugCliFlag {
- probeStabilizerDebugLevel = debug.Debug
- }
- probeStabilizer := stabilizer.NewProbeStabilizer(probeI, K, S, probeStabilizerDebugLevel, probeStabilizerDebugConfig)
+ throughputGeneratorCtx, throughputGeneratorCtxCancel := context.WithCancel(throughputCtx)
- selfRtts := ms.NewInfiniteMathematicalSeries[float64]()
- selfRttsQualityAttenuation := qualityattenuation.NewSimpleQualityAttenuation()
- foreignRtts := ms.NewInfiniteMathematicalSeries[float64]()
+ lgStabilizationCommunicationChannel := rpm.LoadGenerator(
+ throughputCtx,
+ networkActivityCtx,
+ throughputGeneratorCtx,
+ specParameters.EvalInterval,
+ direction.CreateLgdc,
+ &direction.Lgcc,
+ specParameters.MaxParallelConns,
+ *calculateExtendedStats,
+ direction.DirectionDebugging,
+ )
- // For later debugging output, record the last throughputs on load-generating connectings
- // and the number of open connections.
- lastUploadThroughputRate := float64(0)
- lastUploadThroughputOpenConnectionCount := int(0)
- lastDownloadThroughputRate := float64(0)
- lastDownloadThroughputOpenConnectionCount := int(0)
+ throughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug,
+ fmt.Sprintf("%v Throughput Stabilizer", direction.DirectionLabel))
+ downloadThroughputStabilizerDebugLevel := debug.Error
+ if *debugCliFlag {
+ downloadThroughputStabilizerDebugLevel = debug.Debug
+ }
+ throughputStabilizer := stabilizer.NewStabilizer[float64](
+ uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, 0, "bytes",
+ downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig)
- // Every time that there is a new measurement, the possibility exists that the measurements become unstable.
- // This allows us to continue pushing until *everything* is stable at the same time.
-timeout:
- for !(responsivenessIsStable && downloadThroughputIsStable && uploadThroughputIsStable) {
- select {
+ responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug,
+ fmt.Sprintf("%v Responsiveness Stabilizer", direction.DirectionLabel))
+ responsivenessStabilizerDebugLevel := debug.Error
+ if *debugCliFlag {
+ responsivenessStabilizerDebugLevel = debug.Debug
+ }
+ responsivenessStabilizer := stabilizer.NewStabilizer[int64](
+ uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance,
+ specParameters.TrimmedMeanPct, "milliseconds",
+ responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig)
- case downloadThroughputMeasurement := <-downloadThroughputChannel:
- {
- downloadThroughputStabilizer.AddMeasurement(downloadThroughputMeasurement)
- downloadThroughputIsStable = downloadThroughputStabilizer.IsStable()
- if *debugCliFlag {
- fmt.Printf(
- "################# Download is instantaneously %s.\n",
- utilities.Conditional(downloadThroughputIsStable, "stable", "unstable"))
- }
- downloadThroughputDataLogger.LogRecord(downloadThroughputMeasurement)
- for i := range downloadThroughputMeasurement.GranularThroughputDataPoints {
- datapoint := downloadThroughputMeasurement.GranularThroughputDataPoints[i]
- datapoint.Direction = "Download"
- granularThroughputDataLogger.LogRecord(datapoint)
- }
+ // For later debugging output, record the last throughputs on load-generating connectings
+ // and the number of open connections.
+ lastThroughputRate := float64(0)
+ lastThroughputOpenConnectionCount := int(0)
- lastDownloadThroughputRate = downloadThroughputMeasurement.Throughput
- lastDownloadThroughputOpenConnectionCount =
- downloadThroughputMeasurement.Connections
- }
+ lg_timeout:
+ for !direction.StableThroughput {
+ select {
+ case throughputMeasurement := <-lgStabilizationCommunicationChannel:
+ {
+ throughputStabilizer.AddMeasurement(
+ throughputMeasurement.Throughput)
+ direction.StableThroughput = throughputStabilizer.IsStable()
+ if *debugCliFlag {
+ fmt.Printf(
+ "################# %v is instantaneously %s.\n", direction.DirectionLabel,
+ utilities.Conditional(direction.StableThroughput, "stable", "unstable"))
+ }
+ direction.ThroughputDataLogger.LogRecord(throughputMeasurement)
+ for i := range throughputMeasurement.GranularThroughputDataPoints {
+ datapoint := throughputMeasurement.GranularThroughputDataPoints[i]
+ datapoint.Direction = "Download"
+ direction.GranularThroughputDataLogger.LogRecord(datapoint)
+ }
+
+ lastThroughputRate = throughputMeasurement.Throughput
+ lastThroughputOpenConnectionCount = throughputMeasurement.Connections
- case uploadThroughputMeasurement := <-uploadThroughputChannel:
- {
- uploadThroughputStabilizer.AddMeasurement(uploadThroughputMeasurement)
- uploadThroughputIsStable = uploadThroughputStabilizer.IsStable()
- if *debugCliFlag {
- fmt.Printf(
- "################# Upload is instantaneously %s.\n",
- utilities.Conditional(uploadThroughputIsStable, "stable", "unstable"))
+ if direction.StableThroughput {
+ throughputGeneratorCtxCancel()
+ }
}
- uploadThroughputDataLogger.LogRecord(uploadThroughputMeasurement)
- for i := range uploadThroughputMeasurement.GranularThroughputDataPoints {
- datapoint := uploadThroughputMeasurement.GranularThroughputDataPoints[i]
- datapoint.Direction = "Upload"
- granularThroughputDataLogger.LogRecord(datapoint)
+ case <-timeoutChannel:
+ {
+ break lg_timeout
}
+ }
+ }
- lastUploadThroughputRate = uploadThroughputMeasurement.Throughput
- lastUploadThroughputOpenConnectionCount = uploadThroughputMeasurement.Connections
+ if direction.StableThroughput {
+ if *debugCliFlag {
+ fmt.Printf("################# Throughput is stable; beginning responsiveness testing.\n")
}
- case probeMeasurement := <-probeDataPointsChannel:
- {
- probeStabilizer.AddMeasurement(probeMeasurement)
+ } else {
+ fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Adding 15 seconds to calculate speculative RPM results.\n")
+ speculativeTimeoutDuration := time.Second * 15
+ speculativeAbsoluteTimeoutTime := time.Now().Add(speculativeTimeoutDuration)
+ timeoutChannel = timeoutat.TimeoutAt(
+ operatingCtx,
+ speculativeAbsoluteTimeoutTime,
+ debugLevel,
+ )
+ }
- // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy
- // is *actually* important, but it can't hurt?
- responsivenessIsStable = probeStabilizer.IsStable()
+ perDirectionSelfRtts := ms.NewInfiniteMathematicalSeries[float64]()
+ perDirectionForeignRtts := ms.NewInfiniteMathematicalSeries[float64]()
- if *debugCliFlag {
- fmt.Printf(
- "################# Responsiveness is instantaneously %s.\n",
- utilities.Conditional(responsivenessIsStable, "stable", "unstable"))
- }
- if probeMeasurement.Type == probe.Foreign {
+ responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber(
+ proberOperatorCtx,
+ networkActivityCtx,
+ generateForeignProbeConfiguration,
+ generateSelfProbeConfiguration,
+ &direction.Lgcc,
+ direction.CreateLgdc().Direction(), // TODO: This could be better!
+ specParameters.ProbeInterval,
+ sslKeyFileConcurrentWriter,
+ *calculateExtendedStats,
+ direction.ProbeDebugging,
+ )
+
+ responsiveness_timeout:
+ for !direction.StableResponsiveness {
+ select {
+ case probeMeasurement := <-responsivenessStabilizationCommunicationChannel:
+ {
+ foreignDataPoint := probeMeasurement.First
+ selfDataPoint := probeMeasurement.Second
+
+ responsivenessStabilizer.AddMeasurement(
+ (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds())
+
+ // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy
+ // is *actually* important, but it can't hurt?
+ direction.StableResponsiveness = responsivenessStabilizer.IsStable()
+
+ if *debugCliFlag {
+ fmt.Printf(
+ "################# Responsiveness is instantaneously %s.\n",
+ utilities.Conditional(direction.StableResponsiveness, "stable", "unstable"))
+ }
// There may be more than one round trip accumulated together. If that is the case,
// we will blow them apart in to three separate measurements and each one will just
// be 1 / measurement.RoundTripCount of the total length.
- for range utilities.Iota(0, int(probeMeasurement.RoundTripCount)) {
- foreignRtts.AddElement(probeMeasurement.Duration.Seconds() /
- float64(probeMeasurement.RoundTripCount))
+ for range utilities.Iota(0, int(foreignDataPoint.RoundTripCount)) {
+ foreignRtts.AddElement(foreignDataPoint.Duration.Seconds() /
+ float64(foreignDataPoint.RoundTripCount))
+ perDirectionForeignRtts.AddElement(foreignDataPoint.Duration.Seconds() /
+ float64(foreignDataPoint.RoundTripCount))
}
- } else if probeMeasurement.Type == probe.SelfDown || probeMeasurement.Type == probe.SelfUp {
- selfRtts.AddElement(probeMeasurement.Duration.Seconds())
- if *printQualityAttenuation {
- selfRttsQualityAttenuation.AddSample(probeMeasurement.Duration.Seconds())
+ selfRtts.AddElement(selfDataPoint.Duration.Seconds())
+ perDirectionSelfRtts.AddElement(selfDataPoint.Duration.Seconds())
+
+ if selfRttsQualityAttenuation != nil {
+ selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds())
}
+
+ direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint)
+ direction.SelfProbeDataLogger.LogRecord(*selfDataPoint)
}
+ case throughputMeasurement := <-lgStabilizationCommunicationChannel:
+ {
+ if *debugCliFlag {
+ fmt.Printf("Adding a throughput measurement.\n")
+ }
+ // There may be more than one round trip accumulated together. If that is the case,
+ direction.ThroughputDataLogger.LogRecord(throughputMeasurement)
+ for i := range throughputMeasurement.GranularThroughputDataPoints {
+ datapoint := throughputMeasurement.GranularThroughputDataPoints[i]
+ datapoint.Direction = direction.DirectionLabel
+ direction.GranularThroughputDataLogger.LogRecord(datapoint)
+ }
+
+ lastThroughputRate = throughputMeasurement.Throughput
+ lastThroughputOpenConnectionCount = throughputMeasurement.Connections
- if probeMeasurement.Type == probe.Foreign {
- foreignProbeDataLogger.LogRecord(probeMeasurement)
- } else if probeMeasurement.Type == probe.SelfDown || probeMeasurement.Type == probe.SelfUp {
- selfProbeDataLogger.LogRecord(probeMeasurement)
}
- }
- case <-timeoutChannel:
- {
- break timeout
+ case <-timeoutChannel:
+ {
+ break responsiveness_timeout
+ }
}
}
- }
-
- // TODO: Reset timeout to RPM timeout stat?
- // Did the test run to stability?
- testRanToStability := (downloadThroughputIsStable && uploadThroughputIsStable && responsivenessIsStable)
+ // Did the test run to stability?
+ testRanToStability := direction.StableThroughput && direction.StableResponsiveness
- if *debugCliFlag {
- fmt.Printf("Stopping all the load generating data generators (stability: %s).\n",
- utilities.Conditional(testRanToStability, "success", "failure"))
- }
-
- /* At this point there are
- 1. Load generators running
- -- uploadLoadGeneratorOperatorCtx
- -- downloadLoadGeneratorOperatorCtx
- 2. Network connections opened by those load generators:
- -- lgNetworkActivityCtx
- 3. Probes
- -- proberCtx
- */
+ if *debugCliFlag {
+ fmt.Printf("Stopping all the load generating data generators (stability: %s).\n",
+ utilities.Conditional(testRanToStability, "success", "failure"))
+ }
- // First, stop the load generator and the probe operators (but *not* the network activity)
- proberOperatorCtxCancel()
- downloadLoadGeneratorOperatorCtxCancel()
- uploadLoadGeneratorOperatorCtxCancel()
+ /* At this point there are
+ 1. Load generators running
+ -- uploadLoadGeneratorOperatorCtx
+ -- downloadLoadGeneratorOperatorCtx
+ 2. Network connections opened by those load generators:
+ -- lgNetworkActivityCtx
+ 3. Probes
+ -- proberCtx
+ */
- // Second, calculate the extended stats (if the user requested)
+ // First, stop the load generator and the probe operators (but *not* the network activity)
+ proberOperatorCtxCancel()
+ throughputCtxCancel()
- extendedStats := extendedstats.AggregateExtendedStats{}
- if *calculateExtendedStats {
- if extendedstats.ExtendedStatsAvailable() {
- func() {
- // Put inside an IIFE so that we can use a defer!
- downloadLoadGeneratingConnectionCollection.Lock.Lock()
- defer downloadLoadGeneratingConnectionCollection.Lock.Unlock()
+ // Second, calculate the extended stats (if the user requested and they are available for the direction)
+ extendedStats := extendedstats.AggregateExtendedStats{}
+ if *calculateExtendedStats && direction.ExtendedStatsEligible {
+ if extendedstats.ExtendedStatsAvailable() {
+ func() {
+ // Put inside an IIFE so that we can use a defer!
+ direction.Lgcc.Lock.Lock()
+ defer direction.Lgcc.Lock.Unlock()
- // Note: We do not trace upload connections!
- for i := 0; i < downloadLoadGeneratingConnectionCollection.Len(); i++ {
- // Assume that extended statistics are available -- the check was done explicitly at
- // program startup if the calculateExtendedStats flag was set by the user on the command line.
- currentLgc, _ := downloadLoadGeneratingConnectionCollection.Get(i)
- if err := extendedStats.IncorporateConnectionStats(
- (*currentLgc).Stats().ConnInfo.Conn); err != nil {
+ // Note: We do not trace upload connections!
+ downloadLgcCount, err := direction.Lgcc.Len()
+ if err != nil {
fmt.Fprintf(
os.Stderr,
- "Warning: Could not add extended stats for the connection: %v\n",
- err,
+ "Warning: Could not calculate the number of download load-generating connections; aborting extended stats preparation.\n",
)
+ return
}
- }
- }()
- } else {
- // TODO: Should we just log here?
- panic("Extended stats are not available but the user requested their calculation.")
+ for i := 0; i < downloadLgcCount; i++ {
+ // Assume that extended statistics are available -- the check was done explicitly at
+ // program startup if the calculateExtendedStats flag was set by the user on the command line.
+ currentLgc, _ := direction.Lgcc.Get(i)
+ if err := extendedStats.IncorporateConnectionStats(
+ (*currentLgc).Stats().ConnInfo.Conn); err != nil {
+ fmt.Fprintf(
+ os.Stderr,
+ "Warning: Could not add extended stats for the connection: %v\n",
+ err,
+ )
+ }
+ }
+ }()
+ } else {
+ // TODO: Should we just log here?
+ panic("Extended stats are not available but the user requested their calculation.")
+ }
}
- }
- // Third, stop the network connections opened by the load generators and probers.
- networkActivityCtxCancel()
+ // Third, stop the network connections opened by the load generators and probers.
+ networkActivityCtxCancel()
- // Finally, stop the world.
- operatingCtxCancel()
-
- // Calculate the RPM
-
- // First, let's do a double-sided trim of the top/bottom 10% of our measurements.
- selfRttsTotalCount := selfRtts.Len()
- foreignRttsTotalCount := foreignRtts.Len()
-
- selfRttsTrimmed := selfRtts.DoubleSidedTrim(10)
- foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(10)
-
- selfRttsTrimmedCount := selfRttsTrimmed.Len()
- foreignRttsTrimmedCount := foreignRttsTrimmed.Len()
-
- // Then, let's take the mean of those ...
- selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage()
- foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage()
+ fmt.Printf(
+ "%v: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
+ direction.DirectionLabel,
+ utilities.ToMbps(lastThroughputRate),
+ utilities.ToMBps(lastThroughputRate),
+ lastThroughputOpenConnectionCount,
+ )
- // Second, let's do the P90 calculations.
- selfProbeRoundTripTimeP90 := selfRtts.Percentile(90)
- foreignProbeRoundTripTimeP90 := foreignRtts.Percentile(90)
+ if *calculateExtendedStats {
+ fmt.Println(extendedStats.Repr())
+ }
+ directionResult := rpm.CalculateRpm(perDirectionSelfRtts, perDirectionForeignRtts, specParameters.TrimmedMeanPct, 90)
+ if *debugCliFlag {
+ fmt.Printf("(%s RPM Calculation stats): %v\n", direction.DirectionLabel, directionResult.ToString())
+ }
- // Note: The specification indicates that we want to calculate the foreign probes as such:
- // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign
- // where tcp_foreign, tls_foreign, http_foreign are the P90 RTTs for the connection
- // of the tcp, tls and http connections, respectively. However, we cannot break out
- // the individual RTTs so we assume that they are roughly equal.
+ if !testRanToStability {
+ fmt.Printf("Test did not run to stability, these results are estimates:\n")
+ }
- // This is 60 because we measure in seconds not ms
- p90Rpm := 60.0 / (float64(selfProbeRoundTripTimeP90+foreignProbeRoundTripTimeP90) / 2.0)
- meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0)
+ fmt.Printf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, directionResult.PNRpm, 90)
+ fmt.Printf("%s RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel,
+ directionResult.MeanRpm, specParameters.TrimmedMeanPct)
- if *debugCliFlag {
- fmt.Printf(
- `Total Self Probes: %d
-Total Foreign Probes: %d
-Trimmed Self Probes Count: %d
-Trimmed Foreign Probes Count: %d
-P90 Self RTT: %f
-P90 Foreign RTT: %f
-Trimmed Mean Self RTT: %f
-Trimmed Mean Foreign RTT: %f
-`,
- selfRttsTotalCount,
- foreignRttsTotalCount,
- selfRttsTrimmedCount,
- foreignRttsTrimmedCount,
- selfProbeRoundTripTimeP90,
- foreignProbeRoundTripTimeP90,
- selfProbeRoundTripTimeMean,
- foreignProbeRoundTripTimeMean,
- )
- }
+ if len(*prometheusStatsFilename) > 0 {
+ var testStable int
+ if testRanToStability {
+ testStable = 1
+ }
+ var buffer bytes.Buffer
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_test_stable %d\n",
+ strings.ToLower(direction.DirectionLabel), testStable))
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_p90_rpm_value %d\n",
+ strings.ToLower(direction.DirectionLabel), int64(directionResult.PNRpm)))
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_trimmed_rpm_value %d\n",
+ strings.ToLower(direction.DirectionLabel),
+ int64(directionResult.MeanRpm)))
- if *printQualityAttenuation {
- fmt.Println("Quality Attenuation Statistics:")
- fmt.Printf(
- `Number of losses: %d
-Number of samples: %d
-Loss: %f
-Min: %.6f
-Max: %.6f
-Mean: %.6f
-Variance: %.6f
-Standard Deviation: %.6f
-PDV(90): %.6f
-PDV(99): %.6f
-P(90): %.6f
-P(99): %.6f
-`, selfRttsQualityAttenuation.GetNumberOfLosses(),
- selfRttsQualityAttenuation.GetNumberOfSamples(),
- selfRttsQualityAttenuation.GetLossPercentage(),
- selfRttsQualityAttenuation.GetMinimum(),
- selfRttsQualityAttenuation.GetMaximum(),
- selfRttsQualityAttenuation.GetAverage(),
- selfRttsQualityAttenuation.GetVariance(),
- selfRttsQualityAttenuation.GetStandardDeviation(),
- selfRttsQualityAttenuation.GetPDV(90),
- selfRttsQualityAttenuation.GetPDV(99),
- selfRttsQualityAttenuation.GetPercentile(90),
- selfRttsQualityAttenuation.GetPercentile(99))
- }
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_bits_per_second %d\n",
+ strings.ToLower(direction.DirectionLabel), int64(lastThroughputRate)))
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_connections %d\n",
+ strings.ToLower(direction.DirectionLabel),
+ int64(lastThroughputOpenConnectionCount)))
- if !testRanToStability {
- fmt.Printf("Test did not run to stability, these results are estimates:\n")
- }
+ if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil {
+ fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err)
+ os.Exit(1)
+ }
+ }
- fmt.Printf("RPM: %5.0f (P90)\n", p90Rpm)
- fmt.Printf("RPM: %5.0f (Double-Sided 10%% Trimmed Mean)\n", meanRpm)
+ direction.ThroughputDataLogger.Export()
+ if *debugCliFlag {
+ fmt.Printf("Closing the %v throughput data logger.\n", direction.DirectionLabel)
+ }
+ direction.ThroughputDataLogger.Close()
- fmt.Printf(
- "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
- utilities.ToMbps(lastDownloadThroughputRate),
- utilities.ToMBps(lastDownloadThroughputRate),
- lastDownloadThroughputOpenConnectionCount,
- )
- fmt.Printf(
- "Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
- utilities.ToMbps(lastUploadThroughputRate),
- utilities.ToMBps(lastUploadThroughputRate),
- lastUploadThroughputOpenConnectionCount,
- )
+ direction.GranularThroughputDataLogger.Export()
+ if *debugCliFlag {
+ fmt.Printf("Closing the %v granular throughput data logger.\n", direction.DirectionLabel)
+ }
+ direction.GranularThroughputDataLogger.Close()
- if *calculateExtendedStats {
- fmt.Println(extendedStats.Repr())
+ if *debugCliFlag {
+ fmt.Printf("In debugging mode, we will cool down between tests.\n")
+ time.Sleep(constants.CooldownPeriod)
+ fmt.Printf("Done cooling down.\n")
+ }
}
- selfProbeDataLogger.Export()
- if *debugCliFlag {
- fmt.Printf("Closing the self data logger.\n")
- }
- selfProbeDataLogger.Close()
+ result := rpm.CalculateRpm(selfRtts, foreignRtts, specParameters.TrimmedMeanPct, 90)
- foreignProbeDataLogger.Export()
if *debugCliFlag {
- fmt.Printf("Closing the foreign data logger.\n")
+ fmt.Printf("(Final RPM Calculation stats): %v\n", result.ToString())
}
- foreignProbeDataLogger.Close()
- downloadThroughputDataLogger.Export()
- if *debugCliFlag {
- fmt.Printf("Closing the download throughput data logger.\n")
- }
- downloadThroughputDataLogger.Close()
+ fmt.Printf("Final RPM: %5.0f (P%d)\n", result.PNRpm, 90)
+ fmt.Printf("Final RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n",
+ result.MeanRpm, specParameters.TrimmedMeanPct)
- uploadThroughputDataLogger.Export()
- if *debugCliFlag {
- fmt.Printf("Closing the upload throughput data logger.\n")
- }
- uploadThroughputDataLogger.Close()
+ // Stop the world.
+ operatingCtxCancel()
- granularThroughputDataLogger.Export()
+ // Note: We do *not* have to export/close the upload *and* download
+ // sides of the self/foreign probe data loggers because they both
+ // refer to the same logger. Closing/exporting one will close/export
+ // the other.
+ uploadDirection.SelfProbeDataLogger.Export()
if *debugCliFlag {
- fmt.Printf("Closing the granular throughput data logger.\n")
+ fmt.Printf("Closing the self data loggers.\n")
}
- granularThroughputDataLogger.Close()
+ uploadDirection.SelfProbeDataLogger.Close()
+ uploadDirection.ForeignProbeDataLogger.Export()
if *debugCliFlag {
- fmt.Printf("In debugging mode, we will cool down.\n")
- time.Sleep(constants.CooldownPeriod)
- fmt.Printf("Done cooling down.\n")
- }
-
- if len(*prometheusStatsFilename) > 0 {
- var testStable int
- if testRanToStability {
- testStable = 1
- }
- var buffer bytes.Buffer
- buffer.WriteString(fmt.Sprintf("networkquality_test_stable %d\n", testStable))
- buffer.WriteString(fmt.Sprintf("networkquality_rpm_value %d\n", int64(p90Rpm)))
- buffer.WriteString(fmt.Sprintf("networkquality_trimmed_rpm_value %d\n",
- int64(meanRpm))) // utilities.ToMbps(lastDownloadThroughputRate),
-
- buffer.WriteString(fmt.Sprintf("networkquality_download_bits_per_second %d\n", int64(lastDownloadThroughputRate)))
- buffer.WriteString(fmt.Sprintf("networkquality_download_connections %d\n",
- int64(lastDownloadThroughputOpenConnectionCount)))
- buffer.WriteString(fmt.Sprintf("networkquality_upload_bits_per_second %d\n", int64(lastUploadThroughputRate)))
- buffer.WriteString(fmt.Sprintf("networkquality_upload_connections %d\n",
- lastUploadThroughputOpenConnectionCount))
-
- if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil {
- fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err)
- os.Exit(1)
- }
+ fmt.Printf("Closing the foreign data loggers.\n")
}
+ uploadDirection.SelfProbeDataLogger.Close()
}