diff options
| -rw-r--r-- | constants/constants.go | 6 | ||||
| -rw-r--r-- | networkQuality.go | 144 | ||||
| -rw-r--r-- | rpm/rpm.go | 80 | ||||
| -rw-r--r-- | stabilizer/rev3.go | 20 | ||||
| -rw-r--r-- | tools/graphing.py | 2 |
5 files changed, 168 insertions, 84 deletions
diff --git a/constants/constants.go b/constants/constants.go index 99de22d..1f3ed57 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -25,10 +25,10 @@ var ( // The number of previous instantaneous measurements to consider when generating the so-called // instantaneous moving averages of a measurement. - InstantaneousThroughputMeasurementCount int = 4 - InstantaneousProbeMeasurementCount int = 1 + InstantaneousThroughputMeasurementCount uint64 = 4 + InstantaneousProbeMeasurementCount uint64 = 1 // The number of instantaneous moving averages to consider when determining stability. - InstantaneousMovingAverageStabilityCount int = 4 + InstantaneousMovingAverageStabilityCount uint64 = 4 // The standard deviation cutoff used to determine stability among the K preceding moving averages // of a measurement (as a percentage of the mean). StabilityStandardDeviation float64 = 5.0 diff --git a/networkQuality.go b/networkQuality.go index c3db256..759d562 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -58,15 +58,10 @@ var ( constants.DefaultDebug, "Enable debugging.", ) - sattimeout = flag.Int( - "sattimeout", - constants.DefaultTestTime, - "Maximum time to spend measuring saturation.", - ) rpmtimeout = flag.Int( "rpmtimeout", constants.RPMCalculationTime, - "Maximum time to spend calculating RPM.", + "Maximum time to spend calculating RPM (i.e., total test time.).", ) sslKeyFileName = flag.String( "ssl-key-file", @@ -88,12 +83,17 @@ var ( "", "Store granular information about tests results in files with this basename. Time and information type will be appended (before the first .) to create separate log files. Disabled by default.", ) + probeIntervalTime = flag.Uint( + "probe-interval-time", + 100, + "Time (in ms) between probes (foreign and self).", + ) ) func main() { flag.Parse() - timeoutDuration := time.Second * time.Duration(*sattimeout) + timeoutDuration := time.Second * time.Duration(*rpmtimeout) timeoutAbsoluteTime := time.Now().Add(timeoutDuration) configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort) @@ -102,21 +102,17 @@ func main() { // the others. operatingCtx, operatingCtxCancel := context.WithCancel(context.Background()) - // This context is used to control the load generators -- we cancel it when - // the system has completed its work. (i.e, rpm and saturation are stable). - // The *operator* contexts control stopping the goroutines that are running - // the process; the *throughput* contexts control whether the load generators - // continue to add new connections at every interval. + // The operator contexts. These contexts control the processes that manage + // network activity but do no control network activity. + uploadLoadGeneratorOperatorCtx, uploadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx) downloadLoadGeneratorOperatorCtx, downloadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx) + proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx) - // This context is used to control the load-generating network activity (i.e., it controls all + // This context is used to control the network activity (i.e., it controls all // the connections that are open to do load generation and probing). Cancelling this context will close // all the network connections that are responsible for generating the load. - lgNetworkActivityCtx, lgNetworkActivityCtxCancel := context.WithCancel(operatingCtx) - - // This context is used to control the activity of the prober. - proberCtx, proberCtxCancel := context.WithCancel(operatingCtx) + networkActivityCtx, networkActivityCtxCancel := context.WithCancel(operatingCtx) config := &config.Config{} var debugLevel debug.DebugLevel = debug.Error @@ -128,18 +124,18 @@ func main() { if *calculateExtendedStats && !extendedstats.ExtendedStatsAvailable() { *calculateExtendedStats = false fmt.Printf( - "Warning: Calculation of extended statistics was requested but they are not supported on this platform.\n", + "Warning: Calculation of extended statistics was requested but is not supported on this platform.\n", ) } var sslKeyFileConcurrentWriter *ccw.ConcurrentWriter = nil if *sslKeyFileName != "" { if sslKeyFileHandle, err := os.OpenFile(*sslKeyFileName, os.O_RDWR|os.O_CREATE, os.FileMode(0600)); err != nil { - fmt.Printf("Could not open the keyfile for writing: %v!\n", err) + fmt.Printf("Could not open the requested SSL key logging file for writing: %v!\n", err) sslKeyFileConcurrentWriter = nil } else { if err = utilities.SeekForAppend(sslKeyFileHandle); err != nil { - fmt.Printf("Could not seek to the end of the key file: %v!\n", err) + fmt.Printf("Could not seek to the end of the SSL key logging file: %v!\n", err) sslKeyFileConcurrentWriter = nil } else { if debug.IsDebug(debugLevel) { @@ -174,7 +170,7 @@ func main() { debugLevel, ) if debug.IsDebug(debugLevel) { - fmt.Printf("Test will end earlier than %v\n", timeoutAbsoluteTime) + fmt.Printf("Test will end no later than %v\n", timeoutAbsoluteTime) } // print the banner @@ -190,7 +186,7 @@ func main() { if err != nil { fmt.Fprintf( os.Stderr, - "Error: Profiling requested with storage in %s but that file could not be opened: %v\n", + "Error: Profiling requested but could not open the log file ( %s ) for writing: %v\n", *profile, err, ) @@ -312,6 +308,7 @@ func main() { KeyLogger: sslKeyFileConcurrentWriter, } } + generate_lgu := func() lgc.LoadGeneratingConnection { return &lgc.LoadGeneratingConnectionUpload{ Path: config.Urls.UploadUrl, @@ -346,33 +343,42 @@ func main() { // generate additional information! selfDownProbeConnectionCommunicationChannel, downloadThroughputChannel := rpm.LoadGenerator( - lgNetworkActivityCtx, + networkActivityCtx, downloadLoadGeneratorOperatorCtx, time.Second, generate_lgd, &downloadLoadGeneratingConnectionCollection, + *calculateExtendedStats, downloadDebugging, ) selfUpProbeConnectionCommunicationChannel, uploadThroughputChannel := rpm.LoadGenerator( - lgNetworkActivityCtx, + networkActivityCtx, uploadLoadGeneratorOperatorCtx, time.Second, generate_lgu, &uploadLoadGeneratingConnectionCollection, + *calculateExtendedStats, uploadDebugging, ) + // Handles for the first connection that the load-generating go routines (both up and + // download) open are passed because on the self[Down|Up]ProbeConnectionCommunicationChannel + // so that we can then start probes on those handles. selfDownProbeConnection := <-selfDownProbeConnectionCommunicationChannel selfUpProbeConnection := <-selfUpProbeConnectionCommunicationChannel + // The combined prober will handle launching, monitoring, etc of *both* the self and foreign + // probes. probeDataPointsChannel := rpm.CombinedProber( - proberCtx, + proberOperatorCtx, + networkActivityCtx, generateForeignProbeConfiguration, generateSelfProbeConfiguration, selfDownProbeConnection, selfUpProbeConnection, - time.Millisecond*100, + time.Millisecond*(time.Duration(*probeIntervalTime)), sslKeyFileConcurrentWriter, + *calculateExtendedStats, combinedProbeDebugging, ) @@ -386,6 +392,7 @@ func main() { // 2. K: The number of instantaneous moving averages to consider when determining stability. // 3: S: The standard deviation cutoff used to determine stability among the K preceding // moving averages of a measurement. + // See throughputI := constants.InstantaneousThroughputMeasurementCount probeI := constants.InstantaneousProbeMeasurementCount @@ -479,6 +486,9 @@ timeout: "################# Responsiveness is instantaneously %s.\n", utilities.Conditional(responsivenessIsStable, "stable", "unstable")) } if probeMeasurement.Type == rpm.Foreign { + // There may be more than one round trip accumulated together. If that is the case, + // we will blow them apart in to three separate measurements and each one will just + // be 1 / measurement.RoundTripCount of the total length. for range utilities.Iota(0, int(probeMeasurement.RoundTripCount)) { foreignRtts.AddElement(probeMeasurement.Duration.Seconds() / float64(probeMeasurement.RoundTripCount)) @@ -487,10 +497,6 @@ timeout: selfRtts.AddElement(probeMeasurement.Duration.Seconds()) } - // There may be more than one round trip accumulated together. If that is the case, - // we will blow them apart in to three separate measurements and each one will just - // be 1 / measurement.RoundTripCount of the total length. - if probeMeasurement.Type == rpm.Foreign { foreignProbeDataLogger.LogRecord(probeMeasurement) } else if probeMeasurement.Type == rpm.SelfDown || probeMeasurement.Type == rpm.SelfUp { @@ -523,8 +529,8 @@ timeout: -- proberCtx */ - // First, stop the load generators and the probes - proberCtxCancel() + // First, stop the load generator and the probe operators (but *not* the network activity) + proberOperatorCtxCancel() downloadLoadGeneratorOperatorCtxCancel() uploadLoadGeneratorOperatorCtxCancel() @@ -533,35 +539,57 @@ timeout: extendedStats := extendedstats.AggregateExtendedStats{} if *calculateExtendedStats { if extendedstats.ExtendedStatsAvailable() { - downloadLoadGeneratingConnectionCollection.Lock.Lock() - for i := 0; i < len(*downloadLoadGeneratingConnectionCollection.LGCs); i++ { - // Assume that extended statistics are available -- the check was done explicitly at - // program startup if the calculateExtendedStats flag was set by the user on the command line. - if err := extendedStats.IncorporateConnectionStats((*downloadLoadGeneratingConnectionCollection.LGCs)[i].Stats().ConnInfo.Conn); err != nil { - fmt.Fprintf( - os.Stderr, - "Warning: Could not add extended stats for the connection: %v\n", - err, - ) - } - } - downloadLoadGeneratingConnectionCollection.Lock.Unlock() + func() { + // Put inside an IIFE so that we can use a defer! + downloadLoadGeneratingConnectionCollection.Lock.Lock() + defer downloadLoadGeneratingConnectionCollection.Lock.Unlock() - // We do not trace upload connections! + // Note: We do not trace upload connections! + for i := 0; i < len(*downloadLoadGeneratingConnectionCollection.LGCs); i++ { + // Assume that extended statistics are available -- the check was done explicitly at + // program startup if the calculateExtendedStats flag was set by the user on the command line. + if err := extendedStats.IncorporateConnectionStats((*downloadLoadGeneratingConnectionCollection.LGCs)[i].Stats().ConnInfo.Conn); err != nil { + fmt.Fprintf( + os.Stderr, + "Warning: Could not add extended stats for the connection: %v\n", + err, + ) + } + } + }() } else { // TODO: Should we just log here? panic("Extended stats are not available but the user requested their calculation.") } } - // Third, stop the network connections opened by the load generators. - lgNetworkActivityCtxCancel() + // Third, stop the network connections opened by the load generators and probers. + networkActivityCtxCancel() // Finally, stop the world. operatingCtxCancel() // Calculate the RPM + // First, let's do a double-sided trim of the top/bottom 10% of our measurements. + + if *debugCliFlag { + fmt.Printf("") + } + + selfRttsTotalCount := selfRtts.Len() + foreignRttsTotalCount := foreignRtts.Len() + + selfRttsTrimmed := selfRtts.DoubleSidedTrim(10) + foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(10) + + selfRttsTrimmedCount := selfRttsTrimmed.Len() + foreignRttsTrimmedCount := foreignRttsTrimmed.Len() + + // Then, let's take the mean of those ... + selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage() + foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage() + selfProbeRoundTripTimeP90 := selfRtts.Percentile(90) // The specification indicates that we want to calculate the foreign probes as such: // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign @@ -572,22 +600,32 @@ timeout: foreignProbeRoundTripTimeP90 := foreignRtts.Percentile(90) // This is 60 because we measure in seconds not ms - rpm := 60.0 / (float64(selfProbeRoundTripTimeP90+foreignProbeRoundTripTimeP90) / 2.0) + p90Rpm := 60.0 / (float64(selfProbeRoundTripTimeP90+foreignProbeRoundTripTimeP90) / 2.0) + meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0) if *debugCliFlag { fmt.Printf( - "Total Load-Generating Round Trips: %d, Total New-Connection Round Trips: %d, P90 LG RTT: %f, P90 NC RTT: %f\n", - selfRtts.Size(), - foreignRtts.Size(), + `Total Self Probes: %d, Total Foreign Probes: %d +Trimmed Self Probes Count: %d, Trimmed Foreign Probes Count: %d +P90 Self RTT: %f, P90 Foreign RTT: %f +Trimmed Mean Self RTT: %f, Trimmed Mean Foreign RTT: %f +`, + selfRttsTotalCount, + foreignRttsTotalCount, + selfRttsTrimmedCount, + foreignRttsTrimmedCount, selfProbeRoundTripTimeP90, foreignProbeRoundTripTimeP90, + selfProbeRoundTripTimeMean, + foreignProbeRoundTripTimeMean, ) } if !testRanToStability { fmt.Printf("Test did not run to stability, these results are estimates:\n") } - fmt.Printf("RPM: %5.0f\n", rpm) + fmt.Printf("P90 RPM: %5.0f\n", p90Rpm) + fmt.Printf("Trimmed Mean RPM: %5.0f\n", meanRpm) fmt.Printf( "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", @@ -40,19 +40,23 @@ func addFlows( lgcc *lgc.LoadGeneratingConnectionCollection, lgcGenerator func() lgc.LoadGeneratingConnection, debug debug.DebugLevel, -) { +) uint64 { lgcc.Lock.Lock() defer lgcc.Lock.Unlock() for i := uint64(0); i < toAdd; i++ { + // First, generate the connection. *lgcc.LGCs = append(*lgcc.LGCs, lgcGenerator()) + // Second, try to start the connection. if !(*lgcc.LGCs)[len(*lgcc.LGCs)-1].Start(ctx, debug) { + // If there was an error, we'll make sure that the caller knows it. fmt.Printf( "Error starting lgc with id %d!\n", (*lgcc.LGCs)[len(*lgcc.LGCs)-1].ClientId(), ) - return + return i } } + return toAdd } type ProbeConfiguration struct { @@ -70,10 +74,12 @@ type ProbeDataPoint struct { } type GranularThroughputDataPoint struct { - Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"` - Throughput float64 `Description:"Instantaneous throughput (B/s)."` - ConnID uint32 `Description:"Position of connection (ID)."` - Direction string `Description:"Direction of Throughput."` + Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"` + Throughput float64 `Description:"Instantaneous throughput (B/s)."` + ConnID uint32 `Description:"Position of connection (ID)."` + TCPRtt time.Duration `Description:"The underlying connection's RTT at probe time." Formatter:"Seconds"` + TCPCwnd uint32 `Description:"The underlying connection's congestion window at probe time."` + Direction string `Description:"Direction of Throughput."` } type ThroughputDataPoint struct { @@ -98,6 +104,15 @@ const ( Foreign ) +type ProbeRoundTripCountType uint16 + +const ( + DefaultDownRoundTripCount ProbeRoundTripCountType = 1 + SelfUpRoundTripCount ProbeRoundTripCountType = 1 + SelfDownRoundTripCount ProbeRoundTripCountType = 1 + ForeignRoundTripCount ProbeRoundTripCountType = 3 +) + func (pt ProbeType) Value() string { if pt == SelfUp { return "SelfUp" @@ -108,13 +123,14 @@ func (pt ProbeType) Value() string { } func Probe( - parentProbeCtx context.Context, + managingCtx context.Context, waitGroup *sync.WaitGroup, client *http.Client, probeUrl string, probeHost string, // optional: for use with a test_endpoint probeType ProbeType, result *chan ProbeDataPoint, + captureExtendedStats bool, debugging *debug.DebugWithPrefix, ) error { @@ -131,7 +147,7 @@ func Probe( probeTracer := NewProbeTracer(client, probeType, probeId, debugging) time_before_probe := time.Now() probe_req, err := http.NewRequestWithContext( - httptrace.WithClientTrace(parentProbeCtx, probeTracer.trace), + httptrace.WithClientTrace(managingCtx, probeTracer.trace), "GET", probeUrl, nil, @@ -195,9 +211,9 @@ func Probe( totalDelay, ) } - roundTripCount := uint64(1) + roundTripCount := DefaultDownRoundTripCount if probeType == Foreign { - roundTripCount = 3 + roundTripCount = ForeignRoundTripCount } // Careful!!! It's possible that this channel has been closed because the Prober that // started it has been stopped. Writing to a closed channel will cause a panic. It might not @@ -218,7 +234,7 @@ func Probe( tcpRtt := time.Duration(0 * time.Second) tcpCwnd := uint32(0) // TODO: Only get the extended stats for a connection if the user has requested them overall. - if extendedstats.ExtendedStatsAvailable() { + if captureExtendedStats && extendedstats.ExtendedStatsAvailable() { tcpInfo, err := extendedstats.GetTCPInfo(probeTracer.stats.ConnInfo.Conn) if err == nil { tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond @@ -229,7 +245,7 @@ func Probe( } dataPoint := ProbeDataPoint{ Time: time_before_probe, - RoundTripCount: roundTripCount, + RoundTripCount: uint64(roundTripCount), Duration: totalDelay, TCPRtt: tcpRtt, TCPCwnd: tcpCwnd, @@ -241,12 +257,14 @@ func Probe( func CombinedProber( proberCtx context.Context, + networkActivityCtx context.Context, foreignProbeConfigurationGenerator func() ProbeConfiguration, selfProbeConfigurationGenerator func() ProbeConfiguration, selfDownProbeConnection lgc.LoadGeneratingConnection, selfUpProbeConnection lgc.LoadGeneratingConnection, probeInterval time.Duration, keyLogger io.Writer, + captureExtendedStats bool, debugging *debug.DebugWithPrefix, ) (dataPoints chan ProbeDataPoint) { @@ -296,31 +314,34 @@ func CombinedProber( foreignProbeClient := &http.Client{Transport: &transport} + // Start Foreign Connection Prober probeCount++ go Probe( - proberCtx, + networkActivityCtx, &wg, foreignProbeClient, foreignProbeConfiguration.URL, foreignProbeConfiguration.Host, Foreign, &dataPoints, + captureExtendedStats, debugging, ) - // Start Download Connection Prober + // Start Self Download Connection Prober go Probe( - proberCtx, + networkActivityCtx, &wg, selfDownProbeConnection.Client(), selfProbeConfiguration.URL, selfProbeConfiguration.Host, SelfDown, &dataPoints, + captureExtendedStats, debugging, ) - // Start Upload Connection Prober + // Start Self Upload Connection Prober go Probe( proberCtx, &wg, @@ -329,6 +350,7 @@ func CombinedProber( selfProbeConfiguration.Host, SelfUp, &dataPoints, + captureExtendedStats, debugging, ) } @@ -356,6 +378,7 @@ func LoadGenerator( rampupInterval time.Duration, lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection. loadGeneratingConnections *lgc.LoadGeneratingConnectionCollection, + captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections? debugging *debug.DebugWithPrefix, // How can we forget debugging? ) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes. throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate. @@ -371,14 +394,13 @@ func LoadGenerator( flowsCreated := uint64(0) - addFlows( + flowsCreated += addFlows( networkActivityCtx, constants.StartingNumberOfLoadGeneratingConnections, loadGeneratingConnections, lgcGenerator, debugging.Level, ) - flowsCreated += constants.StartingNumberOfLoadGeneratingConnections // We have at least a single load-generating channel. This channel will be the one that // the self probes use. Let's send it back to the caller so that they can pass it on if they need to. @@ -425,7 +447,7 @@ func LoadGenerator( ) } // TODO: Do we add null connection to throughput? and how do we define it? Throughput -1 or 0? - granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, 0, uint32(i), ""}) + granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, 0, uint32(i), 0, 0, ""}) continue } allInvalid = false @@ -437,7 +459,21 @@ func LoadGenerator( currentInterval.Seconds(), ) instantaneousTotalThroughput += instantaneousConnectionThroughput - granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, instantaneousConnectionThroughput, uint32(i), ""}) + + tcpRtt := time.Duration(0 * time.Second) + tcpCwnd := uint32(0) + if captureExtendedStats && extendedstats.ExtendedStatsAvailable() { + if stats := (*loadGeneratingConnections.LGCs)[i].Stats(); stats != nil { + tcpInfo, err := extendedstats.GetTCPInfo(stats.ConnInfo.Conn) + if err == nil { + tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond + tcpCwnd = tcpInfo.Snd_cwnd + } else { + fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err) + } + } + } + granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, instantaneousConnectionThroughput, uint32(i), tcpRtt, tcpCwnd, ""}) } // For some reason, all the lgcs are invalid. This likely means that @@ -457,15 +493,13 @@ func LoadGenerator( throughputCalculations <- throughputDataPoint // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now! - addFlows( + flowsCreated += addFlows( networkActivityCtx, constants.AdditiveNumberOfLoadGeneratingConnections, loadGeneratingConnections, lgcGenerator, debugging.Level, ) - - flowsCreated += constants.AdditiveNumberOfLoadGeneratingConnections } if debug.IsDebug(debugging.Level) { diff --git a/stabilizer/rev3.go b/stabilizer/rev3.go index 79f5f27..d0518c0 100644 --- a/stabilizer/rev3.go +++ b/stabilizer/rev3.go @@ -29,7 +29,19 @@ type ThroughputStabilizer DataPointStabilizer // 3: S: The standard deviation cutoff used to determine stability among the K preceding // moving averages of a measurement. -func NewProbeStabilizer(i int, k int, s float64, debugLevel debug.DebugLevel, debug *debug.DebugWithPrefix) ProbeStabilizer { +// Rev3 Stabilizer Algorithm: +// Stabilization is achieved when the standard deviation of a given number of the most recent moving averages of +// instantaneous measurements is within an upper bound. +// +// Yes, that *is* a little confusing: +// The user will deliver us a steady diet of so-called instantaneous measurements. We will keep the I most recent +// of those measurements. Every time that we get a new instantaneous measurement, we will recalculate the moving +// average of the I most instantaneous measurements. We will call that an instantaneous moving average. We keep the K +// most recent instantaneous moving averages. Every time that we calculate a new instantaneous moving average, we will +// calculate the standard deviation of those values. If the calculated standard deviation is less than S, we declare +// stability. + +func NewProbeStabilizer(i uint64, k uint64, s float64, debugLevel debug.DebugLevel, debug *debug.DebugWithPrefix) ProbeStabilizer { return ProbeStabilizer{instantaneousMeasurements: ms.NewCappedMathematicalSeries[float64](i), movingAverages: ms.NewCappedMathematicalSeries[float64](k), stabilityStandardDeviation: s, @@ -57,7 +69,7 @@ func (r3 *ProbeStabilizer) AddMeasurement(measurement rpm.ProbeDataPoint) { "%s: MA: %f ns (previous %d intervals).\n", r3.dbgConfig.String(), r3.movingAverages.CalculateAverage(), - r3.movingAverages.Size(), + r3.movingAverages.Len(), ) } } @@ -96,7 +108,7 @@ func (r3 *ProbeStabilizer) IsStable() bool { return isStable } -func NewThroughputStabilizer(i int, k int, s float64, debugLevel debug.DebugLevel, debug *debug.DebugWithPrefix) ThroughputStabilizer { +func NewThroughputStabilizer(i uint64, k uint64, s float64, debugLevel debug.DebugLevel, debug *debug.DebugWithPrefix) ThroughputStabilizer { return ThroughputStabilizer{instantaneousMeasurements: ms.NewCappedMathematicalSeries[float64](i), movingAverages: ms.NewCappedMathematicalSeries[float64](k), stabilityStandardDeviation: s, @@ -118,7 +130,7 @@ func (r3 *ThroughputStabilizer) AddMeasurement(measurement rpm.ThroughputDataPoi "%s: MA: %f Mbps (previous %d intervals).\n", r3.dbgConfig.String(), r3.movingAverages.CalculateAverage(), - r3.movingAverages.Size(), + r3.movingAverages.Len(), ) } } diff --git a/tools/graphing.py b/tools/graphing.py index 5ba3fbf..eb1e6d7 100644 --- a/tools/graphing.py +++ b/tools/graphing.py @@ -108,7 +108,7 @@ def throughputClean(df): def granularClean(df): - df.columns = ["CreationTime", "Throughput", "ID", "Type", "Empty"] + df.columns = ["CreationTime", "Throughput", "ID", "RTT", "Cwnd", "Type", "Empty"] df = df.drop(columns=["Empty"]) df["CreationTime"] = pd.to_datetime(df["CreationTime"], format="%m-%d-%Y-%H-%M-%S.%f") df["Type"] = df["Type"].apply(str.strip) |
