-rw-r--r--  constants/constants.go  |   6
-rw-r--r--  networkQuality.go       | 144
-rw-r--r--  rpm/rpm.go              |  80
-rw-r--r--  stabilizer/rev3.go      |  20
-rw-r--r--  tools/graphing.py       |   2
5 files changed, 168 insertions(+), 84 deletions(-)
diff --git a/constants/constants.go b/constants/constants.go
index 99de22d..1f3ed57 100644
--- a/constants/constants.go
+++ b/constants/constants.go
@@ -25,10 +25,10 @@ var (
// The number of previous instantaneous measurements to consider when generating the so-called
// instantaneous moving averages of a measurement.
- InstantaneousThroughputMeasurementCount int = 4
- InstantaneousProbeMeasurementCount int = 1
+ InstantaneousThroughputMeasurementCount uint64 = 4
+ InstantaneousProbeMeasurementCount uint64 = 1
// The number of instantaneous moving averages to consider when determining stability.
- InstantaneousMovingAverageStabilityCount int = 4
+ InstantaneousMovingAverageStabilityCount uint64 = 4
// The standard deviation cutoff used to determine stability among the K preceding moving averages
// of a measurement (as a percentage of the mean).
StabilityStandardDeviation float64 = 5.0
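The cutoff above amounts to a coefficient-of-variation test: the standard deviation of the most recent moving averages, expressed as a percentage of their mean, must stay below StabilityStandardDeviation. A minimal sketch of that check, assuming a plain slice in place of the repository's series type (the helper name is illustrative, not the project's API):

    package main

    import (
        "fmt"
        "math"
    )

    // isStableEnough reports whether the spread of the most recent moving
    // averages, relative to their mean, falls below cutoffPercent
    // (constants.StabilityStandardDeviation, i.e., 5.0).
    func isStableEnough(movingAverages []float64, cutoffPercent float64) bool {
        if len(movingAverages) == 0 {
            return false
        }
        mean := 0.0
        for _, v := range movingAverages {
            mean += v
        }
        mean /= float64(len(movingAverages))
        if mean == 0 {
            return false
        }
        variance := 0.0
        for _, v := range movingAverages {
            variance += (v - mean) * (v - mean)
        }
        stddev := math.Sqrt(variance / float64(len(movingAverages)))
        // Standard deviation as a percentage of the mean.
        return (stddev/mean)*100.0 < cutoffPercent
    }

    func main() {
        // K = 4 moving averages that vary by only about 1%: stable.
        fmt.Println(isStableEnough([]float64{101, 99, 100, 102}, 5.0)) // true
        // A swing of roughly 20% is not.
        fmt.Println(isStableEnough([]float64{80, 120, 100, 140}, 5.0)) // false
    }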
diff --git a/networkQuality.go b/networkQuality.go
index c3db256..759d562 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -58,15 +58,10 @@ var (
constants.DefaultDebug,
"Enable debugging.",
)
- sattimeout = flag.Int(
- "sattimeout",
- constants.DefaultTestTime,
- "Maximum time to spend measuring saturation.",
- )
rpmtimeout = flag.Int(
"rpmtimeout",
constants.RPMCalculationTime,
- "Maximum time to spend calculating RPM.",
+ "Maximum time to spend calculating RPM (i.e., total test time.).",
)
sslKeyFileName = flag.String(
"ssl-key-file",
@@ -88,12 +83,17 @@ var (
"",
"Store granular information about tests results in files with this basename. Time and information type will be appended (before the first .) to create separate log files. Disabled by default.",
)
+ probeIntervalTime = flag.Uint(
+ "probe-interval-time",
+ 100,
+ "Time (in ms) between probes (foreign and self).",
+ )
)
func main() {
flag.Parse()
- timeoutDuration := time.Second * time.Duration(*sattimeout)
+ timeoutDuration := time.Second * time.Duration(*rpmtimeout)
timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort)
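With -sattimeout removed, -rpmtimeout now bounds the entire test, and the new -probe-interval-time flag tunes how often the foreign and self probes fire. Assuming the binary built from networkQuality.go is named networkQuality and the server-selection flags are left at their defaults, a 20-second test that probes every 50 ms would be invoked roughly as:

    ./networkQuality -rpmtimeout 20 -probe-interval-time 50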
@@ -102,21 +102,17 @@ func main() {
// the others.
operatingCtx, operatingCtxCancel := context.WithCancel(context.Background())
- // This context is used to control the load generators -- we cancel it when
- // the system has completed its work. (i.e, rpm and saturation are stable).
- // The *operator* contexts control stopping the goroutines that are running
- // the process; the *throughput* contexts control whether the load generators
- // continue to add new connections at every interval.
+ // The operator contexts. These contexts control the processes that manage
+ // network activity but do not themselves control the network activity.
+
uploadLoadGeneratorOperatorCtx, uploadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx)
downloadLoadGeneratorOperatorCtx, downloadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx)
+ proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx)
- // This context is used to control the load-generating network activity (i.e., it controls all
+ // This context is used to control the network activity (i.e., it controls all
// the connections that are open to do load generation and probing). Cancelling this context will close
// all the network connections that are responsible for generating the load.
- lgNetworkActivityCtx, lgNetworkActivityCtxCancel := context.WithCancel(operatingCtx)
-
- // This context is used to control the activity of the prober.
- proberCtx, proberCtxCancel := context.WithCancel(operatingCtx)
+ networkActivityCtx, networkActivityCtxCancel := context.WithCancel(operatingCtx)
config := &config.Config{}
var debugLevel debug.DebugLevel = debug.Error
@@ -128,18 +124,18 @@ func main() {
if *calculateExtendedStats && !extendedstats.ExtendedStatsAvailable() {
*calculateExtendedStats = false
fmt.Printf(
- "Warning: Calculation of extended statistics was requested but they are not supported on this platform.\n",
+ "Warning: Calculation of extended statistics was requested but is not supported on this platform.\n",
)
}
var sslKeyFileConcurrentWriter *ccw.ConcurrentWriter = nil
if *sslKeyFileName != "" {
if sslKeyFileHandle, err := os.OpenFile(*sslKeyFileName, os.O_RDWR|os.O_CREATE, os.FileMode(0600)); err != nil {
- fmt.Printf("Could not open the keyfile for writing: %v!\n", err)
+ fmt.Printf("Could not open the requested SSL key logging file for writing: %v!\n", err)
sslKeyFileConcurrentWriter = nil
} else {
if err = utilities.SeekForAppend(sslKeyFileHandle); err != nil {
- fmt.Printf("Could not seek to the end of the key file: %v!\n", err)
+ fmt.Printf("Could not seek to the end of the SSL key logging file: %v!\n", err)
sslKeyFileConcurrentWriter = nil
} else {
if debug.IsDebug(debugLevel) {
@@ -174,7 +170,7 @@ func main() {
debugLevel,
)
if debug.IsDebug(debugLevel) {
- fmt.Printf("Test will end earlier than %v\n", timeoutAbsoluteTime)
+ fmt.Printf("Test will end no later than %v\n", timeoutAbsoluteTime)
}
// print the banner
@@ -190,7 +186,7 @@ func main() {
if err != nil {
fmt.Fprintf(
os.Stderr,
- "Error: Profiling requested with storage in %s but that file could not be opened: %v\n",
+ "Error: Profiling requested but could not open the log file ( %s ) for writing: %v\n",
*profile,
err,
)
@@ -312,6 +308,7 @@ func main() {
KeyLogger: sslKeyFileConcurrentWriter,
}
}
+
generate_lgu := func() lgc.LoadGeneratingConnection {
return &lgc.LoadGeneratingConnectionUpload{
Path: config.Urls.UploadUrl,
@@ -346,33 +343,42 @@ func main() {
// generate additional information!
selfDownProbeConnectionCommunicationChannel, downloadThroughputChannel := rpm.LoadGenerator(
- lgNetworkActivityCtx,
+ networkActivityCtx,
downloadLoadGeneratorOperatorCtx,
time.Second,
generate_lgd,
&downloadLoadGeneratingConnectionCollection,
+ *calculateExtendedStats,
downloadDebugging,
)
selfUpProbeConnectionCommunicationChannel, uploadThroughputChannel := rpm.LoadGenerator(
- lgNetworkActivityCtx,
+ networkActivityCtx,
uploadLoadGeneratorOperatorCtx,
time.Second,
generate_lgu,
&uploadLoadGeneratingConnectionCollection,
+ *calculateExtendedStats,
uploadDebugging,
)
+ // The load-generating goroutines (both upload and download) pass a handle to the first
+ // connection that they open back on self[Down|Up]ProbeConnectionCommunicationChannel so
+ // that we can then start the self probes on those handles.
selfDownProbeConnection := <-selfDownProbeConnectionCommunicationChannel
selfUpProbeConnection := <-selfUpProbeConnectionCommunicationChannel
+ // The combined prober will handle launching, monitoring, etc., of *both* the self and
+ // foreign probes.
probeDataPointsChannel := rpm.CombinedProber(
- proberCtx,
+ proberOperatorCtx,
+ networkActivityCtx,
generateForeignProbeConfiguration,
generateSelfProbeConfiguration,
selfDownProbeConnection,
selfUpProbeConnection,
- time.Millisecond*100,
+ time.Millisecond*(time.Duration(*probeIntervalTime)),
sslKeyFileConcurrentWriter,
+ *calculateExtendedStats,
combinedProbeDebugging,
)
@@ -386,6 +392,7 @@ func main() {
// 2. K: The number of instantaneous moving averages to consider when determining stability.
// 3: S: The standard deviation cutoff used to determine stability among the K preceding
// moving averages of a measurement.
+ // See
throughputI := constants.InstantaneousThroughputMeasurementCount
probeI := constants.InstantaneousProbeMeasurementCount
@@ -479,6 +486,9 @@ timeout:
"################# Responsiveness is instantaneously %s.\n", utilities.Conditional(responsivenessIsStable, "stable", "unstable"))
}
if probeMeasurement.Type == rpm.Foreign {
+ // There may be more than one round trip accumulated together. If that is the case,
+ // we will blow them apart into measurement.RoundTripCount separate measurements, and
+ // each one will just be 1 / measurement.RoundTripCount of the total duration.
for range utilities.Iota(0, int(probeMeasurement.RoundTripCount)) {
foreignRtts.AddElement(probeMeasurement.Duration.Seconds() / float64(probeMeasurement.RoundTripCount))
@@ -487,10 +497,6 @@ timeout:
selfRtts.AddElement(probeMeasurement.Duration.Seconds())
}
- // There may be more than one round trip accumulated together. If that is the case,
- // we will blow them apart in to three separate measurements and each one will just
- // be 1 / measurement.RoundTripCount of the total length.
-
if probeMeasurement.Type == rpm.Foreign {
foreignProbeDataLogger.LogRecord(probeMeasurement)
} else if probeMeasurement.Type == rpm.SelfDown || probeMeasurement.Type == rpm.SelfUp {
@@ -523,8 +529,8 @@ timeout:
-- proberCtx
*/
- // First, stop the load generators and the probes
- proberCtxCancel()
+ // First, stop the load-generator and prober operators (but *not* the network activity)
+ proberOperatorCtxCancel()
downloadLoadGeneratorOperatorCtxCancel()
uploadLoadGeneratorOperatorCtxCancel()
@@ -533,35 +539,57 @@ timeout:
extendedStats := extendedstats.AggregateExtendedStats{}
if *calculateExtendedStats {
if extendedstats.ExtendedStatsAvailable() {
- downloadLoadGeneratingConnectionCollection.Lock.Lock()
- for i := 0; i < len(*downloadLoadGeneratingConnectionCollection.LGCs); i++ {
- // Assume that extended statistics are available -- the check was done explicitly at
- // program startup if the calculateExtendedStats flag was set by the user on the command line.
- if err := extendedStats.IncorporateConnectionStats((*downloadLoadGeneratingConnectionCollection.LGCs)[i].Stats().ConnInfo.Conn); err != nil {
- fmt.Fprintf(
- os.Stderr,
- "Warning: Could not add extended stats for the connection: %v\n",
- err,
- )
- }
- }
- downloadLoadGeneratingConnectionCollection.Lock.Unlock()
+ func() {
+ // Put inside an IIFE so that we can use a defer!
+ downloadLoadGeneratingConnectionCollection.Lock.Lock()
+ defer downloadLoadGeneratingConnectionCollection.Lock.Unlock()
- // We do not trace upload connections!
+ // Note: We do not trace upload connections!
+ for i := 0; i < len(*downloadLoadGeneratingConnectionCollection.LGCs); i++ {
+ // Assume that extended statistics are available -- the check was done explicitly at
+ // program startup if the calculateExtendedStats flag was set by the user on the command line.
+ if err := extendedStats.IncorporateConnectionStats((*downloadLoadGeneratingConnectionCollection.LGCs)[i].Stats().ConnInfo.Conn); err != nil {
+ fmt.Fprintf(
+ os.Stderr,
+ "Warning: Could not add extended stats for the connection: %v\n",
+ err,
+ )
+ }
+ }
+ }()
} else {
// TODO: Should we just log here?
panic("Extended stats are not available but the user requested their calculation.")
}
}
- // Third, stop the network connections opened by the load generators.
- lgNetworkActivityCtxCancel()
+ // Third, stop the network connections opened by the load generators and probers.
+ networkActivityCtxCancel()
// Finally, stop the world.
operatingCtxCancel()
// Calculate the RPM
+ // First, let's do a double-sided trim of the top/bottom 10% of our measurements.
+
+ if *debugCliFlag {
+ fmt.Printf("")
+ }
+
+ selfRttsTotalCount := selfRtts.Len()
+ foreignRttsTotalCount := foreignRtts.Len()
+
+ selfRttsTrimmed := selfRtts.DoubleSidedTrim(10)
+ foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(10)
+
+ selfRttsTrimmedCount := selfRttsTrimmed.Len()
+ foreignRttsTrimmedCount := foreignRttsTrimmed.Len()
+
+ // Then, let's take the mean of those ...
+ selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage()
+ foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage()
+
selfProbeRoundTripTimeP90 := selfRtts.Percentile(90)
// The specification indicates that we want to calculate the foreign probes as such:
// 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign
@@ -572,22 +600,32 @@ timeout:
foreignProbeRoundTripTimeP90 := foreignRtts.Percentile(90)
// This is 60 because we measure in seconds not ms
- rpm := 60.0 / (float64(selfProbeRoundTripTimeP90+foreignProbeRoundTripTimeP90) / 2.0)
+ p90Rpm := 60.0 / (float64(selfProbeRoundTripTimeP90+foreignProbeRoundTripTimeP90) / 2.0)
+ meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0)
if *debugCliFlag {
fmt.Printf(
- "Total Load-Generating Round Trips: %d, Total New-Connection Round Trips: %d, P90 LG RTT: %f, P90 NC RTT: %f\n",
- selfRtts.Size(),
- foreignRtts.Size(),
+ `Total Self Probes: %d, Total Foreign Probes: %d
+Trimmed Self Probes Count: %d, Trimmed Foreign Probes Count: %d
+P90 Self RTT: %f, P90 Foreign RTT: %f
+Trimmed Mean Self RTT: %f, Trimmed Mean Foreign RTT: %f
+`,
+ selfRttsTotalCount,
+ foreignRttsTotalCount,
+ selfRttsTrimmedCount,
+ foreignRttsTrimmedCount,
selfProbeRoundTripTimeP90,
foreignProbeRoundTripTimeP90,
+ selfProbeRoundTripTimeMean,
+ foreignProbeRoundTripTimeMean,
)
}
if !testRanToStability {
fmt.Printf("Test did not run to stability, these results are estimates:\n")
}
- fmt.Printf("RPM: %5.0f\n", rpm)
+ fmt.Printf("P90 RPM: %5.0f\n", p90Rpm)
+ fmt.Printf("Trimmed Mean RPM: %5.0f\n", meanRpm)
fmt.Printf(
"Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
diff --git a/rpm/rpm.go b/rpm/rpm.go
index eb10ec2..bf1270a 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -40,19 +40,23 @@ func addFlows(
lgcc *lgc.LoadGeneratingConnectionCollection,
lgcGenerator func() lgc.LoadGeneratingConnection,
debug debug.DebugLevel,
-) {
+) uint64 {
lgcc.Lock.Lock()
defer lgcc.Lock.Unlock()
for i := uint64(0); i < toAdd; i++ {
+ // First, generate the connection.
*lgcc.LGCs = append(*lgcc.LGCs, lgcGenerator())
+ // Second, try to start the connection.
if !(*lgcc.LGCs)[len(*lgcc.LGCs)-1].Start(ctx, debug) {
+ // If there was an error, we'll make sure that the caller knows it.
fmt.Printf(
"Error starting lgc with id %d!\n",
(*lgcc.LGCs)[len(*lgcc.LGCs)-1].ClientId(),
)
- return
+ return i
}
}
+ return toAdd
}
type ProbeConfiguration struct {
@@ -70,10 +74,12 @@ type ProbeDataPoint struct {
}
type GranularThroughputDataPoint struct {
- Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"`
- Throughput float64 `Description:"Instantaneous throughput (B/s)."`
- ConnID uint32 `Description:"Position of connection (ID)."`
- Direction string `Description:"Direction of Throughput."`
+ Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"`
+ Throughput float64 `Description:"Instantaneous throughput (B/s)."`
+ ConnID uint32 `Description:"Position of connection (ID)."`
+ TCPRtt time.Duration `Description:"The underlying connection's RTT at the time of this throughput sample." Formatter:"Seconds"`
+ TCPCwnd uint32 `Description:"The underlying connection's congestion window at the time of this throughput sample."`
+ Direction string `Description:"Direction of Throughput."`
}
type ThroughputDataPoint struct {
@@ -98,6 +104,15 @@ const (
Foreign
)
+type ProbeRoundTripCountType uint16
+
+const (
+ DefaultDownRoundTripCount ProbeRoundTripCountType = 1
+ SelfUpRoundTripCount ProbeRoundTripCountType = 1
+ SelfDownRoundTripCount ProbeRoundTripCountType = 1
+ ForeignRoundTripCount ProbeRoundTripCountType = 3
+)
+
func (pt ProbeType) Value() string {
if pt == SelfUp {
return "SelfUp"
@@ -108,13 +123,14 @@ func (pt ProbeType) Value() string {
}
func Probe(
- parentProbeCtx context.Context,
+ managingCtx context.Context,
waitGroup *sync.WaitGroup,
client *http.Client,
probeUrl string,
probeHost string, // optional: for use with a test_endpoint
probeType ProbeType,
result *chan ProbeDataPoint,
+ captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
) error {
@@ -131,7 +147,7 @@ func Probe(
probeTracer := NewProbeTracer(client, probeType, probeId, debugging)
time_before_probe := time.Now()
probe_req, err := http.NewRequestWithContext(
- httptrace.WithClientTrace(parentProbeCtx, probeTracer.trace),
+ httptrace.WithClientTrace(managingCtx, probeTracer.trace),
"GET",
probeUrl,
nil,
@@ -195,9 +211,9 @@ func Probe(
totalDelay,
)
}
- roundTripCount := uint64(1)
+ roundTripCount := DefaultDownRoundTripCount
if probeType == Foreign {
- roundTripCount = 3
+ roundTripCount = ForeignRoundTripCount
}
// Careful!!! It's possible that this channel has been closed because the Prober that
// started it has been stopped. Writing to a closed channel will cause a panic. It might not
@@ -218,7 +234,7 @@ func Probe(
tcpRtt := time.Duration(0 * time.Second)
tcpCwnd := uint32(0)
// TODO: Only get the extended stats for a connection if the user has requested them overall.
- if extendedstats.ExtendedStatsAvailable() {
+ if captureExtendedStats && extendedstats.ExtendedStatsAvailable() {
tcpInfo, err := extendedstats.GetTCPInfo(probeTracer.stats.ConnInfo.Conn)
if err == nil {
tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond
@@ -229,7 +245,7 @@ func Probe(
}
dataPoint := ProbeDataPoint{
Time: time_before_probe,
- RoundTripCount: roundTripCount,
+ RoundTripCount: uint64(roundTripCount),
Duration: totalDelay,
TCPRtt: tcpRtt,
TCPCwnd: tcpCwnd,
@@ -241,12 +257,14 @@ func Probe(
func CombinedProber(
proberCtx context.Context,
+ networkActivityCtx context.Context,
foreignProbeConfigurationGenerator func() ProbeConfiguration,
selfProbeConfigurationGenerator func() ProbeConfiguration,
selfDownProbeConnection lgc.LoadGeneratingConnection,
selfUpProbeConnection lgc.LoadGeneratingConnection,
probeInterval time.Duration,
keyLogger io.Writer,
+ captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
) (dataPoints chan ProbeDataPoint) {
@@ -296,31 +314,34 @@ func CombinedProber(
foreignProbeClient := &http.Client{Transport: &transport}
+ // Start Foreign Connection Prober
probeCount++
go Probe(
- proberCtx,
+ networkActivityCtx,
&wg,
foreignProbeClient,
foreignProbeConfiguration.URL,
foreignProbeConfiguration.Host,
Foreign,
&dataPoints,
+ captureExtendedStats,
debugging,
)
- // Start Download Connection Prober
+ // Start Self Download Connection Prober
go Probe(
- proberCtx,
+ networkActivityCtx,
&wg,
selfDownProbeConnection.Client(),
selfProbeConfiguration.URL,
selfProbeConfiguration.Host,
SelfDown,
&dataPoints,
+ captureExtendedStats,
debugging,
)
- // Start Upload Connection Prober
+ // Start Self Upload Connection Prober
go Probe(
proberCtx,
&wg,
@@ -329,6 +350,7 @@ func CombinedProber(
selfProbeConfiguration.Host,
SelfUp,
&dataPoints,
+ captureExtendedStats,
debugging,
)
}
@@ -356,6 +378,7 @@ func LoadGenerator(
rampupInterval time.Duration,
lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection.
loadGeneratingConnections *lgc.LoadGeneratingConnectionCollection,
+ captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections?
debugging *debug.DebugWithPrefix, // How can we forget debugging?
) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes.
throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate.
@@ -371,14 +394,13 @@ func LoadGenerator(
flowsCreated := uint64(0)
- addFlows(
+ flowsCreated += addFlows(
networkActivityCtx,
constants.StartingNumberOfLoadGeneratingConnections,
loadGeneratingConnections,
lgcGenerator,
debugging.Level,
)
- flowsCreated += constants.StartingNumberOfLoadGeneratingConnections
// We have at least a single load-generating channel. This channel will be the one that
// the self probes use. Let's send it back to the caller so that they can pass it on if they need to.
@@ -425,7 +447,7 @@ func LoadGenerator(
)
}
// TODO: Do we add null connection to throughput? and how do we define it? Throughput -1 or 0?
- granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, 0, uint32(i), ""})
+ granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, 0, uint32(i), 0, 0, ""})
continue
}
allInvalid = false
@@ -437,7 +459,21 @@ func LoadGenerator(
currentInterval.Seconds(),
)
instantaneousTotalThroughput += instantaneousConnectionThroughput
- granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, instantaneousConnectionThroughput, uint32(i), ""})
+
+ tcpRtt := time.Duration(0 * time.Second)
+ tcpCwnd := uint32(0)
+ if captureExtendedStats && extendedstats.ExtendedStatsAvailable() {
+ if stats := (*loadGeneratingConnections.LGCs)[i].Stats(); stats != nil {
+ tcpInfo, err := extendedstats.GetTCPInfo(stats.ConnInfo.Conn)
+ if err == nil {
+ tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond
+ tcpCwnd = tcpInfo.Snd_cwnd
+ } else {
+ fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err)
+ }
+ }
+ }
+ granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, instantaneousConnectionThroughput, uint32(i), tcpRtt, tcpCwnd, ""})
}
// For some reason, all the lgcs are invalid. This likely means that
@@ -457,15 +493,13 @@ func LoadGenerator(
throughputCalculations <- throughputDataPoint
// Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now!
- addFlows(
+ flowsCreated += addFlows(
networkActivityCtx,
constants.AdditiveNumberOfLoadGeneratingConnections,
loadGeneratingConnections,
lgcGenerator,
debugging.Level,
)
-
- flowsCreated += constants.AdditiveNumberOfLoadGeneratingConnections
}
if debug.IsDebug(debugging.Level) {
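The ProbeRoundTripCountType constants above record why a foreign probe's duration is later divided by its round-trip count: a probe on a brand-new connection pays for a TCP handshake, a TLS handshake, and an HTTP exchange (the 1/3 + 1/3 + 1/3 split mentioned in networkQuality.go), while a self probe rides an established load-generating connection and pays for only one exchange. A small sketch of how an accumulated measurement is unpacked into per-round-trip samples, using a simplified stand-in for rpm.ProbeDataPoint:

    package main

    import (
        "fmt"
        "time"
    )

    // probeDataPoint is a simplified stand-in for rpm.ProbeDataPoint.
    type probeDataPoint struct {
        Duration       time.Duration
        RoundTripCount uint64
    }

    // perRoundTrip splits a probe measurement that accumulated several round
    // trips (TCP + TLS + HTTP for a foreign probe) into equal shares, one per
    // round trip -- the same division the RTT aggregation loop performs on
    // Duration.Seconds() in networkQuality.go.
    func perRoundTrip(p probeDataPoint) []time.Duration {
        shares := make([]time.Duration, 0, p.RoundTripCount)
        for i := uint64(0); i < p.RoundTripCount; i++ {
            shares = append(shares, p.Duration/time.Duration(p.RoundTripCount))
        }
        return shares
    }

    func main() {
        foreign := probeDataPoint{Duration: 90 * time.Millisecond, RoundTripCount: 3} // ForeignRoundTripCount
        self := probeDataPoint{Duration: 30 * time.Millisecond, RoundTripCount: 1}    // SelfDownRoundTripCount

        fmt.Println(perRoundTrip(foreign)) // [30ms 30ms 30ms]
        fmt.Println(perRoundTrip(self))    // [30ms]
    }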
diff --git a/stabilizer/rev3.go b/stabilizer/rev3.go
index 79f5f27..d0518c0 100644
--- a/stabilizer/rev3.go
+++ b/stabilizer/rev3.go
@@ -29,7 +29,19 @@ type ThroughputStabilizer DataPointStabilizer
// 3: S: The standard deviation cutoff used to determine stability among the K preceding
// moving averages of a measurement.
-func NewProbeStabilizer(i int, k int, s float64, debugLevel debug.DebugLevel, debug *debug.DebugWithPrefix) ProbeStabilizer {
+// Rev3 Stabilizer Algorithm:
+// Stabilization is achieved when the standard deviation of a given number of the most recent moving averages of
+// instantaneous measurements is within an upper bound.
+//
+// Yes, that *is* a little confusing:
+// The user will deliver us a steady diet of so-called instantaneous measurements. We will keep the I most recent
+// of those measurements. Every time that we get a new instantaneous measurement, we will recalculate the moving
+// average of the I most recent instantaneous measurements. We will call that an instantaneous moving average. We keep
+// the K most recent instantaneous moving averages. Every time that we calculate a new instantaneous moving average,
+// we will calculate the standard deviation of those values. If that standard deviation (as a percentage of the mean)
+// is less than S, we declare stability.
+
+func NewProbeStabilizer(i uint64, k uint64, s float64, debugLevel debug.DebugLevel, debug *debug.DebugWithPrefix) ProbeStabilizer {
return ProbeStabilizer{instantaneousMeasurements: ms.NewCappedMathematicalSeries[float64](i),
movingAverages: ms.NewCappedMathematicalSeries[float64](k),
stabilityStandardDeviation: s,
@@ -57,7 +69,7 @@ func (r3 *ProbeStabilizer) AddMeasurement(measurement rpm.ProbeDataPoint) {
"%s: MA: %f ns (previous %d intervals).\n",
r3.dbgConfig.String(),
r3.movingAverages.CalculateAverage(),
- r3.movingAverages.Size(),
+ r3.movingAverages.Len(),
)
}
}
@@ -96,7 +108,7 @@ func (r3 *ProbeStabilizer) IsStable() bool {
return isStable
}
-func NewThroughputStabilizer(i int, k int, s float64, debugLevel debug.DebugLevel, debug *debug.DebugWithPrefix) ThroughputStabilizer {
+func NewThroughputStabilizer(i uint64, k uint64, s float64, debugLevel debug.DebugLevel, debug *debug.DebugWithPrefix) ThroughputStabilizer {
return ThroughputStabilizer{instantaneousMeasurements: ms.NewCappedMathematicalSeries[float64](i),
movingAverages: ms.NewCappedMathematicalSeries[float64](k),
stabilityStandardDeviation: s,
@@ -118,7 +130,7 @@ func (r3 *ThroughputStabilizer) AddMeasurement(measurement rpm.ThroughputDataPoi
"%s: MA: %f Mbps (previous %d intervals).\n",
r3.dbgConfig.String(),
r3.movingAverages.CalculateAverage(),
- r3.movingAverages.Size(),
+ r3.movingAverages.Len(),
)
}
}
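The Rev3 description above boils down to two capped ring buffers: one holding the I most recent instantaneous measurements (whose average becomes a new instantaneous moving average) and one holding the K most recent of those averages, on which the standard-deviation test runs. A compact sketch of that data flow, with a plain slice standing in for ms.NewCappedMathematicalSeries[float64](n):

    package main

    import "fmt"

    // capped keeps at most n of the most recently added values.
    type capped struct {
        n      int
        values []float64
    }

    func (c *capped) add(v float64) {
        c.values = append(c.values, v)
        if len(c.values) > c.n {
            c.values = c.values[1:]
        }
    }

    func (c *capped) average() float64 {
        total := 0.0
        for _, v := range c.values {
            total += v
        }
        return total / float64(len(c.values))
    }

    // stabilizer mirrors the shape of ProbeStabilizer/ThroughputStabilizer:
    // I instantaneous measurements feed K moving averages.
    type stabilizer struct {
        instantaneous  capped // the I most recent measurements
        movingAverages capped // the K most recent instantaneous moving averages
    }

    func (s *stabilizer) addMeasurement(v float64) {
        s.instantaneous.add(v)
        // Every new measurement yields a new instantaneous moving average.
        s.movingAverages.add(s.instantaneous.average())
    }

    func main() {
        s := stabilizer{
            instantaneous:  capped{n: 4}, // I = Instantaneous*MeasurementCount
            movingAverages: capped{n: 4}, // K = InstantaneousMovingAverageStabilityCount
        }
        for _, v := range []float64{100, 104, 98, 101, 103, 99} {
            s.addMeasurement(v)
        }
        // Stability is then decided by comparing the standard deviation of
        // s.movingAverages.values (as a percentage of their mean) against S.
        fmt.Println(s.movingAverages.values)
    }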
diff --git a/tools/graphing.py b/tools/graphing.py
index 5ba3fbf..eb1e6d7 100644
--- a/tools/graphing.py
+++ b/tools/graphing.py
@@ -108,7 +108,7 @@ def throughputClean(df):
def granularClean(df):
- df.columns = ["CreationTime", "Throughput", "ID", "Type", "Empty"]
+ df.columns = ["CreationTime", "Throughput", "ID", "RTT", "Cwnd", "Type", "Empty"]
df = df.drop(columns=["Empty"])
df["CreationTime"] = pd.to_datetime(df["CreationTime"], format="%m-%d-%Y-%H-%M-%S.%f")
df["Type"] = df["Type"].apply(str.strip)