summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWill Hawkins <[email protected]>2022-11-07 01:54:24 -0500
committerWill Hawkins <[email protected]>2022-12-11 16:53:59 -0500
commit094eb99990f1cf734efb8104e0089dbc6920547e (patch)
tree8b8bb26481690f852115ebc236be169669f1b4f0
parent096b9d30559f86e07117ff5459c720900a408a11 (diff)
[Feature] Rev 3 of Stability (Basic implementation)
-rw-r--r--constants/constants.go4
-rw-r--r--networkQuality.go431
-rw-r--r--rpm/rpm.go120
3 files changed, 276 insertions, 279 deletions
diff --git a/constants/constants.go b/constants/constants.go
index 6934459..99de22d 100644
--- a/constants/constants.go
+++ b/constants/constants.go
@@ -28,15 +28,13 @@ var (
InstantaneousThroughputMeasurementCount int = 4
InstantaneousProbeMeasurementCount int = 1
// The number of instantaneous moving averages to consider when determining stability.
- InstantaneousMovingAverageCount int = 4
+ InstantaneousMovingAverageStabilityCount int = 4
// The standard deviation cutoff used to determine stability among the K preceding moving averages
// of a measurement (as a percentage of the mean).
StabilityStandardDeviation float64 = 5.0
// The amount of time that the client will cooldown if it is in debug mode.
CooldownPeriod time.Duration = 4 * time.Second
- // The number of probes to send when calculating RTT.
- MeasurementProbeCount int = 5
// The amount of time that we give ourselves to calculate the RPM.
RPMCalculationTime int = 10
diff --git a/networkQuality.go b/networkQuality.go
index 806f2f7..9235a90 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -29,7 +29,9 @@ import (
"github.com/network-quality/goresponsiveness/debug"
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
+ "github.com/network-quality/goresponsiveness/ms"
"github.com/network-quality/goresponsiveness/rpm"
+ "github.com/network-quality/goresponsiveness/stabilizer"
"github.com/network-quality/goresponsiveness/timeoutat"
"github.com/network-quality/goresponsiveness/utilities"
)
@@ -98,17 +100,24 @@ func main() {
// This is the overall operating context of the program. All other
// contexts descend from this one. Canceling this one cancels all
// the others.
- operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background())
+ operatingCtx, operatingCtxCancel := context.WithCancel(context.Background())
- //
- lgDataCollectionCtx, cancelLGDataCollectionCtx := context.WithCancel(operatingCtx)
+ // This context is used to control the load generators -- we cancel it when
+ // the system has completed its work (i.e., rpm and saturation are stable).
+ // The *operator* contexts control stopping the goroutines that are running
+ // the process; the *throughput* contexts control whether the load generators
+ // continue to add new connections at every interval.
+ uploadLoadGeneratorOperatorCtx, uploadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx)
+ downloadLoadGeneratorOperatorCtx, downloadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx)
- // This context is used to control the load-generating network activity (i.e., all
- // the connections that are open to do load generation).
- lgNetworkActivityCtx, cancelLgNetworkActivityCtx := context.WithCancel(operatingCtx)
+ // This context is used to control the load-generating network activity (i.e., it controls all
+ // the connections that are open to do load generation and probing). Cancelling this context will close
+ // all the network connections that are responsible for generating the load.
+ lgNetworkActivityCtx, lgNetworkActivityCtxCancel := context.WithCancel(operatingCtx)
+
+ // This context is used to control the activity of the prober.
+ proberCtx, proberCtxCancel := context.WithCancel(operatingCtx)
- // This context is used to control the activity of the foreign prober.
- foreignProbertCtx, foreignProberCtxCancel := context.WithCancel(operatingCtx)
config := &config.Config{}
var debugLevel debug.DebugLevel = debug.Error
@@ -191,10 +200,11 @@ func main() {
}
}
- var selfDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil
- var foreignDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil
+ var selfProbeDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil
+ var foreignProbeDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil
var downloadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil
var uploadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil
+
// User wants to log data from each probe!
if *dataLoggerBaseFileName != "" {
var err error = nil
@@ -214,7 +224,7 @@ func main() {
"-throughput-upload"+unique,
)
- selfDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint](
+ selfProbeDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint](
dataLoggerSelfFilename,
)
if err != nil {
@@ -222,10 +232,10 @@ func main() {
"Warning: Could not create the file for storing self probe results (%s). Disabling functionality.\n",
dataLoggerSelfFilename,
)
- selfDataLogger = nil
+ selfProbeDataLogger = nil
}
- foreignDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint](
+ foreignProbeDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint](
dataLoggerForeignFilename,
)
if err != nil {
@@ -233,7 +243,7 @@ func main() {
"Warning: Could not create the file for storing foreign probe results (%s). Disabling functionality.\n",
dataLoggerForeignFilename,
)
- foreignDataLogger = nil
+ foreignProbeDataLogger = nil
}
downloadThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint](
@@ -258,6 +268,20 @@ func main() {
uploadThroughputDataLogger = nil
}
}
+ // If, for some reason, the data loggers are nil, make them Null Data Loggers so that we don't have conditional
+ // code later.
+ if selfProbeDataLogger == nil {
+ selfProbeDataLogger = datalogger.CreateNullDataLogger[rpm.ProbeDataPoint]()
+ }
+ if foreignProbeDataLogger == nil {
+ foreignProbeDataLogger = datalogger.CreateNullDataLogger[rpm.ProbeDataPoint]()
+ }
+ if downloadThroughputDataLogger == nil {
+ downloadThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]()
+ }
+ if uploadThroughputDataLogger == nil {
+ uploadThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]()
+ }
/*
* Create (and then, ironically, name) two anonymous functions that, when invoked,
@@ -278,194 +302,209 @@ func main() {
generateSelfProbeConfiguration := func() rpm.ProbeConfiguration {
return rpm.ProbeConfiguration{
- URL: config.Urls.SmallUrl,
- DataLogger: selfDataLogger,
- Interval: 100 * time.Millisecond,
+ URL: config.Urls.SmallUrl,
}
}
generateForeignProbeConfiguration := func() rpm.ProbeConfiguration {
return rpm.ProbeConfiguration{
- URL: config.Urls.SmallUrl,
- DataLogger: foreignDataLogger,
- Interval: 100 * time.Millisecond,
+ URL: config.Urls.SmallUrl,
}
}
var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download")
var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload")
- var foreignDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "foreign probe")
+ var combinedProbeDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "combined probe")
+
+ downloadLoadGeneratingConnectionCollection := lgc.NewLoadGeneratingConnectionCollection()
+ uploadLoadGeneratingConnectionCollection := lgc.NewLoadGeneratingConnectionCollection()
// TODO: Separate contexts for load generation and data collection. If we do that, if either of the two
// data collection go routines stops well before the other, they will continue to send probes and we can
// generate additional information!
- downloadSaturationComplete, downloadDataCollectionChannel := rpm.LGCollectData(
- lgDataCollectionCtx,
+ selfProbeConnectionCommunicationChannel, downloadThroughputChannel := rpm.LoadGenerator(
lgNetworkActivityCtx,
- operatingCtx,
+ downloadLoadGeneratorOperatorCtx,
+ time.Second,
generate_lgd,
- generateSelfProbeConfiguration,
- downloadThroughputDataLogger,
+ &downloadLoadGeneratingConnectionCollection,
downloadDebugging,
)
- uploadSaturationComplete, uploadDataCollectionChannel := rpm.LGCollectData(
- lgDataCollectionCtx,
+ _, uploadThroughputChannel := rpm.LoadGenerator(
lgNetworkActivityCtx,
- operatingCtx,
+ uploadLoadGeneratorOperatorCtx,
+ time.Second,
generate_lgu,
- generateSelfProbeConfiguration,
- uploadThroughputDataLogger,
+ &uploadLoadGeneratingConnectionCollection,
uploadDebugging,
)
- foreignProbeDataPointsChannel := rpm.ForeignProber(
- foreignProbertCtx,
+ // start here.
+
+ selfDownProbeConnection := <-selfDownProbeConnectionCommunicationChannel
+ selfUpProbeConnection := <-selfUpProbeConnectionCommunicationChannel
+
+ probeDataPointsChannel := rpm.CombinedProber(
+ proberCtx,
generateForeignProbeConfiguration,
+ generateSelfProbeConfiguration,
+ selfProbeConnection,
+ time.Millisecond*100,
sslKeyFileConcurrentWriter,
- foreignDebugging,
+ combinedProbeDebugging,
)
- dataCollectionTimeout := false
- uploadDataGenerationComplete := false
- downloadDataGenerationComplete := false
- downloadDataCollectionResult := rpm.SelfDataCollectionResult{}
- uploadDataCollectionResult := rpm.SelfDataCollectionResult{}
+ responsivenessIsStable := false
+ downloadThroughputIsStable := false
+ uploadThroughputIsStable := false
- for !(uploadDataGenerationComplete && downloadDataGenerationComplete) {
- select {
- case fullyComplete := <-downloadSaturationComplete:
- {
- downloadDataGenerationComplete = true
- if *debugCliFlag {
- fmt.Printf(
- "################# download load-generating data generation is %s complete!\n",
- utilities.Conditional(fullyComplete, "", "(provisionally)"))
- }
- }
- case fullyComplete := <-uploadSaturationComplete:
- {
- uploadDataGenerationComplete = true
- if *debugCliFlag {
- fmt.Printf(
- "################# upload load-generating data generation is %s complete!\n",
- utilities.Conditional(fullyComplete, "", "(provisionally)"))
- }
- }
- case <-timeoutChannel:
- {
- if dataCollectionTimeout {
- // We already timedout on data collection. This signal means that
- // we are timedout on getting the provisional data collection. We
- // will exit!
- fmt.Fprint(
- os.Stderr,
- "Error: Load-Generating data collection could not be completed in time and no provisional data could be gathered. Test failed.\n",
- )
- cancelOperatingCtx()
- if *debugCliFlag {
- time.Sleep(constants.CooldownPeriod)
- }
- return // Ends program
- }
- dataCollectionTimeout = true
+ // Test parameters:
+ // 1. I: The number of previous instantaneous measurements to consider when generating
+ // the so-called instantaneous moving averages.
+ // 2. K: The number of instantaneous moving averages to consider when determining stability.
+ // 3: S: The standard deviation cutoff used to determine stability among the K preceding
+ // moving averages of a measurement.
- // We timed out attempting to collect data about the link. So, we will
- // shut down the generators
- cancelLGDataCollectionCtx()
- // and then we will give ourselves some additional time in order
- // to see if we can get some provisional data.
- timeoutAbsoluteTime = time.Now().
- Add(time.Second * time.Duration(*rpmtimeout))
- timeoutChannel = timeoutat.TimeoutAt(
- operatingCtx,
- timeoutAbsoluteTime,
- debugLevel,
- )
- if *debugCliFlag {
- fmt.Printf(
- "################# timeout collecting load-generating data!\n",
- )
- }
- }
- }
+ throughputI := constants.InstantaneousThroughputMeasurementCount
+ probeI := constants.InstantaneousProbeMeasurementCount
+ K := constants.InstantaneousMovingAverageStabilityCount
+ S := constants.StabilityStandardDeviation
+
+ downloadThroughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, "Download Throughput Stabilizer")
+ downloadThroughputStabilizerDebugLevel := debug.Error
+ if *debugCliFlag {
+ downloadThroughputStabilizerDebugLevel = debug.Debug
}
+ downloadThroughputStabilizer := stabilizer.NewThroughputStabilizer(throughputI, K, S, downloadThroughputStabilizerDebugLevel, downloadThroughputStabilizerDebugConfig)
+ uploadThroughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, "Upload Throughput Stabilizer")
+ uploadThroughputStabilizerDebugLevel := debug.Error
if *debugCliFlag {
- fmt.Printf("Stopping all the load generating data generators.\n")
+ uploadThroughputStabilizerDebugLevel = debug.Debug
}
- // Just cancel the data collection -- do *not* yet stop the actual load-generating
- // network activity.
- cancelLGDataCollectionCtx()
+ uploadThroughputStabilizer := stabilizer.NewThroughputStabilizer(throughputI, K, S, uploadThroughputStabilizerDebugLevel, uploadThroughputStabilizerDebugConfig)
- // Shutdown the foreign-connection prober!
+ probeStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, "Probe Stabilizer")
+ probeStabilizerDebugLevel := debug.Error
if *debugCliFlag {
- fmt.Printf("Stopping all foreign probers.\n")
+ probeStabilizerDebugLevel = debug.Debug
}
- foreignProberCtxCancel()
+ probeStabilizer := stabilizer.NewProbeStabilizer(probeI, K, S, probeStabilizerDebugLevel, probeStabilizerDebugConfig)
- // Now that we stopped generation, let's give ourselves some time to collect
- // all the data from our data generators.
- timeoutAbsoluteTime = time.Now().
- Add(time.Second * time.Duration(*rpmtimeout))
- timeoutChannel = timeoutat.TimeoutAt(
- operatingCtx,
- timeoutAbsoluteTime,
- debugLevel,
- )
+ selfRtts := ms.NewInfiniteMathematicalSeries[float64]()
+ foreignRtts := ms.NewInfiniteMathematicalSeries[float64]()
+
+ // For later debugging output, record the last throughputs on load-generating connections
+ // and the number of open connections.
+ lastUploadThroughputRate := float64(0)
+ lastUploadThroughputOpenConnectionCount := int(0)
+ lastDownloadThroughputRate := float64(0)
+ lastDownloadThroughputOpenConnectionCount := int(0)
- // Now that we have generated the data, let's collect it.
- downloadDataCollectionComplete := false
- uploadDataCollectionComplete := false
- for !(downloadDataCollectionComplete && uploadDataCollectionComplete) {
+ // Every time that there is a new measurement, the possibility exists that the measurements become unstable.
+ // This allows us to continue pushing until *everything* is stable at the same time.
+timeout:
+ for !(responsivenessIsStable && downloadThroughputIsStable && uploadThroughputIsStable) {
select {
- case downloadDataCollectionResult = <-downloadDataCollectionChannel:
+
+ case downloadThroughputMeasurement := <-downloadThroughputChannel:
{
- downloadDataCollectionComplete = true
+ downloadThroughputStabilizer.AddMeasurement(downloadThroughputMeasurement)
+ downloadThroughputIsStable = downloadThroughputStabilizer.IsStable()
if *debugCliFlag {
fmt.Printf(
- "################# download load-generating data collection is complete (%fMBps, %d flows)!\n",
- utilities.ToMBps(downloadDataCollectionResult.RateBps),
- len(downloadDataCollectionResult.LGCs),
- )
+ "################# Download is instantaneously %s.\n", utilities.Conditional(downloadThroughputIsStable, "stable", "unstable"))
}
+ downloadThroughputDataLogger.LogRecord(downloadThroughputMeasurement)
+
+ lastDownloadThroughputRate = downloadThroughputMeasurement.Throughput
+ lastDownloadThroughputOpenConnectionCount = downloadThroughputMeasurement.Connections
}
- case uploadDataCollectionResult = <-uploadDataCollectionChannel:
+
+ case uploadThroughputMeasurement := <-uploadThroughputChannel:
{
- uploadDataCollectionComplete = true
+ uploadThroughputStabilizer.AddMeasurement(uploadThroughputMeasurement)
+ uploadThroughputIsStable = uploadThroughputStabilizer.IsStable()
if *debugCliFlag {
fmt.Printf(
- "################# upload load-generating data collection is complete (%fMBps, %d flows)!\n",
- utilities.ToMBps(uploadDataCollectionResult.RateBps),
- len(uploadDataCollectionResult.LGCs),
- )
+ "################# Upload is instantaneously %s.\n", utilities.Conditional(uploadThroughputIsStable, "stable", "unstable"))
}
+ uploadThroughputDataLogger.LogRecord(uploadThroughputMeasurement)
+
+ lastUploadThroughputRate = uploadThroughputMeasurement.Throughput
+ lastUploadThroughputOpenConnectionCount = uploadThroughputMeasurement.Connections
}
- case <-timeoutChannel:
+ case probeMeasurement := <-probeDataPointsChannel:
{
- // This is just bad news -- we generated data but could not collect it. Let's just fail.
+ probeStabilizer.AddMeasurement(probeMeasurement)
+
+ // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy
+ // is *actually* important, but it can't hurt?
+ responsivenessIsStable = probeStabilizer.IsStable()
- fmt.Fprint(
- os.Stderr,
- "Error: Load-Generating data collection could not be completed in time and no provisional data could be gathered. Test failed.\n",
- )
- return // Ends program
+ if *debugCliFlag {
+ fmt.Printf(
+ "################# Responsiveness is instantaneously %s.\n", utilities.Conditional(responsivenessIsStable, "stable", "unstable"))
+ }
+ if probeMeasurement.Type == rpm.Foreign {
+ for range utilities.Iota(0, int(probeMeasurement.RoundTripCount)) {
+ foreignRtts.AddElement(probeMeasurement.Duration.Seconds() / float64(probeMeasurement.RoundTripCount))
+
+ }
+ } else if probeMeasurement.Type == rpm.Self {
+ selfRtts.AddElement(probeMeasurement.Duration.Seconds())
+ }
+
+ // There may be more than one round trip accumulated together. If that is the case,
+ // we will blow them apart into three separate measurements and each one will just
+ // be 1 / measurement.RoundTripCount of the total length.
+
+ if probeMeasurement.Type == rpm.Foreign {
+ foreignProbeDataLogger.LogRecord(probeMeasurement)
+ } else if probeMeasurement.Type == rpm.Self {
+ selfProbeDataLogger.LogRecord(probeMeasurement)
+ }
+ }
+ case <-timeoutChannel:
+ {
+ break timeout
}
}
}
- // In the new version we are no longer going to wait to send probes until after
- // saturation. When we get here we are now only going to compute the results
- // and/or extended statistics!
+ // Did the test run to stability?
+ testRanToStability := (downloadThroughputIsStable && uploadThroughputIsStable && responsivenessIsStable)
- extendedStats := extendedstats.AggregateExtendedStats{}
+ if *debugCliFlag {
+ fmt.Printf("Stopping all the load generating data generators (stability: %s).\n", utilities.Conditional(testRanToStability, "success", "failure"))
+ }
+
+ /* At this point there are
+ 1. Load generators running
+ -- uploadLoadGeneratorOperatorCtx
+ -- downloadLoadGeneratorOperatorCtx
+ 2. Network connections opened by those load generators:
+ -- lgNetworkActivityCtx
+ 3. Probes
+ -- proberCtx
+ */
+
+ // First, stop the load generators and the probes
+ proberCtxCancel()
+ downloadLoadGeneratorOperatorCtxCancel()
+ uploadLoadGeneratorOperatorCtxCancel()
+ // Second, calculate the extended stats (if the user requested)
+
+ extendedStats := extendedstats.AggregateExtendedStats{}
if *calculateExtendedStats {
if extendedstats.ExtendedStatsAvailable() {
- for i := 0; i < len(downloadDataCollectionResult.LGCs); i++ {
+ downloadLoadGeneratingConnectionCollection.Lock.Lock()
+ for i := 0; i < len(*downloadLoadGeneratingConnectionCollection.LGCs); i++ {
// Assume that extended statistics are available -- the check was done explicitly at
// program startup if the calculateExtendedStats flag was set by the user on the command line.
- if err := extendedStats.IncorporateConnectionStats(downloadDataCollectionResult.LGCs[i].Stats().ConnInfo.Conn); err != nil {
+ if err := extendedStats.IncorporateConnectionStats((*downloadLoadGeneratingConnectionCollection.LGCs)[i].Stats().ConnInfo.Conn); err != nil {
fmt.Fprintf(
os.Stderr,
"Warning: Could not add extended stats for the connection: %v\n",
@@ -473,66 +512,39 @@ func main() {
)
}
}
+ downloadLoadGeneratingConnectionCollection.Lock.Unlock()
+
+ // We do not trace upload connections!
} else {
// TODO: Should we just log here?
panic("Extended stats are not available but the user requested their calculation.")
}
}
- // And only now, when we are done getting the extended stats from the connections, can
- // we actually shut down the load-generating network activity!
- cancelLgNetworkActivityCtx()
+ // Third, stop the network connections opened by the load generators.
+ lgNetworkActivityCtxCancel()
- fmt.Printf(
- "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
- utilities.ToMbps(downloadDataCollectionResult.RateBps),
- utilities.ToMBps(downloadDataCollectionResult.RateBps),
- len(downloadDataCollectionResult.LGCs),
- )
- fmt.Printf(
- "Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
- utilities.ToMbps(uploadDataCollectionResult.RateBps),
- utilities.ToMBps(uploadDataCollectionResult.RateBps),
- len(uploadDataCollectionResult.LGCs),
- )
+ // Finally, stop the world.
+ operatingCtxCancel()
- foreignProbeDataPoints := utilities.ChannelToSlice(foreignProbeDataPointsChannel)
- totalForeignRoundTrips := len(foreignProbeDataPoints)
+ // Calculate the RPM
+
+ selfProbeRoundTripTimeP90 := selfRtts.Percentile(90)
// The specification indicates that we want to calculate the foreign probes as such:
// 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign
// where tcp_foreign, tls_foreign, http_foreign are the P90 RTTs for the connection
// of the tcp, tls and http connections, respectively. However, we cannot break out
- // the individual RTTs so we assume that they are roughly equal. Call that _foreign:
- // 1/3*_foreign + 1/3*_foreign + 1/3*_foreign =
- // 1/3*(3*_foreign) =
- // _foreign
- // So, there's no need to divide by the number of RTTs defined in the ProbeDataPoints
- // in the individual results.
- foreignProbeRoundTripTimes := utilities.Fmap(
- foreignProbeDataPoints,
- func(dp rpm.ProbeDataPoint) float64 { return dp.Duration.Seconds() },
- )
- foreignProbeRoundTripTimeP90 := utilities.CalculatePercentile(foreignProbeRoundTripTimes, 90)
-
- downloadRoundTripTimes := utilities.Fmap(
- downloadDataCollectionResult.ProbeDataPoints,
- func(dcr rpm.ProbeDataPoint) float64 { return dcr.Duration.Seconds() },
- )
- uploadRoundTripTimes := utilities.Fmap(
- uploadDataCollectionResult.ProbeDataPoints,
- func(dcr rpm.ProbeDataPoint) float64 { return dcr.Duration.Seconds() },
- )
- selfProbeRoundTripTimes := append(downloadRoundTripTimes, uploadRoundTripTimes...)
- totalSelfRoundTrips := len(selfProbeRoundTripTimes)
- selfProbeRoundTripTimeP90 := utilities.CalculatePercentile(selfProbeRoundTripTimes, 90)
+ // the individual RTTs so we assume that they are roughly equal. The good news is that
+ // we already did that roughly-equal split up when we added them to the foreignRtts IMS.
+ foreignProbeRoundTripTimeP90 := foreignRtts.Percentile(90)
rpm := 60.0 / (float64(selfProbeRoundTripTimeP90+foreignProbeRoundTripTimeP90) / 2.0)
if *debugCliFlag {
fmt.Printf(
"Total Load-Generating Round Trips: %d, Total New-Connection Round Trips: %d, P90 LG RTT: %f, P90 NC RTT: %f\n",
- totalSelfRoundTrips,
- totalForeignRoundTrips,
+ selfRtts.Size(),
+ foreignRtts.Size(),
selfProbeRoundTripTimeP90,
foreignProbeRoundTripTimeP90,
)
@@ -544,42 +556,47 @@ func main() {
fmt.Println(extendedStats.Repr())
}
- if !utilities.IsInterfaceNil(selfDataLogger) {
- selfDataLogger.Export()
- if *debugCliFlag {
- fmt.Printf("Closing the self data logger.\n")
- }
- selfDataLogger.Close()
+ selfProbeDataLogger.Export()
+ if *debugCliFlag {
+ fmt.Printf("Closing the self data logger.\n")
}
+ selfProbeDataLogger.Close()
- if !utilities.IsInterfaceNil(foreignDataLogger) {
- foreignDataLogger.Export()
- if *debugCliFlag {
- fmt.Printf("Closing the foreign data logger.\n")
- }
- foreignDataLogger.Close()
+ foreignProbeDataLogger.Export()
+ if *debugCliFlag {
+ fmt.Printf("Closing the foreign data logger.\n")
}
+ foreignProbeDataLogger.Close()
- if !utilities.IsInterfaceNil(downloadThroughputDataLogger) {
- downloadThroughputDataLogger.Export()
- if *debugCliFlag {
- fmt.Printf("Closing the download throughput data logger.\n")
- }
- downloadThroughputDataLogger.Close()
+ downloadThroughputDataLogger.Export()
+ if *debugCliFlag {
+ fmt.Printf("Closing the download throughput data logger.\n")
}
+ downloadThroughputDataLogger.Close()
- if !utilities.IsInterfaceNil(uploadThroughputDataLogger) {
- uploadThroughputDataLogger.Export()
- if *debugCliFlag {
- fmt.Printf("Closing the upload throughput data logger.\n")
- }
- uploadThroughputDataLogger.Close()
+ uploadThroughputDataLogger.Export()
+ if *debugCliFlag {
+ fmt.Printf("Closing the upload throughput data logger.\n")
}
+ uploadThroughputDataLogger.Close()
+
+ fmt.Printf(
+ "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
+ utilities.ToMbps(lastDownloadThroughputRate),
+ utilities.ToMBps(lastDownloadThroughputRate),
+ lastDownloadThroughputOpenConnectionCount,
+ )
+ fmt.Printf(
+ "Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
+ utilities.ToMbps(lastUploadThroughputRate),
+ utilities.ToMBps(lastUploadThroughputRate),
+ lastUploadThroughputOpenConnectionCount,
+ )
- cancelOperatingCtx()
if *debugCliFlag {
fmt.Printf("In debugging mode, we will cool down.\n")
time.Sleep(constants.CooldownPeriod)
fmt.Printf("Done cooling down.\n")
}
+
}
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 3c2fa7f..740db2b 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -25,7 +25,6 @@ import (
"time"
"github.com/network-quality/goresponsiveness/constants"
- "github.com/network-quality/goresponsiveness/datalogger"
"github.com/network-quality/goresponsiveness/debug"
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
@@ -38,16 +37,18 @@ import (
func addFlows(
ctx context.Context,
toAdd uint64,
- lgcs *[]lgc.LoadGeneratingConnection,
+ lgcc *lgc.LoadGeneratingConnectionCollection,
lgcGenerator func() lgc.LoadGeneratingConnection,
debug debug.DebugLevel,
) {
+ lgcc.Lock.Lock()
+ defer lgcc.Lock.Unlock()
for i := uint64(0); i < toAdd; i++ {
- *lgcs = append(*lgcs, lgcGenerator())
- if !(*lgcs)[len(*lgcs)-1].Start(ctx, debug) {
+ *lgcc.LGCs = append(*lgcc.LGCs, lgcGenerator())
+ if !(*lgcc.LGCs)[len(*lgcc.LGCs)-1].Start(ctx, debug) {
fmt.Printf(
"Error starting lgc with id %d!\n",
- (*lgcs)[len(*lgcs)-1].ClientId(),
+ (*lgcc.LGCs)[len(*lgcc.LGCs)-1].ClientId(),
)
return
}
@@ -55,8 +56,7 @@ func addFlows(
}
type ProbeConfiguration struct {
- URL string
- DataLogger datalogger.DataLogger[ProbeDataPoint]
+ URL string
}
type ProbeDataPoint struct {
@@ -65,6 +65,7 @@ type ProbeDataPoint struct {
Duration time.Duration `Description:"The duration for this measurement." Formatter:"Seconds"`
TCPRtt time.Duration `Description:"The underlying connection's RTT at probe time." Formatter:"Seconds"`
TCPCwnd uint32 `Description:"The underlying connection's congestion window at probe time."`
+ Type ProbeType `Description:"The type of the probe." Formatter:"Value"`
}
type ThroughputDataPoint struct {
@@ -97,7 +98,6 @@ func (pt ProbeType) Value() string {
func Probe(
parentProbeCtx context.Context,
waitGroup *sync.WaitGroup,
- logger datalogger.DataLogger[ProbeDataPoint],
client *http.Client,
probeUrl string,
probeType ProbeType,
@@ -111,7 +111,7 @@ func Probe(
}
if client == nil {
- return fmt.Errorf("Cannot start a probe with a nil client")
+ return fmt.Errorf("cannot start a probe with a nil client")
}
probeId := utilities.GenerateUniqueId()
@@ -182,7 +182,7 @@ func Probe(
if probeType == Foreign {
roundTripCount = 3
}
- // TODO: Careful!!! It's possible that this channel has been closed because the Prober that
+ // Careful!!! It's possible that this channel has been closed because the Prober that
// started it has been stopped. Writing to a closed channel will cause a panic. It might not
// matter because a panic just stops the go thread containing the paniced code and we are in
// a go thread that executes only this function.
@@ -200,6 +200,7 @@ func Probe(
}()
tcpRtt := time.Duration(0 * time.Second)
tcpCwnd := uint32(0)
+ // TODO: Only get the extended stats for a connection if the user has requested them overall.
if extendedstats.ExtendedStatsAvailable() {
tcpInfo, err := extendedstats.GetTCPInfo(probeTracer.stats.ConnInfo.Conn)
if err == nil {
@@ -215,9 +216,7 @@ func Probe(
Duration: totalDelay,
TCPRtt: tcpRtt,
TCPCwnd: tcpCwnd,
- }
- if !utilities.IsInterfaceNil(logger) {
- logger.LogRecord(dataPoint)
+ Type: probeType,
}
*result <- dataPoint
return nil
@@ -231,13 +230,17 @@ func CombinedProber(
probeInterval time.Duration,
keyLogger io.Writer,
debugging *debug.DebugWithPrefix,
-) (points chan ProbeDataPoint) {
- points = make(chan ProbeDataPoint)
+) (dataPoints chan ProbeDataPoint) {
+
+ // Make a channel to send back all the generated data points
+ // when we are probing.
+ dataPoints = make(chan ProbeDataPoint)
go func() {
wg := sync.WaitGroup{}
probeCount := 0
+ // As long as our context says that we can continue to probe!
for proberCtx.Err() == nil {
time.Sleep(probeInterval)
@@ -247,9 +250,9 @@ func CombinedProber(
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) About to send of the %d round of probes!\n",
+ "(%s) About to send round %d of probes!\n",
debugging.Prefix,
- probeCount,
+ probeCount+1,
)
}
transport := http2.Transport{}
@@ -273,28 +276,37 @@ func CombinedProber(
}
transport.TLSClientConfig.InsecureSkipVerify = true
- client := &http.Client{Transport: &transport}
+ foreignProbeClient := &http.Client{Transport: &transport}
probeCount++
go Probe(
proberCtx,
&wg,
- foreignProbeConfiguration.DataLogger,
- client,
+ foreignProbeClient,
foreignProbeConfiguration.URL,
Foreign,
- &points,
+ &dataPoints,
debugging,
)
go Probe(
proberCtx,
&wg,
- selfProbeConfiguration.DataLogger,
selfProbeConnection.Client(),
selfProbeConfiguration.URL,
- Self,
- &points,
+ SelfDown,
+ &dataPoints,
+ debugging,
+ )
+
+ // Start Upload Connection Prober
+ go Probe(
+ proberCtx,
+ &wg,
+ selfUpProbeConnection.Client(),
+ selfProbeConfiguration.URL,
+ SelfUp,
+ &dataPoints,
debugging,
)
}
@@ -311,22 +323,20 @@ func CombinedProber(
debugging.Prefix,
)
}
- close(points)
+ close(dataPoints)
}()
return
}
func LoadGenerator(
networkActivityCtx context.Context, // Create all network connections in this context.
- saturationCtx context.Context, // Continue logging, but stop adding flows when this context is canceled!
loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load.
rampupInterval time.Duration,
lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection.
loadGeneratingConnections *lgc.LoadGeneratingConnectionCollection,
debugging *debug.DebugWithPrefix, // How can we forget debugging?
-) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to use for self probes.
+) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes.
throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate.
- lgcs []lgc.LoadGeneratingConnection, // The caller will want to look at this if they are interested in doing extended stats.
) {
throughputCalculations = make(chan ThroughputDataPoint)
@@ -334,25 +344,18 @@ func LoadGenerator(
// be read by the caller. We don't want to wait around until they are ready before we start doing our work.
// So, we'll make it buffered.
probeConnectionCommunicationChannel = make(chan lgc.LoadGeneratingConnection, 1)
- lgcs = make([]lgc.LoadGeneratingConnection, 0)
go func() {
- isSaturated := false
-
- lgcs := make([]lgc.LoadGeneratingConnection, 0)
-
flowsCreated := uint64(0)
- loadGeneratingConnections.Lock.Lock()
addFlows(
networkActivityCtx,
constants.StartingNumberOfLoadGeneratingConnections,
- loadGeneratingConnections.LGCs,
+ loadGeneratingConnections,
lgcGenerator,
debugging.Level,
)
- loadGeneratingConnections.Lock.Unlock()
flowsCreated += constants.StartingNumberOfLoadGeneratingConnections
// We have at least a single load-generating channel. This channel will be the one that
@@ -363,20 +366,11 @@ func LoadGenerator(
for currentInterval := uint64(0); true; currentInterval++ {
- // If the operationalCtx is canceled, then that means our work here is done ...
+ // If the loadGeneratorCtx is canceled, then that means our work here is done ...
if loadGeneratorCtx.Err() != nil {
break
}
- if saturationCtx.Err() != nil {
- isSaturated = true
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "%v: Received the saturated signal; continuing only to log from now on.\n",
- debugging,
- )
- }
- }
now := time.Now()
// At each 1-second interval
if nextSampleStartTime.Sub(now) > 0 {
@@ -435,26 +429,14 @@ func LoadGenerator(
throughputDataPoint := ThroughputDataPoint{time.Now(), instantaneousTotalThroughput, len(*loadGeneratingConnections.LGCs)}
throughputCalculations <- throughputDataPoint
- // Log that, if we are configured for logging.
- if !utilities.IsInterfaceNil(throughputDataLogger) {
- throughputDataLogger.LogRecord(throughputDataPoint)
- }
-
- // We don't actually want to create a new connection if we are saturated!
- if isSaturated {
- continue
- }
-
// Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now!
- loadGeneratingConnections.Lock.Lock()
addFlows(
networkActivityCtx,
constants.AdditiveNumberOfLoadGeneratingConnections,
- loadGeneratingConnections.LGCs,
+ loadGeneratingConnections,
lgcGenerator,
debugging.Level,
)
- loadGeneratingConnections.Lock.Unlock()
flowsCreated += constants.AdditiveNumberOfLoadGeneratingConnections
}
@@ -670,17 +652,17 @@ func (probe *ProbeTracer) SetGotConnTimeInfo(
os.Stderr,
"A self probe sent used a new connection!\n",
)
- } else if debug.IsDebug(probe.debug) {
- fmt.Printf("Properly reused a connection when doing a self probe!\n")
}
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) Got a reused connection for Probe %v at %v with info %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.GetConnectionDoneTime,
- probe.stats.ConnInfo,
- )
+ if gotConnInfo.Reused {
+ if debug.IsDebug(probe.debug) {
+ fmt.Printf(
+ "(%s Probe) Got a reused connection for Probe %v at %v with info %v\n",
+ probe.probeType.Value(),
+ probe.ProbeId(),
+ probe.stats.GetConnectionDoneTime,
+ probe.stats.ConnInfo,
+ )
+ }
}
}