summaryrefslogtreecommitdiff
path: root/networkQuality.go
diff options
context:
space:
mode:
authorWill Hawkins <[email protected]>2022-07-01 01:49:08 -0400
committerWill Hawkins <[email protected]>2022-07-01 01:49:08 -0400
commit4fd7d42026f85da367afaaafeefd217b983f71ca (patch)
tree928add298c86ba9ece5552dce56adc066103e84e /networkQuality.go
parent631a845c807c0e9aca897f231763d3dcec76de20 (diff)
[Feature] Support spec v2
This is a WIP for supporting v2 of the RPM spec.
Diffstat (limited to 'networkQuality.go')
-rw-r--r--networkQuality.go214
1 files changed, 54 insertions, 160 deletions
diff --git a/networkQuality.go b/networkQuality.go
index 93c42e0..d5c3523 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -16,10 +16,8 @@ package main
import (
"context"
- "crypto/tls"
"flag"
"fmt"
- "net/http"
"os"
"runtime/pprof"
"time"
@@ -33,7 +31,6 @@ import (
"github.com/network-quality/goresponsiveness/rpm"
"github.com/network-quality/goresponsiveness/timeoutat"
"github.com/network-quality/goresponsiveness/utilities"
- "golang.org/x/net/http2"
)
var (
@@ -98,7 +95,7 @@ func main() {
timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort)
operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background())
- saturationCtx, cancelSaturationCtx := context.WithCancel(
+ lgDataCollectionCtx, cancelLGDataCollectionCtx := context.WithCancel(
context.Background(),
)
config := &config.Config{}
@@ -187,84 +184,90 @@ func main() {
* Create (and then, ironically, name) two anonymous functions that, when invoked,
* will create load-generating connections for upload/download/
*/
- generate_lbd := func() lgc.LoadGeneratingConnection {
+ generate_lgd := func() lgc.LoadGeneratingConnection {
return &lgc.LoadGeneratingConnectionDownload{
Path: config.Urls.LargeUrl,
KeyLogger: sslKeyFileConcurrentWriter,
}
}
- generate_lbu := func() lgc.LoadGeneratingConnection {
+ generate_lgu := func() lgc.LoadGeneratingConnection {
return &lgc.LoadGeneratingConnectionUpload{
Path: config.Urls.UploadUrl,
KeyLogger: sslKeyFileConcurrentWriter,
}
}
+ generate_lg_probe_configuration := func() rpm.ProbeConfiguration {
+ return rpm.ProbeConfiguration{URL: config.Urls.SmallUrl}
+ }
+
var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download")
var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload")
- downloadSaturationChannel := rpm.Saturate(
- saturationCtx,
+ downloadDataCollectionChannel := rpm.LGCollectData(
+ lgDataCollectionCtx,
operatingCtx,
- generate_lbd,
+ generate_lgd,
+ generate_lg_probe_configuration,
downloadDebugging,
)
- uploadSaturationChannel := rpm.Saturate(
- saturationCtx,
+ uploadDataCollectionChannel := rpm.LGCollectData(
+ lgDataCollectionCtx,
operatingCtx,
- generate_lbu,
+ generate_lgu,
+ generate_lg_probe_configuration,
uploadDebugging,
)
- saturationTimeout := false
- uploadSaturated := false
- downloadSaturated := false
- downloadSaturation := rpm.SaturationResult{}
- uploadSaturation := rpm.SaturationResult{}
+ dataCollectionTimeout := false
+ uploadDataCollectionComplete := false
+ downloadDataCollectionComple := false
+ downloadDataCollectionResult := rpm.LGDataCollectionResult{}
+ uploadDataCollectionResult := rpm.LGDataCollectionResult{}
- for !(uploadSaturated && downloadSaturated) {
+ for !(uploadDataCollectionComplete && downloadDataCollectionComple) {
select {
- case downloadSaturation = <-downloadSaturationChannel:
+ case downloadDataCollectionResult = <-downloadDataCollectionChannel:
{
- downloadSaturated = true
+ downloadDataCollectionComple = true
if *debugCliFlag {
fmt.Printf(
- "################# download is %s saturated (%fMBps, %d flows)!\n",
+ "################# download load-generating data collection is %s complete (%fMBps, %d flows)!\n",
utilities.Conditional(
- saturationTimeout,
+ dataCollectionTimeout,
"(provisionally)",
"",
),
- utilities.ToMBps(downloadSaturation.RateBps),
- len(downloadSaturation.LGCs),
+ utilities.ToMBps(downloadDataCollectionResult.RateBps),
+ len(downloadDataCollectionResult.LGCs),
)
}
}
- case uploadSaturation = <-uploadSaturationChannel:
+ case uploadDataCollectionResult = <-uploadDataCollectionChannel:
{
- uploadSaturated = true
+ uploadDataCollectionComplete = true
if *debugCliFlag {
fmt.Printf(
- "################# upload is %s saturated (%fMBps, %d flows)!\n",
+ "################# upload load-generating data collection is %s complete (%fMBps, %d flows)!\n",
utilities.Conditional(
- saturationTimeout,
+ dataCollectionTimeout,
"(provisionally)",
"",
),
- utilities.ToMBps(uploadSaturation.RateBps),
- len(uploadSaturation.LGCs),
+ utilities.ToMBps(uploadDataCollectionResult.RateBps),
+ len(uploadDataCollectionResult.LGCs),
)
}
}
case <-timeoutChannel:
{
- if saturationTimeout {
- // We already timedout on saturation. This signal means that
- // we are timedout on getting the provisional saturation. We
+ if dataCollectionTimeout {
+ // We already timedout on data collection. This signal means that
+ // we are timedout on getting the provisional data collection. We
// will exit!
fmt.Fprint(
os.Stderr,
- "Error: Saturation could not be completed in time and no provisional rates could be assessed. Test failed.\n",
+ "Error: Load-Generating data collection could not be completed in time and no provisional data could be gathered. Test failed.\n",
)
cancelOperatingCtx()
if *debugCliFlag {
@@ -272,13 +275,13 @@ func main() {
}
return
}
- saturationTimeout = true
+ dataCollectionTimeout = true
- // We timed out attempting to saturate the link. So, we will
- // shut down all the saturation xfers
- cancelSaturationCtx()
+ // We timed out attempting to collect data about the link. So, we will
+ // shut down all the collection xfers
+ cancelLGDataCollectionCtx()
// and then we will give ourselves some additional time in order
- // to calculate a provisional saturation.
+ // to complete provisional data collection.
timeoutAbsoluteTime = time.Now().
Add(time.Second * time.Duration(*rpmtimeout))
timeoutChannel = timeoutat.TimeoutAt(
@@ -288,39 +291,27 @@ func main() {
)
if *debugCliFlag {
fmt.Printf(
- "################# timeout reaching saturation!\n",
+ "################# timeout collecting load-generating data!\n",
)
}
}
}
}
- // Give ourselves no more than 15 seconds to complete the RPM calculation.
- // This is conditional because (above) we may have already added the time.
- // We did it up there so that we could also limit the amount of time waiting
- // for a conditional saturation calculation.
- if !saturationTimeout {
- timeoutAbsoluteTime = time.Now().Add(time.Second * time.Duration(*rpmtimeout))
- timeoutChannel = timeoutat.TimeoutAt(
- operatingCtx,
- timeoutAbsoluteTime,
- debugLevel,
- )
- }
+ // In the new version we are no longer going to wait to send probes until after
+ // saturation. When we get here we are now only going to compute the results
+ // and/or extended statistics!
- totalMeasurements := uint64(0)
- totalMeasurementTimes := float64(0)
- measurementTimeout := false
extendedStats := extendedstats.ExtendedStats{}
- for i := 0; i < len(downloadSaturation.LGCs); i++ {
+ for i := 0; i < len(downloadDataCollectionResult.LGCs); i++ {
// Assume that extended statistics are available -- the check was done explicitly at
// program startup if the calculateExtendedStats flag was set by the user on the command line.
if *calculateExtendedStats {
if !extendedstats.ExtendedStatsAvailable() {
panic("Extended stats are not available but the user requested their calculation.")
}
- if err := extendedStats.IncorporateConnectionStats(downloadSaturation.LGCs[i].Stats().ConnInfo.Conn); err != nil {
+ if err := extendedStats.IncorporateConnectionStats(downloadDataCollectionResult.LGCs[i].Stats().ConnInfo.Conn); err != nil {
fmt.Fprintf(
os.Stderr,
"Warning: Could not add extended stats for the connection: %v",
@@ -329,116 +320,19 @@ func main() {
}
}
}
-
- for i := 0; i < constants.MeasurementProbeCount && !measurementTimeout; i++ {
- if len(downloadSaturation.LGCs) == 0 {
- continue
- }
- randomLGCsIndex := utilities.RandBetween(len(downloadSaturation.LGCs))
- if !downloadSaturation.LGCs[randomLGCsIndex].IsValid() {
- if *debugCliFlag {
- fmt.Printf(
- "%v: The randomly selected saturated connection (with id %d) was invalid. Skipping.\n",
- debugCliFlag,
- downloadSaturation.LGCs[randomLGCsIndex].ClientId(),
- )
- }
-
- // Protect against pathological cases where we continuously select
- // invalid connections and never
- // do the select below
- if time.Since(timeoutAbsoluteTime) > 0 {
- if *debugCliFlag {
- fmt.Printf(
- "Pathologically could not find valid saturated connections use for measurement.\n",
- )
- }
- break
- }
- continue
- }
-
- unsaturatedMeasurementTransport := http2.Transport{}
- unsaturatedMeasurementTransport.TLSClientConfig = &tls.Config{}
- if sslKeyFileConcurrentWriter != nil {
- unsaturatedMeasurementTransport.TLSClientConfig.KeyLogWriter = sslKeyFileConcurrentWriter
- }
- unsaturatedMeasurementTransport.TLSClientConfig.InsecureSkipVerify = true
- newClient := http.Client{Transport: &unsaturatedMeasurementTransport}
-
- unsaturatedMeasurementProbe := rpm.NewProbe(&newClient, debugLevel)
-
- saturatedMeasurementProbe := rpm.NewProbe(
- downloadSaturation.LGCs[randomLGCsIndex].Client(),
- debugLevel,
- )
-
- select {
- case <-timeoutChannel:
- {
- measurementTimeout = true
- }
- case sequentialMeasurementTimes := <-rpm.CalculateProbeMeasurements(operatingCtx, *strictFlag, saturatedMeasurementProbe, unsaturatedMeasurementProbe, config.Urls.SmallUrl, debugLevel):
- {
- if sequentialMeasurementTimes.Err != nil {
- fmt.Printf(
- "Failed to calculate a time for sequential measurements: %v\n",
- sequentialMeasurementTimes.Err,
- )
- continue
- }
-
- if debug.IsDebug(debugLevel) {
- fmt.Printf("unsaturatedMeasurementProbe: %v\n", unsaturatedMeasurementProbe)
- }
- // We know that we have a good Sequential measurement.
- totalMeasurements += uint64(sequentialMeasurementTimes.MeasurementCount)
- totalMeasurementTimes += sequentialMeasurementTimes.Delay.Seconds()
- if debug.IsDebug(debugLevel) {
- fmt.Printf(
- "most-recent sequential measurement time: %v; most-recent sequential measurement count: %v\n",
- sequentialMeasurementTimes.Delay.Seconds(),
- sequentialMeasurementTimes.MeasurementCount,
- )
- }
- }
- }
- }
-
fmt.Printf(
"Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
- utilities.ToMbps(downloadSaturation.RateBps),
- utilities.ToMBps(downloadSaturation.RateBps),
- len(downloadSaturation.LGCs),
+ utilities.ToMbps(downloadDataCollectionResult.RateBps),
+ utilities.ToMBps(downloadDataCollectionResult.RateBps),
+ len(downloadDataCollectionResult.LGCs),
)
fmt.Printf(
"Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
- utilities.ToMbps(uploadSaturation.RateBps),
- utilities.ToMBps(uploadSaturation.RateBps),
- len(uploadSaturation.LGCs),
+ utilities.ToMbps(uploadDataCollectionResult.RateBps),
+ utilities.ToMBps(uploadDataCollectionResult.RateBps),
+ len(uploadDataCollectionResult.LGCs),
)
- if totalMeasurements != 0 {
- // "... it sums the five time values for each probe, and divides by the
- // total
- // number of probes to compute an average probe duration. The
- // reciprocal of this, normalized to 60 seconds, gives the Round-trips
- // Per Minute (RPM)."
- // "average probe duration" = totalMeasurementTimes / totalMeasurements.
- // The reciprocol of this = 1 / (totalMeasurementTimes / totalMeasurements) <-
- // semantically the probes-per-second.
- // Normalized to 60 seconds: 60 * (1
- // / ((totalMeasurementTimes / totalMeasurements)))) <- semantically the number of
- // probes per minute.
- rpm := float64(
- time.Minute.Seconds(),
- ) / (totalMeasurementTimes / (float64(totalMeasurements)))
- fmt.Printf("Total measurements: %d\n", totalMeasurements)
- fmt.Printf("RPM: %5.0f\n", rpm)
- } else {
- fmt.Printf("Error occurred calculating RPM -- no probe measurements received.\n")
- }
-
if *calculateExtendedStats {
fmt.Println(extendedStats.Repr())
}