summaryrefslogtreecommitdiff
path: root/networkQuality.go
diff options
context:
space:
mode:
authorWill Hawkins <[email protected]>2023-01-27 22:06:10 -0500
committerWill Hawkins <[email protected]>2023-01-27 22:06:10 -0500
commitc8350c13a09b8c19656cd32a065b69693b6117c5 (patch)
tree083f345217d7f9a4d2136078f830b4887c9715e4 /networkQuality.go
parentb7bc5fa7dd75b8aa0bc3be22b1b3deab1979cd96 (diff)
[FEATURE] Finalize implementation of rev3 of the draft
Diffstat (limited to 'networkQuality.go')
-rw-r--r--networkQuality.go144
1 files changed, 91 insertions, 53 deletions
diff --git a/networkQuality.go b/networkQuality.go
index c3db256..759d562 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -58,15 +58,10 @@ var (
constants.DefaultDebug,
"Enable debugging.",
)
- sattimeout = flag.Int(
- "sattimeout",
- constants.DefaultTestTime,
- "Maximum time to spend measuring saturation.",
- )
rpmtimeout = flag.Int(
"rpmtimeout",
constants.RPMCalculationTime,
- "Maximum time to spend calculating RPM.",
+ "Maximum time to spend calculating RPM (i.e., total test time.).",
)
sslKeyFileName = flag.String(
"ssl-key-file",
@@ -88,12 +83,17 @@ var (
"",
"Store granular information about tests results in files with this basename. Time and information type will be appended (before the first .) to create separate log files. Disabled by default.",
)
+ probeIntervalTime = flag.Uint(
+ "probe-interval-time",
+ 100,
+ "Time (in ms) between probes (foreign and self).",
+ )
)
func main() {
flag.Parse()
- timeoutDuration := time.Second * time.Duration(*sattimeout)
+ timeoutDuration := time.Second * time.Duration(*rpmtimeout)
timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort)
@@ -102,21 +102,17 @@ func main() {
// the others.
operatingCtx, operatingCtxCancel := context.WithCancel(context.Background())
- // This context is used to control the load generators -- we cancel it when
- // the system has completed its work. (i.e, rpm and saturation are stable).
- // The *operator* contexts control stopping the goroutines that are running
- // the process; the *throughput* contexts control whether the load generators
- // continue to add new connections at every interval.
+ // The operator contexts. These contexts control the processes that manage
+ // network activity but do no control network activity.
+
uploadLoadGeneratorOperatorCtx, uploadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx)
downloadLoadGeneratorOperatorCtx, downloadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx)
+ proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx)
- // This context is used to control the load-generating network activity (i.e., it controls all
+ // This context is used to control the network activity (i.e., it controls all
// the connections that are open to do load generation and probing). Cancelling this context will close
// all the network connections that are responsible for generating the load.
- lgNetworkActivityCtx, lgNetworkActivityCtxCancel := context.WithCancel(operatingCtx)
-
- // This context is used to control the activity of the prober.
- proberCtx, proberCtxCancel := context.WithCancel(operatingCtx)
+ networkActivityCtx, networkActivityCtxCancel := context.WithCancel(operatingCtx)
config := &config.Config{}
var debugLevel debug.DebugLevel = debug.Error
@@ -128,18 +124,18 @@ func main() {
if *calculateExtendedStats && !extendedstats.ExtendedStatsAvailable() {
*calculateExtendedStats = false
fmt.Printf(
- "Warning: Calculation of extended statistics was requested but they are not supported on this platform.\n",
+ "Warning: Calculation of extended statistics was requested but is not supported on this platform.\n",
)
}
var sslKeyFileConcurrentWriter *ccw.ConcurrentWriter = nil
if *sslKeyFileName != "" {
if sslKeyFileHandle, err := os.OpenFile(*sslKeyFileName, os.O_RDWR|os.O_CREATE, os.FileMode(0600)); err != nil {
- fmt.Printf("Could not open the keyfile for writing: %v!\n", err)
+ fmt.Printf("Could not open the requested SSL key logging file for writing: %v!\n", err)
sslKeyFileConcurrentWriter = nil
} else {
if err = utilities.SeekForAppend(sslKeyFileHandle); err != nil {
- fmt.Printf("Could not seek to the end of the key file: %v!\n", err)
+ fmt.Printf("Could not seek to the end of the SSL key logging file: %v!\n", err)
sslKeyFileConcurrentWriter = nil
} else {
if debug.IsDebug(debugLevel) {
@@ -174,7 +170,7 @@ func main() {
debugLevel,
)
if debug.IsDebug(debugLevel) {
- fmt.Printf("Test will end earlier than %v\n", timeoutAbsoluteTime)
+ fmt.Printf("Test will end no later than %v\n", timeoutAbsoluteTime)
}
// print the banner
@@ -190,7 +186,7 @@ func main() {
if err != nil {
fmt.Fprintf(
os.Stderr,
- "Error: Profiling requested with storage in %s but that file could not be opened: %v\n",
+ "Error: Profiling requested but could not open the log file ( %s ) for writing: %v\n",
*profile,
err,
)
@@ -312,6 +308,7 @@ func main() {
KeyLogger: sslKeyFileConcurrentWriter,
}
}
+
generate_lgu := func() lgc.LoadGeneratingConnection {
return &lgc.LoadGeneratingConnectionUpload{
Path: config.Urls.UploadUrl,
@@ -346,33 +343,42 @@ func main() {
// generate additional information!
selfDownProbeConnectionCommunicationChannel, downloadThroughputChannel := rpm.LoadGenerator(
- lgNetworkActivityCtx,
+ networkActivityCtx,
downloadLoadGeneratorOperatorCtx,
time.Second,
generate_lgd,
&downloadLoadGeneratingConnectionCollection,
+ *calculateExtendedStats,
downloadDebugging,
)
selfUpProbeConnectionCommunicationChannel, uploadThroughputChannel := rpm.LoadGenerator(
- lgNetworkActivityCtx,
+ networkActivityCtx,
uploadLoadGeneratorOperatorCtx,
time.Second,
generate_lgu,
&uploadLoadGeneratingConnectionCollection,
+ *calculateExtendedStats,
uploadDebugging,
)
+ // Handles for the first connection that the load-generating go routines (both up and
+ // download) open are passed because on the self[Down|Up]ProbeConnectionCommunicationChannel
+ // so that we can then start probes on those handles.
selfDownProbeConnection := <-selfDownProbeConnectionCommunicationChannel
selfUpProbeConnection := <-selfUpProbeConnectionCommunicationChannel
+ // The combined prober will handle launching, monitoring, etc of *both* the self and foreign
+ // probes.
probeDataPointsChannel := rpm.CombinedProber(
- proberCtx,
+ proberOperatorCtx,
+ networkActivityCtx,
generateForeignProbeConfiguration,
generateSelfProbeConfiguration,
selfDownProbeConnection,
selfUpProbeConnection,
- time.Millisecond*100,
+ time.Millisecond*(time.Duration(*probeIntervalTime)),
sslKeyFileConcurrentWriter,
+ *calculateExtendedStats,
combinedProbeDebugging,
)
@@ -386,6 +392,7 @@ func main() {
// 2. K: The number of instantaneous moving averages to consider when determining stability.
// 3: S: The standard deviation cutoff used to determine stability among the K preceding
// moving averages of a measurement.
+ // See
throughputI := constants.InstantaneousThroughputMeasurementCount
probeI := constants.InstantaneousProbeMeasurementCount
@@ -479,6 +486,9 @@ timeout:
"################# Responsiveness is instantaneously %s.\n", utilities.Conditional(responsivenessIsStable, "stable", "unstable"))
}
if probeMeasurement.Type == rpm.Foreign {
+ // There may be more than one round trip accumulated together. If that is the case,
+ // we will blow them apart in to three separate measurements and each one will just
+ // be 1 / measurement.RoundTripCount of the total length.
for range utilities.Iota(0, int(probeMeasurement.RoundTripCount)) {
foreignRtts.AddElement(probeMeasurement.Duration.Seconds() / float64(probeMeasurement.RoundTripCount))
@@ -487,10 +497,6 @@ timeout:
selfRtts.AddElement(probeMeasurement.Duration.Seconds())
}
- // There may be more than one round trip accumulated together. If that is the case,
- // we will blow them apart in to three separate measurements and each one will just
- // be 1 / measurement.RoundTripCount of the total length.
-
if probeMeasurement.Type == rpm.Foreign {
foreignProbeDataLogger.LogRecord(probeMeasurement)
} else if probeMeasurement.Type == rpm.SelfDown || probeMeasurement.Type == rpm.SelfUp {
@@ -523,8 +529,8 @@ timeout:
-- proberCtx
*/
- // First, stop the load generators and the probes
- proberCtxCancel()
+ // First, stop the load generator and the probe operators (but *not* the network activity)
+ proberOperatorCtxCancel()
downloadLoadGeneratorOperatorCtxCancel()
uploadLoadGeneratorOperatorCtxCancel()
@@ -533,35 +539,57 @@ timeout:
extendedStats := extendedstats.AggregateExtendedStats{}
if *calculateExtendedStats {
if extendedstats.ExtendedStatsAvailable() {
- downloadLoadGeneratingConnectionCollection.Lock.Lock()
- for i := 0; i < len(*downloadLoadGeneratingConnectionCollection.LGCs); i++ {
- // Assume that extended statistics are available -- the check was done explicitly at
- // program startup if the calculateExtendedStats flag was set by the user on the command line.
- if err := extendedStats.IncorporateConnectionStats((*downloadLoadGeneratingConnectionCollection.LGCs)[i].Stats().ConnInfo.Conn); err != nil {
- fmt.Fprintf(
- os.Stderr,
- "Warning: Could not add extended stats for the connection: %v\n",
- err,
- )
- }
- }
- downloadLoadGeneratingConnectionCollection.Lock.Unlock()
+ func() {
+ // Put inside an IIFE so that we can use a defer!
+ downloadLoadGeneratingConnectionCollection.Lock.Lock()
+ defer downloadLoadGeneratingConnectionCollection.Lock.Unlock()
- // We do not trace upload connections!
+ // Note: We do not trace upload connections!
+ for i := 0; i < len(*downloadLoadGeneratingConnectionCollection.LGCs); i++ {
+ // Assume that extended statistics are available -- the check was done explicitly at
+ // program startup if the calculateExtendedStats flag was set by the user on the command line.
+ if err := extendedStats.IncorporateConnectionStats((*downloadLoadGeneratingConnectionCollection.LGCs)[i].Stats().ConnInfo.Conn); err != nil {
+ fmt.Fprintf(
+ os.Stderr,
+ "Warning: Could not add extended stats for the connection: %v\n",
+ err,
+ )
+ }
+ }
+ }()
} else {
// TODO: Should we just log here?
panic("Extended stats are not available but the user requested their calculation.")
}
}
- // Third, stop the network connections opened by the load generators.
- lgNetworkActivityCtxCancel()
+ // Third, stop the network connections opened by the load generators and probers.
+ networkActivityCtxCancel()
// Finally, stop the world.
operatingCtxCancel()
// Calculate the RPM
+ // First, let's do a double-sided trim of the top/bottom 10% of our measurements.
+
+ if *debugCliFlag {
+ fmt.Printf("")
+ }
+
+ selfRttsTotalCount := selfRtts.Len()
+ foreignRttsTotalCount := foreignRtts.Len()
+
+ selfRttsTrimmed := selfRtts.DoubleSidedTrim(10)
+ foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(10)
+
+ selfRttsTrimmedCount := selfRttsTrimmed.Len()
+ foreignRttsTrimmedCount := foreignRttsTrimmed.Len()
+
+ // Then, let's take the mean of those ...
+ selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage()
+ foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage()
+
selfProbeRoundTripTimeP90 := selfRtts.Percentile(90)
// The specification indicates that we want to calculate the foreign probes as such:
// 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign
@@ -572,22 +600,32 @@ timeout:
foreignProbeRoundTripTimeP90 := foreignRtts.Percentile(90)
// This is 60 because we measure in seconds not ms
- rpm := 60.0 / (float64(selfProbeRoundTripTimeP90+foreignProbeRoundTripTimeP90) / 2.0)
+ p90Rpm := 60.0 / (float64(selfProbeRoundTripTimeP90+foreignProbeRoundTripTimeP90) / 2.0)
+ meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0)
if *debugCliFlag {
fmt.Printf(
- "Total Load-Generating Round Trips: %d, Total New-Connection Round Trips: %d, P90 LG RTT: %f, P90 NC RTT: %f\n",
- selfRtts.Size(),
- foreignRtts.Size(),
+ `Total Self Probes: %d, Total Foreign Probes: %d
+Trimmed Self Probes Count: %d, Trimmed Foreign Probes Count: %d
+P90 Self RTT: %f, P90 Foreign RTT: %f
+Trimmed Mean Self RTT: %f, Trimmed Mean Foreign RTT: %f
+`,
+ selfRttsTotalCount,
+ foreignRttsTotalCount,
+ selfRttsTrimmedCount,
+ foreignRttsTrimmedCount,
selfProbeRoundTripTimeP90,
foreignProbeRoundTripTimeP90,
+ selfProbeRoundTripTimeMean,
+ foreignProbeRoundTripTimeMean,
)
}
if !testRanToStability {
fmt.Printf("Test did not run to stability, these results are estimates:\n")
}
- fmt.Printf("RPM: %5.0f\n", rpm)
+ fmt.Printf("P90 RPM: %5.0f\n", p90Rpm)
+ fmt.Printf("Trimmed Mean RPM: %5.0f\n", meanRpm)
fmt.Printf(
"Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",