author    Will Hawkins <[email protected]>    2022-10-03 08:28:41 -0400
committer GitHub <[email protected]>           2022-10-03 08:28:41 -0400
commit    1ccd544faa96941a1be93e2cdd4874d3f1d9e82c (patch)
tree      b4456e927c12a42c986e825cd265155d9e6bab8b
parent    a4cabcf65b099b67747569b33fe43c345ea317ad (diff)
parent    cc59a89fc3ac1fbdb16e417de01beecbd49616f0 (diff)
Merge pull request #37 from network-quality/Enhance-Throughput-Logging
[Feature] Extend Throughput Logging
-rw-r--r--    extendedstats/darwin.go      6
-rw-r--r--    extendedstats/unix.go        6
-rw-r--r--    extendedstats/windows.go     6
-rw-r--r--    networkQuality.go          181
-rw-r--r--    rpm/rpm.go                  80
5 files changed, 195 insertions, 84 deletions
diff --git a/extendedstats/darwin.go b/extendedstats/darwin.go
index 0f52be5..17298c9 100644
--- a/extendedstats/darwin.go
+++ b/extendedstats/darwin.go
@@ -12,7 +12,7 @@ import (
"golang.org/x/sys/unix"
)
-type ExtendedStats struct {
+type AggregateExtendedStats struct {
Maxseg uint64
MaxSendMss uint64
MaxRecvMss uint64
@@ -38,7 +38,7 @@ type TCPInfo struct {
Snd_cwnd uint32
}
-func (es *ExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error {
+func (es *AggregateExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error {
if info, err := GetTCPInfo(basicConn); err != nil {
return fmt.Errorf("OOPS: Could not get the TCP info for the connection: %v", err)
} else {
@@ -54,7 +54,7 @@ func (es *ExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error {
return nil
}
-func (es *ExtendedStats) Repr() string {
+func (es *AggregateExtendedStats) Repr() string {
return fmt.Sprintf(`Extended Statistics:
Maximum Segment Size: %v
Total Bytes Retransmitted: %v
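Editor's note: the rename from ExtendedStats to AggregateExtendedStats reflects what IncorporateConnectionStats does: it folds each connection's TCP statistics into one running aggregate. A minimal sketch of that fold, under the assumption that maxima are kept as maxima (the real method body is elided from this hunk, so the update rules here are hypothetical):

    package extendedstats

    // maxU64 returns the larger of two uint64 values.
    func maxU64(a, b uint64) uint64 {
        if a > b {
            return a
        }
        return b
    }

    // incorporate folds one connection's measurements into the running
    // aggregate. (Hypothetical: the real IncorporateConnectionStats body
    // is elided from this hunk.)
    func (es *AggregateExtendedStats) incorporate(sendMss, recvMss uint64) {
        es.MaxSendMss = maxU64(es.MaxSendMss, sendMss)
        es.MaxRecvMss = maxU64(es.MaxRecvMss, recvMss)
    }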
diff --git a/extendedstats/unix.go b/extendedstats/unix.go
index 3db94fc..270a956 100644
--- a/extendedstats/unix.go
+++ b/extendedstats/unix.go
@@ -12,7 +12,7 @@ import (
"golang.org/x/sys/unix"
)
-type ExtendedStats struct {
+type AggregateExtendedStats struct {
MaxPathMtu uint64
MaxSendMss uint64
MaxRecvMss uint64
@@ -27,7 +27,7 @@ func ExtendedStatsAvailable() bool {
return true
}
-func (es *ExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error {
+func (es *AggregateExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error {
if info, err := GetTCPInfo(basicConn); err != nil {
return fmt.Errorf("OOPS: Could not get the TCP info for the connection: %v", err)
} else {
@@ -44,7 +44,7 @@ func (es *ExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error {
return nil
}
-func (es *ExtendedStats) Repr() string {
+func (es *AggregateExtendedStats) Repr() string {
return fmt.Sprintf(`Extended Statistics:
Maximum Path MTU: %v
Maximum Send MSS: %v
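Editor's note: on Linux, per-connection statistics like these come from the TCP_INFO socket option. A self-contained sketch of how a helper in the spirit of GetTCPInfo could be built on golang.org/x/sys/unix; this is an assumption rather than the repository's implementation, and real connections may be TLS-wrapped and need unwrapping before reaching the underlying *net.TCPConn:

    package extendedstats

    import (
        "fmt"
        "net"

        "golang.org/x/sys/unix"
    )

    // getTCPInfoSketch reads the kernel's TCP_INFO block for an
    // established TCP connection on Linux.
    func getTCPInfoSketch(basicConn net.Conn) (*unix.TCPInfo, error) {
        tcpConn, ok := basicConn.(*net.TCPConn)
        if !ok {
            return nil, fmt.Errorf("connection is not a *net.TCPConn")
        }
        raw, err := tcpConn.SyscallConn()
        if err != nil {
            return nil, err
        }
        var info *unix.TCPInfo
        var sockoptErr error
        if err := raw.Control(func(fd uintptr) {
            info, sockoptErr = unix.GetsockoptTCPInfo(int(fd), unix.IPPROTO_TCP, unix.TCP_INFO)
        }); err != nil {
            return nil, err
        }
        if sockoptErr != nil {
            return nil, sockoptErr
        }
        return info, nil
    }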
diff --git a/extendedstats/windows.go b/extendedstats/windows.go
index 5c0fbd9..f38ffbd 100644
--- a/extendedstats/windows.go
+++ b/extendedstats/windows.go
@@ -13,7 +13,7 @@ import (
"golang.org/x/sys/windows"
)
-type ExtendedStats struct {
+type AggregateExtendedStats struct {
MaxMss uint64
TotalBytesSent uint64
TotalBytesReceived uint64
@@ -106,7 +106,7 @@ func ExtendedStatsAvailable() bool {
return true
}
-func (es *ExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error {
+func (es *AggregateExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error {
if info, err := getTCPInfoRaw(basicConn); err != nil {
return fmt.Errorf("OOPS: Could not get the TCP info for the connection: %v", err)
} else {
@@ -124,7 +124,7 @@ func (es *ExtendedStats) IncorporateConnectionStats(basicConn net.Conn) error {
return nil
}
-func (es *ExtendedStats) Repr() string {
+func (es *AggregateExtendedStats) Repr() string {
return fmt.Sprintf(`Extended Statistics:
Maximum Segment Size: %v
Total Bytes Retransmitted: %v
diff --git a/networkQuality.go b/networkQuality.go
index 8cbf148..de10d54 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -94,13 +94,21 @@ func main() {
timeoutDuration := time.Second * time.Duration(*sattimeout)
timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort)
+
+ // This is the overall operating context of the program. All other
+ // contexts descend from this one. Canceling this one cancels all
+ // the others.
operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background())
- lgDataCollectionCtx, cancelLGDataCollectionCtx := context.WithCancel(
- context.Background(),
- )
- foreignProbertCtx, foreignProberCtxCancel := context.WithCancel(
- context.Background(),
- )
+
+ // This context is used to control the collection of data from the
+ // load-generating connections.
+ lgDataCollectionCtx, cancelLGDataCollectionCtx := context.WithCancel(operatingCtx)
+
+ // This context is used to control the load-generating network activity (i.e., all
+ // the connections that are open to do load generation).
+ lgNetworkActivityCtx, cancelLgNetworkActivityCtx := context.WithCancel(operatingCtx)
+
+ // This context is used to control the activity of the foreign prober.
+ foreignProbertCtx, foreignProberCtxCancel := context.WithCancel(operatingCtx)
config := &config.Config{}
var debugLevel debug.DebugLevel = debug.Error
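Editor's note: the comments above describe a standard context.WithCancel hierarchy: children derived from operatingCtx are canceled automatically with their parent, yet each can also be canceled individually (data collection first, network activity only after extended stats are read). A standalone sketch of that cascade:

    package main

    import (
        "context"
        "fmt"
    )

    func main() {
        // Root context: canceling it cancels every descendant.
        operatingCtx, cancelOperating := context.WithCancel(context.Background())

        // Children derived from the root can be canceled individually...
        collectionCtx, cancelCollection := context.WithCancel(operatingCtx)
        networkCtx, cancelNetwork := context.WithCancel(operatingCtx)

        cancelCollection() // stop data collection, keep connections open
        fmt.Println(collectionCtx.Err(), networkCtx.Err()) // context canceled <nil>

        // ...or all at once through the root.
        cancelOperating()
        fmt.Println(networkCtx.Err()) // context canceled
        cancelNetwork()               // redundant but safe: cancels are idempotent
    }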
@@ -253,7 +261,7 @@ func main() {
/*
* Create (and then, ironically, name) two anonymous functions that, when invoked,
- * will create load-generating connections for upload/download/
+ * will create load-generating connections for upload/download
*/
generate_lgd := func() lgc.LoadGeneratingConnection {
return &lgc.LoadGeneratingConnectionDownload{
@@ -292,16 +300,18 @@ func main() {
// data collection go routines stops well before the other, they will continue to send probes and we can
// generate additional information!
- downloadDataCollectionChannel := rpm.LGCollectData(
+ downloadSaturationComplete, downloadDataCollectionChannel := rpm.LGCollectData(
lgDataCollectionCtx,
+ lgNetworkActivityCtx,
operatingCtx,
generate_lgd,
generateSelfProbeConfiguration,
downloadThroughputDataLogger,
downloadDebugging,
)
- uploadDataCollectionChannel := rpm.LGCollectData(
+ uploadSaturationComplete, uploadDataCollectionChannel := rpm.LGCollectData(
lgDataCollectionCtx,
+ lgNetworkActivityCtx,
operatingCtx,
generate_lgu,
generateSelfProbeConfiguration,
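Editor's note: LGCollectData now returns a pair of channels: an early saturation signal plus the eventual SelfDataCollectionResult, so the caller can react to saturation while collection keeps running. A minimal sketch of that split-signal pattern (all names hypothetical):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // collect signals the end of its generation phase on done and
    // delivers the final result later on results.
    func collect(ctx context.Context) (done chan bool, results chan int) {
        done = make(chan bool)
        results = make(chan int)
        go func() {
            total := 0
            for i := 1; i <= 10 && ctx.Err() == nil; i++ {
                total += i
            }
            done <- ctx.Err() == nil          // true means fully complete
            time.Sleep(10 * time.Millisecond) // straggling work continues...
            results <- total                  // ...before the data is handed over
        }()
        return
    }

    func main() {
        done, results := collect(context.Background())
        fmt.Println("fully complete:", <-done)
        fmt.Println("result:", <-results)
    }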
@@ -317,43 +327,29 @@ func main() {
)
dataCollectionTimeout := false
- uploadDataCollectionComplete := false
- downloadDataCollectionComplete := false
+ uploadDataGenerationComplete := false
+ downloadDataGenerationComplete := false
downloadDataCollectionResult := rpm.SelfDataCollectionResult{}
uploadDataCollectionResult := rpm.SelfDataCollectionResult{}
- for !(uploadDataCollectionComplete && downloadDataCollectionComplete) {
+ for !(uploadDataGenerationComplete && downloadDataGenerationComplete) {
select {
- case downloadDataCollectionResult = <-downloadDataCollectionChannel:
+ case fullyComplete := <-downloadSaturationComplete:
{
- downloadDataCollectionComplete = true
+ downloadDataGenerationComplete = true
if *debugCliFlag {
fmt.Printf(
- "################# download load-generating data collection is %s complete (%fMBps, %d flows)!\n",
- utilities.Conditional(
- dataCollectionTimeout,
- "(provisionally)",
- "",
- ),
- utilities.ToMBps(downloadDataCollectionResult.RateBps),
- len(downloadDataCollectionResult.LGCs),
- )
+ "################# download load-generating data generation is %s complete!\n",
+ utilities.Conditional(fullyComplete, "", "(provisionally)"))
}
}
- case uploadDataCollectionResult = <-uploadDataCollectionChannel:
+ case fullyComplete := <-uploadSaturationComplete:
{
- uploadDataCollectionComplete = true
+ uploadDataGenerationComplete = true
if *debugCliFlag {
fmt.Printf(
- "################# upload load-generating data collection is %s complete (%fMBps, %d flows)!\n",
- utilities.Conditional(
- dataCollectionTimeout,
- "(provisionally)",
- "",
- ),
- utilities.ToMBps(uploadDataCollectionResult.RateBps),
- len(uploadDataCollectionResult.LGCs),
- )
+ "################# upload load-generating data generation is %s complete!\n",
+ utilities.Conditional(fullyComplete, "", "(provisionally)"))
}
}
case <-timeoutChannel:
@@ -370,15 +366,15 @@ func main() {
if *debugCliFlag {
time.Sleep(constants.CooldownPeriod)
}
- return
+ return // Ends program
}
dataCollectionTimeout = true
// We timed out attempting to collect data about the link. So, we will
- // shut down all the collection xfers
+ // shut down the generators
cancelLGDataCollectionCtx()
// and then we will give ourselves some additional time in order
- // to complete provisional data collection.
+ // to see if we can get some provisional data.
timeoutAbsoluteTime = time.Now().
Add(time.Second * time.Duration(*rpmtimeout))
timeoutChannel = timeoutat.TimeoutAt(
@@ -395,31 +391,98 @@ func main() {
}
}
- // Shutdown the new-connection prober!
+ if *debugCliFlag {
+ fmt.Printf("Stopping all the load generating data generators.\n")
+ }
+ // Just cancel the data collection -- do *not* yet stop the actual load-generating
+ // network activity.
+ cancelLGDataCollectionCtx()
+
+ // Shutdown the foreign-connection prober!
+ if *debugCliFlag {
+ fmt.Printf("Stopping all foreign probers.\n")
+ }
foreignProberCtxCancel()
+ // Now that we stopped generation, let's give ourselves some time to collect
+ // all the data from our data generators.
+ timeoutAbsoluteTime = time.Now().
+ Add(time.Second * time.Duration(*rpmtimeout))
+ timeoutChannel = timeoutat.TimeoutAt(
+ operatingCtx,
+ timeoutAbsoluteTime,
+ debugLevel,
+ )
+
+ // Now that we have generated the data, let's collect it.
+ downloadDataCollectionComplete := false
+ uploadDataCollectionComplete := false
+ for !(downloadDataCollectionComplete && uploadDataCollectionComplete) {
+ select {
+ case downloadDataCollectionResult = <-downloadDataCollectionChannel:
+ {
+ downloadDataCollectionComplete = true
+ if *debugCliFlag {
+ fmt.Printf(
+ "################# download load-generating data collection is complete (%fMBps, %d flows)!\n",
+ utilities.ToMBps(downloadDataCollectionResult.RateBps),
+ len(downloadDataCollectionResult.LGCs),
+ )
+ }
+ }
+ case uploadDataCollectionResult = <-uploadDataCollectionChannel:
+ {
+ uploadDataCollectionComplete = true
+ if *debugCliFlag {
+ fmt.Printf(
+ "################# upload load-generating data collection is complete (%fMBps, %d flows)!\n",
+ utilities.ToMBps(uploadDataCollectionResult.RateBps),
+ len(uploadDataCollectionResult.LGCs),
+ )
+ }
+ }
+ case <-timeoutChannel:
+ {
+ // This is just bad news -- we generated data but could not collect it. Let's just fail.
+
+ fmt.Fprint(
+ os.Stderr,
+ "Error: Load-Generating data collection could not be completed in time and no provisional data could be gathered. Test failed.\n",
+ )
+ return // Ends program
+ }
+ }
+ }
+
// In the new version we are no longer going to wait to send probes until after
// saturation. When we get here we are now only going to compute the results
// and/or extended statistics!
- extendedStats := extendedstats.ExtendedStats{}
+ extendedStats := extendedstats.AggregateExtendedStats{}
- for i := 0; i < len(downloadDataCollectionResult.LGCs); i++ {
- // Assume that extended statistics are available -- the check was done explicitly at
- // program startup if the calculateExtendedStats flag was set by the user on the command line.
- if *calculateExtendedStats {
- if !extendedstats.ExtendedStatsAvailable() {
- panic("Extended stats are not available but the user requested their calculation.")
- }
- if err := extendedStats.IncorporateConnectionStats(downloadDataCollectionResult.LGCs[i].Stats().ConnInfo.Conn); err != nil {
- fmt.Fprintf(
- os.Stderr,
- "Warning: Could not add extended stats for the connection: %v\n",
- err,
- )
+ if *calculateExtendedStats {
+ if extendedstats.ExtendedStatsAvailable() {
+ for i := 0; i < len(downloadDataCollectionResult.LGCs); i++ {
+ // Assume that extended statistics are available -- the check was done explicitly at
+ // program startup if the calculateExtendedStats flag was set by the user on the command line.
+ if err := extendedStats.IncorporateConnectionStats(downloadDataCollectionResult.LGCs[i].Stats().ConnInfo.Conn); err != nil {
+ fmt.Fprintf(
+ os.Stderr,
+ "Warning: Could not add extended stats for the connection: %v\n",
+ err,
+ )
+ }
}
+ } else {
+ // TODO: Should we just log here?
+ panic("Extended stats are not available but the user requested their calculation.")
}
}
+
+ // And only now, when we are done getting the extended stats from the connections, can
+ // we actually shut down the load-generating network activity!
+ cancelLgNetworkActivityCtx()
+
fmt.Printf(
"Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
utilities.ToMbps(downloadDataCollectionResult.RateBps),
@@ -435,6 +498,16 @@ func main() {
foreignProbeDataPoints := utilities.ChannelToSlice(foreignProbeDataPointsChannel)
totalForeignRoundTrips := len(foreignProbeDataPoints)
+ // The specification indicates that we want to calculate the foreign probes as such:
+ // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign
+ // where tcp_foreign, tls_foreign and http_foreign are the P90 RTTs of the TCP-,
+ // TLS- and HTTP-level round trips on the foreign connections, respectively. However, we cannot break out
+ // the individual RTTs so we assume that they are roughly equal. Call that _foreign:
+ // 1/3*_foreign + 1/3*_foreign + 1/3*_foreign =
+ // 1/3*(3*_foreign) =
+ // _foreign
+ // So, there's no need to divide by the number of RTTs defined in the ProbeDataPoints
+ // in the individual results.
foreignProbeRoundTripTimes := utilities.Fmap(
foreignProbeDataPoints,
func(dp rpm.ProbeDataPoint) float64 { return dp.Duration.Seconds() },
@@ -442,11 +515,11 @@ func main() {
foreignProbeRoundTripTimeP90 := utilities.CalculatePercentile(foreignProbeRoundTripTimes, 90)
downloadRoundTripTimes := utilities.Fmap(
- downloadDataCollectionResult.DataPoints,
+ downloadDataCollectionResult.ProbeDataPoints,
func(dcr rpm.ProbeDataPoint) float64 { return dcr.Duration.Seconds() },
)
uploadRoundTripTimes := utilities.Fmap(
- uploadDataCollectionResult.DataPoints,
+ uploadDataCollectionResult.ProbeDataPoints,
func(dcr rpm.ProbeDataPoint) float64 { return dcr.Duration.Seconds() },
)
selfProbeRoundTripTimes := append(downloadRoundTripTimes, uploadRoundTripTimes...)
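Editor's note: the comment above argues that with the three foreign RTT components assumed equal, 1/3*(tcp + tls + http) collapses to a single value, so the P90 can be taken directly over the pooled round-trip times. For illustration, a simple nearest-rank-style percentile over RTT samples; the repository's utilities.CalculatePercentile is not shown in this diff and may use a different method:

    package main

    import (
        "fmt"
        "sort"
    )

    // percentile returns the p-th percentile of values using a simple
    // nearest-rank rule.
    func percentile(values []float64, p float64) float64 {
        if len(values) == 0 {
            return 0
        }
        sorted := append([]float64(nil), values...)
        sort.Float64s(sorted)
        rank := int(float64(len(sorted)) * p / 100.0)
        if rank >= len(sorted) {
            rank = len(sorted) - 1
        }
        return sorted[rank]
    }

    func main() {
        rtts := []float64{0.021, 0.034, 0.019, 0.057, 0.044, 0.025}
        fmt.Printf("P90 RTT: %.3fs\n", percentile(rtts, 90))
    }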
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 61c6658..eab7b90 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -75,9 +75,10 @@ type ThroughputDataPoint struct {
}
type SelfDataCollectionResult struct {
- RateBps float64
- LGCs []lgc.LoadGeneratingConnection
- DataPoints []ProbeDataPoint
+ RateBps float64
+ LGCs []lgc.LoadGeneratingConnection
+ ProbeDataPoints []ProbeDataPoint
+ LoggingContinuation func()
}
type ProbeType int64
@@ -312,6 +313,7 @@ func SelfProber(
debugging = debug.NewDebugWithPrefix(debugging.Level, debugging.Prefix+" self probe")
go func() {
+ wg := sync.WaitGroup{}
probeCount := 0
for proberCtx.Err() == nil {
time.Sleep(selfProbeConfiguration.Interval)
@@ -329,7 +331,7 @@ func SelfProber(
// yet.
go Probe(
proberCtx,
- nil,
+ &wg,
selfProbeConfiguration.DataLogger,
defaultConnection.Client(),
selfProbeConfiguration.URL,
@@ -340,7 +342,14 @@ func SelfProber(
}
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) self probing driver is stopping after sending %d probes.\n",
+ "(%s) Self probe driver is going to start waiting for its probes to finish.\n",
+ debugging.Prefix,
+ )
+ }
+ utilities.OrTimeout(func() { wg.Wait() }, 2*time.Second)
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) Self probe driver is stopping after sending %d probes.\n",
debugging.Prefix,
probeCount,
)
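Editor's note: utilities.OrTimeout bounds wg.Wait() so a stuck probe cannot hang the self-probe driver. Its implementation is not part of this diff; one common way to build such a helper is sketched below (the worker goroutine leaks if f never returns, which is tolerable in a short-lived process):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // orTimeout runs f and returns when f finishes or the timeout
    // elapses, whichever comes first.
    func orTimeout(f func(), timeout time.Duration) {
        done := make(chan struct{})
        go func() {
            f()
            close(done)
        }()
        select {
        case <-done:
        case <-time.After(timeout):
        }
    }

    func main() {
        wg := sync.WaitGroup{}
        wg.Add(1)
        go func() { time.Sleep(5 * time.Second); wg.Done() }()
        orTimeout(func() { wg.Wait() }, 2*time.Second)
        fmt.Println("moved on after at most two seconds")
    }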
@@ -351,27 +360,31 @@ func SelfProber(
}
func LGCollectData(
- lgDataCollectionCtx context.Context,
- operatingCtx context.Context,
+ saturationCtx context.Context,
+ networkActivityCtx context.Context,
+ controlCtx context.Context,
lgcGenerator func() lgc.LoadGeneratingConnection,
selfProbeConfigurationGenerator func() ProbeConfiguration,
throughputDataLogger datalogger.DataLogger[ThroughputDataPoint],
debugging *debug.DebugWithPrefix,
-) (resulted chan SelfDataCollectionResult) {
+) (saturated chan bool, resulted chan SelfDataCollectionResult) {
resulted = make(chan SelfDataCollectionResult)
+ saturated = make(chan bool)
go func() {
+ isSaturated := false
+
lgcs := make([]lgc.LoadGeneratingConnection, 0)
addFlows(
- lgDataCollectionCtx,
+ networkActivityCtx,
constants.StartingNumberOfLoadGeneratingConnections,
&lgcs,
lgcGenerator,
debugging.Level,
)
- selfProbeCtx, selfProbeCtxCancel := context.WithCancel(lgDataCollectionCtx)
+ selfProbeCtx, selfProbeCtxCancel := context.WithCancel(saturationCtx)
probeDataPointsChannel := SelfProber(selfProbeCtx,
lgcs[0],
&lgcs,
@@ -400,11 +413,21 @@ func LGCollectData(
for currentInterval := uint64(0); true; currentInterval++ {
- // When the program stops operating, then stop. When our invoker tells
- // us to stop, then stop.
- if operatingCtx.Err() != nil || lgDataCollectionCtx.Err() != nil {
- selfProbeCtxCancel()
- return
+ // Stop if the client has reached saturation on both sides (up and down)
+ if saturationCtx.Err() != nil {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("%v: Stopping data-collection/saturation loop at %v because both sides are saturated.", debugging, time.Now())
+ }
+ break
+ }
+
+ // Stop if we timed out! Send back false to indicate that we are returning under duress.
+ if controlCtx.Err() != nil {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("%v: Stopping data-collection/saturation loop at %v because our controller told us to do so.", debugging, time.Now())
+ }
+ saturated <- false
+ break
}
now := time.Now()
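Editor's note: the loop now distinguishes its two stop reasons: saturationCtx canceled means both directions saturated (break quietly), while controlCtx canceled means a timeout (report saturated <- false). Polling ctx.Err() at the top of each interval, as here, is the non-blocking alternative to selecting on ctx.Done(); a compact illustration of both styles:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
        defer cancel()

        // Non-blocking poll, as in the loop above: check Err() once per
        // interval and keep doing useful work in between.
        intervals := 0
        for ctx.Err() == nil {
            time.Sleep(10 * time.Millisecond) // stand-in for one interval of work
            intervals++
        }
        fmt.Printf("stopped after %d intervals: %v\n", intervals, ctx.Err())

        // The blocking alternative wakes up only on cancellation:
        <-ctx.Done()
        fmt.Println("done:", ctx.Err())
    }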
@@ -507,7 +530,11 @@ func LGCollectData(
// Special case: We won't make any adjustments on the first
// iteration.
- if currentInterval == 0 {
+ // Special case: If we are already saturated, skip the flow
+ // adjustments below. We keep running this loop after saturation
+ // because we are still generating good data!
+ if currentInterval == 0 || isSaturated {
continue
}
@@ -523,7 +550,7 @@ func LGCollectData(
)
}
addFlows(
- lgDataCollectionCtx,
+ networkActivityCtx,
constants.AdditiveNumberOfLoadGeneratingConnections,
&lgcs,
lgcGenerator,
@@ -545,19 +572,30 @@ func LGCollectData(
if debug.IsDebug(debugging.Level) {
fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging)
}
- break
+ // Do not break -- we want to continue looping so that we can continue to log.
+ // See comment at the beginning of the loop for its terminating condition.
+ isSaturated = true
+
+ // But, we do send back a flare that says we are saturated (and happily so)!
+ saturated <- true
} else {
// Else, add four more flows
if debug.IsDebug(debugging.Level) {
fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging)
}
- addFlows(lgDataCollectionCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level)
+ addFlows(networkActivityCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level)
previousFlowIncreaseInterval = currentInterval
}
}
-
}
+ // For whatever reason, we are done. Let's report our results.
+
+ // In the case that we ended happily, there should be no reason to do this (because
+ // the self-probe context is a descendant of the saturation context). However, if we
+ // were cancelled because of a timeout, we will need to explicitly cancel it. Multiple
+ // calls to a cancel function are a-okay.
selfProbeCtxCancel()
+
selfProbeDataPoints := make([]ProbeDataPoint, 0)
for dataPoint := range probeDataPointsChannel {
selfProbeDataPoints = append(selfProbeDataPoints, dataPoint)
@@ -569,7 +607,7 @@ func LGCollectData(
len(selfProbeDataPoints),
)
}
- resulted <- SelfDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, DataPoints: selfProbeDataPoints}
+ resulted <- SelfDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, ProbeDataPoints: selfProbeDataPoints}
}()
return
}