diff options
| author | Will Hawkins <[email protected]> | 2023-01-27 22:06:10 -0500 |
|---|---|---|
| committer | Will Hawkins <[email protected]> | 2023-01-27 22:06:10 -0500 |
| commit | c8350c13a09b8c19656cd32a065b69693b6117c5 (patch) | |
| tree | 083f345217d7f9a4d2136078f830b4887c9715e4 /networkQuality.go | |
| parent | b7bc5fa7dd75b8aa0bc3be22b1b3deab1979cd96 (diff) | |
[FEATURE] Finalize implementation of rev3 of the draft
Diffstat (limited to 'networkQuality.go')
| -rw-r--r-- | networkQuality.go | 144 |
1 files changed, 91 insertions, 53 deletions
diff --git a/networkQuality.go b/networkQuality.go index c3db256..759d562 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -58,15 +58,10 @@ var ( constants.DefaultDebug, "Enable debugging.", ) - sattimeout = flag.Int( - "sattimeout", - constants.DefaultTestTime, - "Maximum time to spend measuring saturation.", - ) rpmtimeout = flag.Int( "rpmtimeout", constants.RPMCalculationTime, - "Maximum time to spend calculating RPM.", + "Maximum time to spend calculating RPM (i.e., total test time.).", ) sslKeyFileName = flag.String( "ssl-key-file", @@ -88,12 +83,17 @@ var ( "", "Store granular information about tests results in files with this basename. Time and information type will be appended (before the first .) to create separate log files. Disabled by default.", ) + probeIntervalTime = flag.Uint( + "probe-interval-time", + 100, + "Time (in ms) between probes (foreign and self).", + ) ) func main() { flag.Parse() - timeoutDuration := time.Second * time.Duration(*sattimeout) + timeoutDuration := time.Second * time.Duration(*rpmtimeout) timeoutAbsoluteTime := time.Now().Add(timeoutDuration) configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort) @@ -102,21 +102,17 @@ func main() { // the others. operatingCtx, operatingCtxCancel := context.WithCancel(context.Background()) - // This context is used to control the load generators -- we cancel it when - // the system has completed its work. (i.e, rpm and saturation are stable). - // The *operator* contexts control stopping the goroutines that are running - // the process; the *throughput* contexts control whether the load generators - // continue to add new connections at every interval. + // The operator contexts. These contexts control the processes that manage + // network activity but do no control network activity. + uploadLoadGeneratorOperatorCtx, uploadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx) downloadLoadGeneratorOperatorCtx, downloadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx) + proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx) - // This context is used to control the load-generating network activity (i.e., it controls all + // This context is used to control the network activity (i.e., it controls all // the connections that are open to do load generation and probing). Cancelling this context will close // all the network connections that are responsible for generating the load. - lgNetworkActivityCtx, lgNetworkActivityCtxCancel := context.WithCancel(operatingCtx) - - // This context is used to control the activity of the prober. - proberCtx, proberCtxCancel := context.WithCancel(operatingCtx) + networkActivityCtx, networkActivityCtxCancel := context.WithCancel(operatingCtx) config := &config.Config{} var debugLevel debug.DebugLevel = debug.Error @@ -128,18 +124,18 @@ func main() { if *calculateExtendedStats && !extendedstats.ExtendedStatsAvailable() { *calculateExtendedStats = false fmt.Printf( - "Warning: Calculation of extended statistics was requested but they are not supported on this platform.\n", + "Warning: Calculation of extended statistics was requested but is not supported on this platform.\n", ) } var sslKeyFileConcurrentWriter *ccw.ConcurrentWriter = nil if *sslKeyFileName != "" { if sslKeyFileHandle, err := os.OpenFile(*sslKeyFileName, os.O_RDWR|os.O_CREATE, os.FileMode(0600)); err != nil { - fmt.Printf("Could not open the keyfile for writing: %v!\n", err) + fmt.Printf("Could not open the requested SSL key logging file for writing: %v!\n", err) sslKeyFileConcurrentWriter = nil } else { if err = utilities.SeekForAppend(sslKeyFileHandle); err != nil { - fmt.Printf("Could not seek to the end of the key file: %v!\n", err) + fmt.Printf("Could not seek to the end of the SSL key logging file: %v!\n", err) sslKeyFileConcurrentWriter = nil } else { if debug.IsDebug(debugLevel) { @@ -174,7 +170,7 @@ func main() { debugLevel, ) if debug.IsDebug(debugLevel) { - fmt.Printf("Test will end earlier than %v\n", timeoutAbsoluteTime) + fmt.Printf("Test will end no later than %v\n", timeoutAbsoluteTime) } // print the banner @@ -190,7 +186,7 @@ func main() { if err != nil { fmt.Fprintf( os.Stderr, - "Error: Profiling requested with storage in %s but that file could not be opened: %v\n", + "Error: Profiling requested but could not open the log file ( %s ) for writing: %v\n", *profile, err, ) @@ -312,6 +308,7 @@ func main() { KeyLogger: sslKeyFileConcurrentWriter, } } + generate_lgu := func() lgc.LoadGeneratingConnection { return &lgc.LoadGeneratingConnectionUpload{ Path: config.Urls.UploadUrl, @@ -346,33 +343,42 @@ func main() { // generate additional information! selfDownProbeConnectionCommunicationChannel, downloadThroughputChannel := rpm.LoadGenerator( - lgNetworkActivityCtx, + networkActivityCtx, downloadLoadGeneratorOperatorCtx, time.Second, generate_lgd, &downloadLoadGeneratingConnectionCollection, + *calculateExtendedStats, downloadDebugging, ) selfUpProbeConnectionCommunicationChannel, uploadThroughputChannel := rpm.LoadGenerator( - lgNetworkActivityCtx, + networkActivityCtx, uploadLoadGeneratorOperatorCtx, time.Second, generate_lgu, &uploadLoadGeneratingConnectionCollection, + *calculateExtendedStats, uploadDebugging, ) + // Handles for the first connection that the load-generating go routines (both up and + // download) open are passed because on the self[Down|Up]ProbeConnectionCommunicationChannel + // so that we can then start probes on those handles. selfDownProbeConnection := <-selfDownProbeConnectionCommunicationChannel selfUpProbeConnection := <-selfUpProbeConnectionCommunicationChannel + // The combined prober will handle launching, monitoring, etc of *both* the self and foreign + // probes. probeDataPointsChannel := rpm.CombinedProber( - proberCtx, + proberOperatorCtx, + networkActivityCtx, generateForeignProbeConfiguration, generateSelfProbeConfiguration, selfDownProbeConnection, selfUpProbeConnection, - time.Millisecond*100, + time.Millisecond*(time.Duration(*probeIntervalTime)), sslKeyFileConcurrentWriter, + *calculateExtendedStats, combinedProbeDebugging, ) @@ -386,6 +392,7 @@ func main() { // 2. K: The number of instantaneous moving averages to consider when determining stability. // 3: S: The standard deviation cutoff used to determine stability among the K preceding // moving averages of a measurement. + // See throughputI := constants.InstantaneousThroughputMeasurementCount probeI := constants.InstantaneousProbeMeasurementCount @@ -479,6 +486,9 @@ timeout: "################# Responsiveness is instantaneously %s.\n", utilities.Conditional(responsivenessIsStable, "stable", "unstable")) } if probeMeasurement.Type == rpm.Foreign { + // There may be more than one round trip accumulated together. If that is the case, + // we will blow them apart in to three separate measurements and each one will just + // be 1 / measurement.RoundTripCount of the total length. for range utilities.Iota(0, int(probeMeasurement.RoundTripCount)) { foreignRtts.AddElement(probeMeasurement.Duration.Seconds() / float64(probeMeasurement.RoundTripCount)) @@ -487,10 +497,6 @@ timeout: selfRtts.AddElement(probeMeasurement.Duration.Seconds()) } - // There may be more than one round trip accumulated together. If that is the case, - // we will blow them apart in to three separate measurements and each one will just - // be 1 / measurement.RoundTripCount of the total length. - if probeMeasurement.Type == rpm.Foreign { foreignProbeDataLogger.LogRecord(probeMeasurement) } else if probeMeasurement.Type == rpm.SelfDown || probeMeasurement.Type == rpm.SelfUp { @@ -523,8 +529,8 @@ timeout: -- proberCtx */ - // First, stop the load generators and the probes - proberCtxCancel() + // First, stop the load generator and the probe operators (but *not* the network activity) + proberOperatorCtxCancel() downloadLoadGeneratorOperatorCtxCancel() uploadLoadGeneratorOperatorCtxCancel() @@ -533,35 +539,57 @@ timeout: extendedStats := extendedstats.AggregateExtendedStats{} if *calculateExtendedStats { if extendedstats.ExtendedStatsAvailable() { - downloadLoadGeneratingConnectionCollection.Lock.Lock() - for i := 0; i < len(*downloadLoadGeneratingConnectionCollection.LGCs); i++ { - // Assume that extended statistics are available -- the check was done explicitly at - // program startup if the calculateExtendedStats flag was set by the user on the command line. - if err := extendedStats.IncorporateConnectionStats((*downloadLoadGeneratingConnectionCollection.LGCs)[i].Stats().ConnInfo.Conn); err != nil { - fmt.Fprintf( - os.Stderr, - "Warning: Could not add extended stats for the connection: %v\n", - err, - ) - } - } - downloadLoadGeneratingConnectionCollection.Lock.Unlock() + func() { + // Put inside an IIFE so that we can use a defer! + downloadLoadGeneratingConnectionCollection.Lock.Lock() + defer downloadLoadGeneratingConnectionCollection.Lock.Unlock() - // We do not trace upload connections! + // Note: We do not trace upload connections! + for i := 0; i < len(*downloadLoadGeneratingConnectionCollection.LGCs); i++ { + // Assume that extended statistics are available -- the check was done explicitly at + // program startup if the calculateExtendedStats flag was set by the user on the command line. + if err := extendedStats.IncorporateConnectionStats((*downloadLoadGeneratingConnectionCollection.LGCs)[i].Stats().ConnInfo.Conn); err != nil { + fmt.Fprintf( + os.Stderr, + "Warning: Could not add extended stats for the connection: %v\n", + err, + ) + } + } + }() } else { // TODO: Should we just log here? panic("Extended stats are not available but the user requested their calculation.") } } - // Third, stop the network connections opened by the load generators. - lgNetworkActivityCtxCancel() + // Third, stop the network connections opened by the load generators and probers. + networkActivityCtxCancel() // Finally, stop the world. operatingCtxCancel() // Calculate the RPM + // First, let's do a double-sided trim of the top/bottom 10% of our measurements. + + if *debugCliFlag { + fmt.Printf("") + } + + selfRttsTotalCount := selfRtts.Len() + foreignRttsTotalCount := foreignRtts.Len() + + selfRttsTrimmed := selfRtts.DoubleSidedTrim(10) + foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(10) + + selfRttsTrimmedCount := selfRttsTrimmed.Len() + foreignRttsTrimmedCount := foreignRttsTrimmed.Len() + + // Then, let's take the mean of those ... + selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage() + foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage() + selfProbeRoundTripTimeP90 := selfRtts.Percentile(90) // The specification indicates that we want to calculate the foreign probes as such: // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign @@ -572,22 +600,32 @@ timeout: foreignProbeRoundTripTimeP90 := foreignRtts.Percentile(90) // This is 60 because we measure in seconds not ms - rpm := 60.0 / (float64(selfProbeRoundTripTimeP90+foreignProbeRoundTripTimeP90) / 2.0) + p90Rpm := 60.0 / (float64(selfProbeRoundTripTimeP90+foreignProbeRoundTripTimeP90) / 2.0) + meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0) if *debugCliFlag { fmt.Printf( - "Total Load-Generating Round Trips: %d, Total New-Connection Round Trips: %d, P90 LG RTT: %f, P90 NC RTT: %f\n", - selfRtts.Size(), - foreignRtts.Size(), + `Total Self Probes: %d, Total Foreign Probes: %d +Trimmed Self Probes Count: %d, Trimmed Foreign Probes Count: %d +P90 Self RTT: %f, P90 Foreign RTT: %f +Trimmed Mean Self RTT: %f, Trimmed Mean Foreign RTT: %f +`, + selfRttsTotalCount, + foreignRttsTotalCount, + selfRttsTrimmedCount, + foreignRttsTrimmedCount, selfProbeRoundTripTimeP90, foreignProbeRoundTripTimeP90, + selfProbeRoundTripTimeMean, + foreignProbeRoundTripTimeMean, ) } if !testRanToStability { fmt.Printf("Test did not run to stability, these results are estimates:\n") } - fmt.Printf("RPM: %5.0f\n", rpm) + fmt.Printf("P90 RPM: %5.0f\n", p90Rpm) + fmt.Printf("Trimmed Mean RPM: %5.0f\n", meanRpm) fmt.Printf( "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", |
