summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--config/config.go134
-rw-r--r--constants/constants.go2
-rw-r--r--networkQuality.go443
-rw-r--r--rpm/rpm.go221
-rw-r--r--utilities/utilities.go5
5 files changed, 412 insertions, 393 deletions
diff --git a/config/config.go b/config/config.go
new file mode 100644
index 0000000..05ec61d
--- /dev/null
+++ b/config/config.go
@@ -0,0 +1,134 @@
+package config
+
+import (
+ "crypto/tls"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "strings"
+
+ "github.com/network-quality/goresponsiveness/utilities"
+ "golang.org/x/net/http2"
+)
+
+type ConfigUrls struct {
+ SmallUrl string `json:"small_https_download_url"`
+ LargeUrl string `json:"large_https_download_url"`
+ UploadUrl string `json:"https_upload_url"`
+}
+
+type Config struct {
+ Version int
+ Urls ConfigUrls `json:"urls"`
+ Source string
+ Test_Endpoint string
+}
+
+func (c *Config) Get(configHost string, configPath string) error {
+ configTransport := http2.Transport{}
+ configTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
+ configClient := &http.Client{Transport: &configTransport}
+ // Extraneous /s in URLs is normally okay, but the Apple CDN does not
+ // like them. Make sure that we put exactly one (1) / between the host
+ // and the path.
+ if !strings.HasPrefix(configPath, "/") {
+ configPath = "/" + configPath
+ }
+ c.Source = fmt.Sprintf("https://%s%s", configHost, configPath)
+ resp, err := configClient.Get(c.Source)
+ if err != nil {
+ return fmt.Errorf(
+ "Error: Could not connect to configuration host %s: %v\n",
+ configHost,
+ err,
+ )
+ }
+
+ jsonConfig, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return fmt.Errorf(
+ "Error: Could not read configuration content downloaded from %s: %v\n",
+ c.Source,
+ err,
+ )
+ }
+
+ err = json.Unmarshal(jsonConfig, c)
+ if err != nil {
+ return fmt.Errorf(
+ "Error: Could not parse configuration returned from %s: %v\n",
+ c.Source,
+ err,
+ )
+ }
+
+ //if len(c.Test_Endpoint) != 0 {
+ if false {
+ tempUrl, err := url.Parse(c.Urls.LargeUrl)
+ if err != nil {
+ return fmt.Errorf("Error parsing large_https_download_url: %v", err)
+ }
+ c.Urls.LargeUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path
+ tempUrl, err = url.Parse(c.Urls.SmallUrl)
+ if err != nil {
+ return fmt.Errorf("Error parsing small_https_download_url: %v", err)
+ }
+ c.Urls.SmallUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path
+ tempUrl, err = url.Parse(c.Urls.UploadUrl)
+ if err != nil {
+ return fmt.Errorf("Error parsing https_upload_url: %v", err)
+ }
+ c.Urls.UploadUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path
+ }
+ return nil
+}
+
+func (c *Config) String() string {
+ return fmt.Sprintf(
+ "Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s\nEndpoint: %s\n",
+ c.Version,
+ c.Urls.SmallUrl,
+ c.Urls.LargeUrl,
+ c.Urls.UploadUrl,
+ c.Test_Endpoint,
+ )
+}
+
+func (c *Config) IsValid() error {
+ if parsedUrl, err := url.ParseRequestURI(c.Urls.LargeUrl); err != nil ||
+ parsedUrl.Scheme != "https" {
+ return fmt.Errorf(
+ "Configuration url large_https_download_url is invalid: %s",
+ utilities.Conditional(
+ len(c.Urls.LargeUrl) != 0,
+ c.Urls.LargeUrl,
+ "Missing",
+ ),
+ )
+ }
+ if parsedUrl, err := url.ParseRequestURI(c.Urls.SmallUrl); err != nil ||
+ parsedUrl.Scheme != "https" {
+ return fmt.Errorf(
+ "Configuration url small_https_download_url is invalid: %s",
+ utilities.Conditional(
+ len(c.Urls.SmallUrl) != 0,
+ c.Urls.SmallUrl,
+ "Missing",
+ ),
+ )
+ }
+ if parsedUrl, err := url.ParseRequestURI(c.Urls.UploadUrl); err != nil ||
+ parsedUrl.Scheme != "https" {
+ return fmt.Errorf(
+ "Configuration url https_upload_url is invalid: %s",
+ utilities.Conditional(
+ len(c.Urls.UploadUrl) != 0,
+ c.Urls.UploadUrl,
+ "Missing",
+ ),
+ )
+ }
+ return nil
+}
diff --git a/constants/constants.go b/constants/constants.go
index 2f906b6..147b643 100644
--- a/constants/constants.go
+++ b/constants/constants.go
@@ -18,7 +18,7 @@ var (
// The amount of time that the client will cooldown if it is in debug mode.
CooldownPeriod time.Duration = 4 * time.Second
// The number of probes to send when calculating RTT.
- RPMProbeCount int = 5
+ MeasurementProbeCount int = 5
// The amount of time that we give ourselves to calculate the RPM.
RPMCalculationTime time.Duration = 10 * time.Second
diff --git a/networkQuality.go b/networkQuality.go
index 2aba054..ab9517e 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -17,25 +17,20 @@ package main
import (
"context"
"crypto/tls"
- "encoding/json"
"flag"
"fmt"
_ "io"
- "io/ioutil"
_ "log"
- "math/rand"
"net/http"
- "net/url"
"os"
"runtime/pprof"
- "strings"
"time"
"github.com/network-quality/goresponsiveness/ccw"
+ "github.com/network-quality/goresponsiveness/config"
"github.com/network-quality/goresponsiveness/constants"
"github.com/network-quality/goresponsiveness/debug"
"github.com/network-quality/goresponsiveness/lgc"
- "github.com/network-quality/goresponsiveness/ma"
"github.com/network-quality/goresponsiveness/rpm"
"github.com/network-quality/goresponsiveness/timeoutat"
"github.com/network-quality/goresponsiveness/utilities"
@@ -86,343 +81,6 @@ var (
)
)
-type ConfigUrls struct {
- SmallUrl string `json:"small_https_download_url"`
- LargeUrl string `json:"large_https_download_url"`
- UploadUrl string `json:"https_upload_url"`
-}
-
-type Config struct {
- Version int
- Urls ConfigUrls `json:"urls"`
- Source string
- Test_Endpoint string
-}
-
-func (c *Config) Get(configHost string, configPath string) error {
- configTransport := http2.Transport{}
- configTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
- configClient := &http.Client{Transport: &configTransport}
- // Extraneous /s in URLs is normally okay, but the Apple CDN does not
- // like them. Make sure that we put exactly one (1) / between the host
- // and the path.
- if !strings.HasPrefix(configPath, "/") {
- configPath = "/" + configPath
- }
- c.Source = fmt.Sprintf("https://%s%s", configHost, configPath)
- resp, err := configClient.Get(c.Source)
- if err != nil {
- return fmt.Errorf(
- "Error: Could not connect to configuration host %s: %v\n",
- configHost,
- err,
- )
- }
-
- jsonConfig, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return fmt.Errorf(
- "Error: Could not read configuration content downloaded from %s: %v\n",
- c.Source,
- err,
- )
- }
-
- err = json.Unmarshal(jsonConfig, c)
- if err != nil {
- return fmt.Errorf(
- "Error: Could not parse configuration returned from %s: %v\n",
- c.Source,
- err,
- )
- }
-
- //if len(c.Test_Endpoint) != 0 {
- if false {
- tempUrl, err := url.Parse(c.Urls.LargeUrl)
- if err != nil {
- return fmt.Errorf("Error parsing large_https_download_url: %v", err)
- }
- c.Urls.LargeUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path
- tempUrl, err = url.Parse(c.Urls.SmallUrl)
- if err != nil {
- return fmt.Errorf("Error parsing small_https_download_url: %v", err)
- }
- c.Urls.SmallUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path
- tempUrl, err = url.Parse(c.Urls.UploadUrl)
- if err != nil {
- return fmt.Errorf("Error parsing https_upload_url: %v", err)
- }
- c.Urls.UploadUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path
- }
- return nil
-}
-
-func (c *Config) String() string {
- return fmt.Sprintf(
- "Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s\nEndpoint: %s\n",
- c.Version,
- c.Urls.SmallUrl,
- c.Urls.LargeUrl,
- c.Urls.UploadUrl,
- c.Test_Endpoint,
- )
-}
-
-func (c *Config) IsValid() error {
- if parsedUrl, err := url.ParseRequestURI(c.Urls.LargeUrl); err != nil ||
- parsedUrl.Scheme != "https" {
- return fmt.Errorf(
- "Configuration url large_https_download_url is invalid: %s",
- utilities.Conditional(
- len(c.Urls.LargeUrl) != 0,
- c.Urls.LargeUrl,
- "Missing",
- ),
- )
- }
- if parsedUrl, err := url.ParseRequestURI(c.Urls.SmallUrl); err != nil ||
- parsedUrl.Scheme != "https" {
- return fmt.Errorf(
- "Configuration url small_https_download_url is invalid: %s",
- utilities.Conditional(
- len(c.Urls.SmallUrl) != 0,
- c.Urls.SmallUrl,
- "Missing",
- ),
- )
- }
- if parsedUrl, err := url.ParseRequestURI(c.Urls.UploadUrl); err != nil ||
- parsedUrl.Scheme != "https" {
- return fmt.Errorf(
- "Configuration url https_upload_url is invalid: %s",
- utilities.Conditional(
- len(c.Urls.UploadUrl) != 0,
- c.Urls.UploadUrl,
- "Missing",
- ),
- )
- }
- return nil
-}
-
-func addFlows(
- ctx context.Context,
- toAdd uint64,
- lgcs *[]lgc.LoadGeneratingConnection,
- lgcsPreviousTransferred *[]uint64,
- lgcGenerator func() lgc.LoadGeneratingConnection,
- debug debug.DebugLevel,
-) {
- for i := uint64(0); i < toAdd; i++ {
- *lgcs = append(*lgcs, lgcGenerator())
- *lgcsPreviousTransferred = append(*lgcsPreviousTransferred, 0)
- if !(*lgcs)[len(*lgcs)-1].Start(ctx, debug) {
- fmt.Printf(
- "Error starting lgc with id %d!\n",
- (*lgcs)[len(*lgcs)-1].ClientId(),
- )
- return
- }
- }
-}
-
-type SaturationResult struct {
- RateBps float64
- lgcs []lgc.LoadGeneratingConnection
-}
-
-func saturate(
- saturationCtx context.Context,
- operatingCtx context.Context,
- lgcGenerator func() lgc.LoadGeneratingConnection,
- debugging *debug.DebugWithPrefix,
-) (saturated chan SaturationResult) {
- saturated = make(chan SaturationResult)
- go func() {
-
- lgcs := make([]lgc.LoadGeneratingConnection, 0)
- lgcsPreviousTransferred := make([]uint64, 0)
-
- addFlows(
- saturationCtx,
- constants.StartingNumberOfLoadGeneratingConnections,
- &lgcs,
- &lgcsPreviousTransferred,
- lgcGenerator,
- debugging.Level,
- )
-
- previousFlowIncreaseIteration := uint64(0)
- previousMovingAverage := float64(0)
- movingAverage := ma.NewMovingAverage(
- constants.MovingAverageIntervalCount,
- )
- movingAverageAverage := ma.NewMovingAverage(
- constants.MovingAverageIntervalCount,
- )
-
- nextSampleStartTime := time.Now().Add(time.Second)
-
- for currentIteration := uint64(0); true; currentIteration++ {
-
- // When the program stops operating, then stop.
- if saturationCtx.Err() != nil {
- return
- }
-
- // We may be asked to stop trying to saturate the
- // network and return our current status.
- if saturationCtx.Err() != nil {
- //break
- }
-
- now := time.Now()
- // At each 1-second interval
- if nextSampleStartTime.Sub(now) > 0 {
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "%v: Sleeping until %v\n",
- debugging,
- nextSampleStartTime,
- )
- }
- time.Sleep(nextSampleStartTime.Sub(now))
- } else {
- fmt.Fprintf(os.Stderr, "Warning: Missed a one-second deadline.\n")
- }
- nextSampleStartTime = time.Now().Add(time.Second)
-
- // Compute "instantaneous aggregate" goodput which is the number of
- // bytes transferred within the last second.
- totalTransfer := uint64(0)
- allInvalid := true
- for i := range lgcs {
- if !lgcs[i].IsValid() {
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "%v: Load-generating connection with id %d is invalid ... skipping.\n",
- debugging,
- lgcs[i].ClientId(),
- )
- }
- continue
- }
- allInvalid = false
- previousTransferred := lgcsPreviousTransferred[i]
- currentTransferred := lgcs[i].Transferred()
- totalTransfer += (currentTransferred - previousTransferred)
- lgcsPreviousTransferred[i] = currentTransferred
- }
-
- // For some reason, all the lgcs are invalid. This likely means that
- // the network/server went away.
- if allInvalid {
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "%v: All lgcs were invalid. Assuming that network/server went away.\n",
- debugging,
- )
- }
- break
- }
-
- // Compute a moving average of the last
- // constants.MovingAverageIntervalCount "instantaneous aggregate
- // goodput" measurements
- movingAverage.AddMeasurement(float64(totalTransfer))
- currentMovingAverage := movingAverage.CalculateAverage()
- movingAverageAverage.AddMeasurement(currentMovingAverage)
- movingAverageDelta := utilities.SignedPercentDifference(
- currentMovingAverage,
- previousMovingAverage,
- )
-
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "%v: Instantaneous goodput: %f MB.\n",
- debugging,
- utilities.ToMBps(float64(totalTransfer)),
- )
- fmt.Printf(
- "%v: Previous moving average: %f MB.\n",
- debugging,
- utilities.ToMBps(previousMovingAverage),
- )
- fmt.Printf(
- "%v: Current moving average: %f MB.\n",
- debugging,
- utilities.ToMBps(currentMovingAverage),
- )
- fmt.Printf(
- "%v: Moving average delta: %f.\n",
- debugging,
- movingAverageDelta,
- )
- }
-
- previousMovingAverage = currentMovingAverage
-
- // Special case: We won't make any adjustments on the first
- // iteration.
- if currentIteration == 0 {
- continue
- }
-
- // If moving average > "previous" moving average + InstabilityDelta:
- if movingAverageDelta > constants.InstabilityDelta {
- // Network did not yet reach saturation. If no flows added
- // within the last 4 seconds, add 4 more flows
- if (currentIteration - previousFlowIncreaseIteration) > uint64(
- constants.MovingAverageStabilitySpan,
- ) {
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "%v: Adding flows because we are unsaturated and waited a while.\n",
- debugging,
- )
- }
- addFlows(
- saturationCtx,
- constants.AdditiveNumberOfLoadGeneratingConnections,
- &lgcs,
- &lgcsPreviousTransferred,
- lgcGenerator,
- debugging.Level,
- )
- previousFlowIncreaseIteration = currentIteration
- } else {
- if debug.IsDebug(debugging.Level) {
- fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debugging)
- }
- }
- } else { // Else, network reached saturation for the current flow count.
- if debug.IsDebug(debugging.Level) {
- fmt.Printf("%v: Network reached saturation with current flow count.\n", debugging)
- }
- // If new flows added and for 4 seconds the moving average
- // throughput did not change: network reached stable saturation
- if (currentIteration-previousFlowIncreaseIteration) < uint64(constants.MovingAverageStabilitySpan) && movingAverageAverage.AllSequentialIncreasesLessThan(float64(5)) {
- if debug.IsDebug(debugging.Level) {
- fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging)
- }
- break
- } else {
- // Else, add four more flows
- if debug.IsDebug(debugging.Level) {
- fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging)
- }
- addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, &lgcsPreviousTransferred, lgcGenerator, debugging.Level)
- previousFlowIncreaseIteration = currentIteration
- }
- }
-
- }
- saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), lgcs: lgcs}
- }()
- return
-}
-
func main() {
flag.Parse()
@@ -433,7 +91,7 @@ func main() {
saturationCtx, cancelSaturationCtx := context.WithCancel(
context.Background(),
)
- config := &Config{}
+ config := &config.Config{}
var debugLevel debug.DebugLevel = debug.Error
if *debugCliFlag {
@@ -508,6 +166,10 @@ func main() {
}
}
+ /*
+ * Create (and then, ironically, name) two anonymous functions that, when invoked,
+ * will create load-generating connections for upload/download/
+ */
generate_lbd := func() lgc.LoadGeneratingConnection {
return &lgc.LoadGeneratingConnectionDownload{
Path: config.Urls.LargeUrl,
@@ -524,13 +186,13 @@ func main() {
var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download")
var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload")
- downloadSaturationChannel := saturate(
+ downloadSaturationChannel := rpm.Saturate(
saturationCtx,
operatingCtx,
generate_lbd,
downloadDebugging,
)
- uploadSaturationChannel := saturate(
+ uploadSaturationChannel := rpm.Saturate(
saturationCtx,
operatingCtx,
generate_lbu,
@@ -540,8 +202,8 @@ func main() {
saturationTimeout := false
uploadSaturated := false
downloadSaturated := false
- downloadSaturation := SaturationResult{}
- uploadSaturation := SaturationResult{}
+ downloadSaturation := rpm.SaturationResult{}
+ uploadSaturation := rpm.SaturationResult{}
for !(uploadSaturated && downloadSaturated) {
select {
@@ -557,7 +219,7 @@ func main() {
"",
),
utilities.ToMBps(downloadSaturation.RateBps),
- len(downloadSaturation.lgcs),
+ len(downloadSaturation.LGCs),
)
}
}
@@ -573,7 +235,7 @@ func main() {
"",
),
utilities.ToMBps(uploadSaturation.RateBps),
- len(uploadSaturation.lgcs),
+ len(uploadSaturation.LGCs),
)
}
}
@@ -585,7 +247,7 @@ func main() {
// will exit!
fmt.Fprint(
os.Stderr,
- "Error: Saturation could not be completed in time and no provisional rates could be accessed. Test failed.\n",
+ "Error: Saturation could not be completed in time and no provisional rates could be assessed. Test failed.\n",
)
cancelOperatingCtx()
if *debugCliFlag {
@@ -629,25 +291,21 @@ func main() {
)
}
- totalRTsCount := uint64(0)
- totalRTTimes := float64(0)
- rttTimeout := false
+ totalMeasurements := uint64(0)
+ totalMeasurementTimes := float64(0)
+ measurementTimeout := false
- for i := 0; i < constants.RPMProbeCount && !rttTimeout; i++ {
- if len(downloadSaturation.lgcs) == 0 {
+ for i := 0; i < constants.MeasurementProbeCount && !measurementTimeout; i++ {
+ if len(downloadSaturation.LGCs) == 0 {
continue
}
- randomlgcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).
- Int() %
- len(
- downloadSaturation.lgcs,
- )
- if !downloadSaturation.lgcs[randomlgcsIndex].IsValid() {
+ randomLGCsIndex := utilities.RandBetween(len(downloadSaturation.LGCs))
+ if !downloadSaturation.LGCs[randomLGCsIndex].IsValid() {
if *debugCliFlag {
fmt.Printf(
- "%v: The randomly selected download lgc (with id %d) was invalid. Skipping.\n",
+ "%v: The randomly selected saturated connection (with id %d) was invalid. Skipping.\n",
debugCliFlag,
- downloadSaturation.lgcs[randomlgcsIndex].ClientId(),
+ downloadSaturation.LGCs[randomLGCsIndex].ClientId(),
)
}
@@ -657,7 +315,7 @@ func main() {
if time.Since(timeoutAbsoluteTime) > 0 {
if *debugCliFlag {
fmt.Printf(
- "Pathologically could not find valid lgcs to use for measurement.\n",
+ "Pathologically could not find valid saturated connections use for measurement.\n",
)
}
break
@@ -665,46 +323,47 @@ func main() {
continue
}
- newTransport := http2.Transport{}
- newTransport.TLSClientConfig = &tls.Config{}
+ unsaturatedMeasurementTransport := http2.Transport{}
+ unsaturatedMeasurementTransport.TLSClientConfig = &tls.Config{}
if sslKeyFileConcurrentWriter != nil {
- newTransport.TLSClientConfig.KeyLogWriter = sslKeyFileConcurrentWriter
+ unsaturatedMeasurementTransport.TLSClientConfig.KeyLogWriter = sslKeyFileConcurrentWriter
}
- newTransport.TLSClientConfig.InsecureSkipVerify = true
- newClient := http.Client{Transport: &newTransport}
+ unsaturatedMeasurementTransport.TLSClientConfig.InsecureSkipVerify = true
+ newClient := http.Client{Transport: &unsaturatedMeasurementTransport}
- newRTTProbe := rpm.NewProbe(&newClient, debugLevel)
+ unsaturatedMeasurementProbe := rpm.NewProbe(&newClient, debugLevel)
- saturatedRTTProbe := rpm.NewProbe(
- downloadSaturation.lgcs[randomlgcsIndex].Client(),
+ saturatedMeasurementProbe := rpm.NewProbe(
+ downloadSaturation.LGCs[randomLGCsIndex].Client(),
debugLevel,
)
select {
case <-timeoutChannel:
{
- rttTimeout = true
+ measurementTimeout = true
}
- case sequentialRTTimes := <-rpm.CalculateProbeMeasurements(operatingCtx, *strictFlag, saturatedRTTProbe, newRTTProbe, config.Urls.SmallUrl, debugLevel):
+ case sequentialMeasurementTimes := <-rpm.CalculateProbeMeasurements(operatingCtx, *strictFlag, saturatedMeasurementProbe, unsaturatedMeasurementProbe, config.Urls.SmallUrl, debugLevel):
{
- if sequentialRTTimes.Err != nil {
+ if sequentialMeasurementTimes.Err != nil {
fmt.Printf(
- "Failed to calculate a time for sequential RTTs: %v\n",
- sequentialRTTimes.Err,
+ "Failed to calculate a time for sequential measurements: %v\n",
+ sequentialMeasurementTimes.Err,
)
continue
}
if debug.IsDebug(debugLevel) {
- fmt.Printf("rttProbe: %v\n", newRTTProbe)
+ fmt.Printf("unsaturatedMeasurementProbe: %v\n", unsaturatedMeasurementProbe)
}
- // We know that we have a good Sequential RTT.
- totalRTsCount += uint64(sequentialRTTimes.MeasurementCount)
- totalRTTimes += sequentialRTTimes.Delay.Seconds()
+ // We know that we have a good Sequential measurement.
+ totalMeasurements += uint64(sequentialMeasurementTimes.MeasurementCount)
+ totalMeasurementTimes += sequentialMeasurementTimes.Delay.Seconds()
if debug.IsDebug(debugLevel) {
fmt.Printf(
- "sequentialRTTsTime: %v\n",
- sequentialRTTimes.Delay.Seconds(),
+ "most-recent sequential measurement time: %v; most-recent sequential measurement count: %v\n",
+ sequentialMeasurementTimes.Delay.Seconds(),
+ sequentialMeasurementTimes.MeasurementCount,
)
}
}
@@ -715,31 +374,31 @@ func main() {
"Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
utilities.ToMbps(downloadSaturation.RateBps),
utilities.ToMBps(downloadSaturation.RateBps),
- len(downloadSaturation.lgcs),
+ len(downloadSaturation.LGCs),
)
fmt.Printf(
"Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
utilities.ToMbps(uploadSaturation.RateBps),
utilities.ToMBps(uploadSaturation.RateBps),
- len(uploadSaturation.lgcs),
+ len(uploadSaturation.LGCs),
)
- if totalRTsCount != 0 {
+ if totalMeasurements != 0 {
// "... it sums the five time values for each probe, and divides by the
// total
// number of probes to compute an average probe duration. The
// reciprocal of this, normalized to 60 seconds, gives the Round-trips
// Per Minute (RPM)."
- // "average probe duration" = totalRTTimes / totalRTsCount.
- // The reciprocol of this = 1 / (totalRTTimes / totalRTsCount) <-
+ // "average probe duration" = totalMeasurementTimes / totalMeasurements.
+ // The reciprocol of this = 1 / (totalMeasurementTimes / totalMeasurements) <-
// semantically the probes-per-second.
// Normalized to 60 seconds: 60 * (1
- // / (totalRTTimes / totalRTsCount))) <- semantically the number of
+ // / ((totalMeasurementTimes / totalMeasurements)))) <- semantically the number of
// probes per minute.
rpm := float64(
time.Minute.Seconds(),
- ) / (totalRTTimes / (float64(totalRTsCount)))
- fmt.Printf("Total RTTs measured: %d\n", totalRTsCount)
+ ) / (totalMeasurementTimes / (float64(totalMeasurements)))
+ fmt.Printf("Total measurements: %d\n", totalMeasurements)
fmt.Printf("RPM: %5.0f\n", rpm)
} else {
fmt.Printf("Error occurred calculating RPM -- no probe measurements received.\n")
diff --git a/rpm/rpm.go b/rpm/rpm.go
index a349cee..8f431b6 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -7,14 +7,235 @@ import (
"io"
"net/http"
"net/http/httptrace"
+ "os"
"time"
+ "github.com/network-quality/goresponsiveness/constants"
"github.com/network-quality/goresponsiveness/debug"
+ "github.com/network-quality/goresponsiveness/lgc"
+ "github.com/network-quality/goresponsiveness/ma"
"github.com/network-quality/goresponsiveness/stats"
"github.com/network-quality/goresponsiveness/traceable"
"github.com/network-quality/goresponsiveness/utilities"
)
+func addFlows(
+ ctx context.Context,
+ toAdd uint64,
+ lgcs *[]lgc.LoadGeneratingConnection,
+ lgcsPreviousTransferred *[]uint64,
+ lgcGenerator func() lgc.LoadGeneratingConnection,
+ debug debug.DebugLevel,
+) {
+ for i := uint64(0); i < toAdd; i++ {
+ *lgcs = append(*lgcs, lgcGenerator())
+ *lgcsPreviousTransferred = append(*lgcsPreviousTransferred, 0)
+ if !(*lgcs)[len(*lgcs)-1].Start(ctx, debug) {
+ fmt.Printf(
+ "Error starting lgc with id %d!\n",
+ (*lgcs)[len(*lgcs)-1].ClientId(),
+ )
+ return
+ }
+ }
+}
+
+type SaturationResult struct {
+ RateBps float64
+ LGCs []lgc.LoadGeneratingConnection
+}
+
+func Saturate(
+ saturationCtx context.Context,
+ operatingCtx context.Context,
+ lgcGenerator func() lgc.LoadGeneratingConnection,
+ debugging *debug.DebugWithPrefix,
+) (saturated chan SaturationResult) {
+ saturated = make(chan SaturationResult)
+ go func() {
+
+ lgcs := make([]lgc.LoadGeneratingConnection, 0)
+ lgcsPreviousTransferred := make([]uint64, 0)
+
+ addFlows(
+ saturationCtx,
+ constants.StartingNumberOfLoadGeneratingConnections,
+ &lgcs,
+ &lgcsPreviousTransferred,
+ lgcGenerator,
+ debugging.Level,
+ )
+
+ previousFlowIncreaseIteration := uint64(0)
+ previousMovingAverage := float64(0)
+ movingAverage := ma.NewMovingAverage(
+ constants.MovingAverageIntervalCount,
+ )
+ movingAverageAverage := ma.NewMovingAverage(
+ constants.MovingAverageIntervalCount,
+ )
+
+ nextSampleStartTime := time.Now().Add(time.Second)
+
+ for currentIteration := uint64(0); true; currentIteration++ {
+
+ // When the program stops operating, then stop.
+ if saturationCtx.Err() != nil {
+ return
+ }
+
+ // We may be asked to stop trying to saturate the
+ // network and return our current status.
+ if saturationCtx.Err() != nil {
+ //break
+ }
+
+ now := time.Now()
+ // At each 1-second interval
+ if nextSampleStartTime.Sub(now) > 0 {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Sleeping until %v\n",
+ debugging,
+ nextSampleStartTime,
+ )
+ }
+ time.Sleep(nextSampleStartTime.Sub(now))
+ } else {
+ fmt.Fprintf(os.Stderr, "Warning: Missed a one-second deadline.\n")
+ }
+ nextSampleStartTime = time.Now().Add(time.Second)
+
+ // Compute "instantaneous aggregate" goodput which is the number of
+ // bytes transferred within the last second.
+ totalTransfer := uint64(0)
+ allInvalid := true
+ for i := range lgcs {
+ if !lgcs[i].IsValid() {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Load-generating connection with id %d is invalid ... skipping.\n",
+ debugging,
+ lgcs[i].ClientId(),
+ )
+ }
+ continue
+ }
+ allInvalid = false
+ previousTransferred := lgcsPreviousTransferred[i]
+ currentTransferred := lgcs[i].Transferred()
+ totalTransfer += (currentTransferred - previousTransferred)
+ lgcsPreviousTransferred[i] = currentTransferred
+ }
+
+ // For some reason, all the lgcs are invalid. This likely means that
+ // the network/server went away.
+ if allInvalid {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: All lgcs were invalid. Assuming that network/server went away.\n",
+ debugging,
+ )
+ }
+ break
+ }
+
+ // Compute a moving average of the last
+ // constants.MovingAverageIntervalCount "instantaneous aggregate
+ // goodput" measurements
+ movingAverage.AddMeasurement(float64(totalTransfer))
+ currentMovingAverage := movingAverage.CalculateAverage()
+ movingAverageAverage.AddMeasurement(currentMovingAverage)
+ movingAverageDelta := utilities.SignedPercentDifference(
+ currentMovingAverage,
+ previousMovingAverage,
+ )
+
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Instantaneous goodput: %f MB.\n",
+ debugging,
+ utilities.ToMBps(float64(totalTransfer)),
+ )
+ fmt.Printf(
+ "%v: Previous moving average: %f MB.\n",
+ debugging,
+ utilities.ToMBps(previousMovingAverage),
+ )
+ fmt.Printf(
+ "%v: Current moving average: %f MB.\n",
+ debugging,
+ utilities.ToMBps(currentMovingAverage),
+ )
+ fmt.Printf(
+ "%v: Moving average delta: %f.\n",
+ debugging,
+ movingAverageDelta,
+ )
+ }
+
+ previousMovingAverage = currentMovingAverage
+
+ // Special case: We won't make any adjustments on the first
+ // iteration.
+ if currentIteration == 0 {
+ continue
+ }
+
+ // If moving average > "previous" moving average + InstabilityDelta:
+ if movingAverageDelta > constants.InstabilityDelta {
+ // Network did not yet reach saturation. If no flows added
+ // within the last 4 seconds, add 4 more flows
+ if (currentIteration - previousFlowIncreaseIteration) > uint64(
+ constants.MovingAverageStabilitySpan,
+ ) {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Adding flows because we are unsaturated and waited a while.\n",
+ debugging,
+ )
+ }
+ addFlows(
+ saturationCtx,
+ constants.AdditiveNumberOfLoadGeneratingConnections,
+ &lgcs,
+ &lgcsPreviousTransferred,
+ lgcGenerator,
+ debugging.Level,
+ )
+ previousFlowIncreaseIteration = currentIteration
+ } else {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debugging)
+ }
+ }
+ } else { // Else, network reached saturation for the current flow count.
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("%v: Network reached saturation with current flow count.\n", debugging)
+ }
+ // If new flows added and for 4 seconds the moving average
+ // throughput did not change: network reached stable saturation
+ if (currentIteration-previousFlowIncreaseIteration) < uint64(constants.MovingAverageStabilitySpan) && movingAverageAverage.AllSequentialIncreasesLessThan(float64(5)) {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging)
+ }
+ break
+ } else {
+ // Else, add four more flows
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging)
+ }
+ addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, &lgcsPreviousTransferred, lgcGenerator, debugging.Level)
+ previousFlowIncreaseIteration = currentIteration
+ }
+ }
+
+ }
+ saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs}
+ }()
+ return
+}
+
type Probe struct {
client *http.Client
stats *stats.TraceStats
diff --git a/utilities/utilities.go b/utilities/utilities.go
index 4b114ba..a143d31 100644
--- a/utilities/utilities.go
+++ b/utilities/utilities.go
@@ -17,6 +17,7 @@ package utilities
import (
"fmt"
"math"
+ "math/rand"
"os"
"reflect"
"sync/atomic"
@@ -117,3 +118,7 @@ func (optional Optional[S]) String() string {
return "None"
}
}
+
+func RandBetween(max int) int {
+ return rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % max
+}