summaryrefslogtreecommitdiff
path: root/networkQuality.go
diff options
context:
space:
mode:
authorWill Hawkins <[email protected]>2021-12-15 00:21:20 -0500
committerWill Hawkins <[email protected]>2021-12-15 00:21:20 -0500
commit0390ac743419231022d87f393eaa876876e76ee9 (patch)
tree40468a341e6fd193e3b0f3547fdcb93001c06af1 /networkQuality.go
parent45b8e5f0e195de0a9c077b4cdab9410f22654699 (diff)
Basic implementation complete.
Diffstat (limited to 'networkQuality.go')
-rw-r--r--networkQuality.go84
1 files changed, 61 insertions, 23 deletions
diff --git a/networkQuality.go b/networkQuality.go
index 886022e..673f58b 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -8,9 +8,11 @@ import (
_ "io"
"io/ioutil"
_ "log"
+ "math/rand"
"net/http"
"net/url"
"os"
+ "strings"
"time"
"github.com/hawkinsw/goresponsiveness/ma"
@@ -53,10 +55,12 @@ func toMBs(bytes float64) float64 {
var (
// Variables to hold CLI arguments.
- configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.")
- configPort = flag.Int("port", 4043, "port number on which to access responsiveness configuration server.")
- debug = flag.Bool("debug", false, "Enable debugging.")
- timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.")
+ configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.")
+ configPort = flag.Int("port", 4043, "port number on which to access responsiveness configuration server.")
+ configPath = flag.String("path", "config", "path on the server to the configuration endpoint.")
+ debug = flag.Bool("debug", false, "Enable debugging.")
+ timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.")
+ storeSslKeys = flag.Bool("store-ssl-keys", false, "Store SSL keys from connections for debugging. (currently unused)")
)
func addFlows(ctx context.Context, toAdd uint64, mcs *[]mc.MeasurableConnection, mcsPreviousTransferred *[]uint64, lbcGenerator func() mc.MeasurableConnection, debug bool) {
@@ -72,8 +76,8 @@ func addFlows(ctx context.Context, toAdd uint64, mcs *[]mc.MeasurableConnection,
}
type SaturationResult struct {
- RateBps float64
- FlowCount uint64
+ RateBps float64
+ Mcs []mc.MeasurableConnection
}
func saturate(ctx context.Context, saturated chan<- SaturationResult, lbcGenerator func() mc.MeasurableConnection, debug bool) {
@@ -152,7 +156,7 @@ func saturate(ctx context.Context, saturated chan<- SaturationResult, lbcGenerat
}
}
- saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), FlowCount: uint64(len(mcs))}
+ saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), Mcs: mcs}
}
func main() {
@@ -161,7 +165,12 @@ func main() {
timeoutDuration := time.Second * time.Duration(*timeout)
configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort)
- configUrl := fmt.Sprintf("https://%s/config", configHostPort)
+
+ if !strings.HasPrefix(*configPath, "/") {
+ *configPath = "/" + *configPath
+ }
+
+ configUrl := fmt.Sprintf("https://%s%s", configHostPort, *configPath)
configClient := &http.Client{}
resp, err := configClient.Get(configUrl)
@@ -183,8 +192,6 @@ func main() {
return
}
- // TODO: Make sure that all configuration values are present and accounted for!
-
if err := config.IsValid(); err != nil {
fmt.Fprintf(os.Stderr, "Error: Invalid configuration returned from %s: %v\n", configUrl, err)
return
@@ -198,7 +205,6 @@ func main() {
uploadSaturationChannel := make(chan SaturationResult)
downloadSaturationChannel := make(chan SaturationResult)
-
timeoutChannel := timeoutat.TimeoutAt(operatingCtx, time.Now().Add(timeoutDuration), *debug)
generate_lbd := func() mc.MeasurableConnection {
@@ -211,29 +217,31 @@ func main() {
go saturate(operatingCtx, downloadSaturationChannel, generate_lbd, *debug)
go saturate(operatingCtx, uploadSaturationChannel, generate_lbu, *debug)
- saturation_timeout := false
+ test_timeout := false
upload_saturated := false
download_saturated := false
- for !saturation_timeout && !(upload_saturated && download_saturated) {
+ var downloadSaturation, uploadSaturation SaturationResult
+
+ for !test_timeout && !(upload_saturated && download_saturated) {
select {
- case saturatedDownloadRate := <-downloadSaturationChannel:
+ case downloadSaturation = <-downloadSaturationChannel:
{
download_saturated = true
if *debug {
- fmt.Printf("################# download is saturated (%fMBps, %d flows)!\n", toMBs(saturatedDownloadRate.RateBps), saturatedDownloadRate.FlowCount)
+ fmt.Printf("################# download is saturated (%fMBps, %d flows)!\n", toMBs(downloadSaturation.RateBps), len(downloadSaturation.Mcs))
}
}
- case saturatedUploadRate := <-uploadSaturationChannel:
+ case uploadSaturation = <-uploadSaturationChannel:
{
upload_saturated = true
if *debug {
- fmt.Printf("################# upload is saturated (%fMBps, %d flows)!\n", toMBs(saturatedUploadRate.RateBps), saturatedUploadRate.FlowCount)
+ fmt.Printf("################# upload is saturated (%fMBps, %d flows)!\n", toMBs(uploadSaturation.RateBps), len(uploadSaturation.Mcs))
}
}
case <-timeoutChannel:
{
- saturation_timeout = true
+ test_timeout = true
if *debug {
fmt.Printf("################# timeout reaching saturation!\n")
}
@@ -241,15 +249,45 @@ func main() {
}
}
- if saturation_timeout {
+ if test_timeout {
cancelOperatingCtx()
- fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation in maximum time of %v\n.", timeoutDuration)
+ fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation before test time expired.\n.", timeoutDuration)
return
}
- time.Sleep(10 * time.Second)
+ // We are guaranteed to have an upload and download saturation result!
- cancelOperatingCtx()
+ robustnessProbeIterationCount := 5
+ actualProbeCount := 0
- time.Sleep(4 * time.Second)
+ totalProbeTime := float64(0)
+ for i := 0; i < robustnessProbeIterationCount && !test_timeout; i++ {
+ // There are len(downloadSaturation.Mcs) flows with http2 connections that
+ // we can piggy back. Let's choose one at random.
+ mcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Mcs)
+ select {
+ case <-timeoutChannel:
+ {
+ test_timeout = true
+ }
+ case probeTime := <-utilities.TimedSequentialGets(operatingCtx, downloadSaturation.Mcs[mcsIndex].Client(), &http.Client{}, config.Urls.SmallUrl):
+ {
+ actualProbeCount++
+ totalProbeTime += probeTime.Delay.Seconds()
+ if *debug {
+ fmt.Printf("probeTime: %v\n", probeTime.Delay.Seconds())
+ }
+ }
+ }
+ }
+
+ averageProbeTime := totalProbeTime / (float64(actualProbeCount) * 5)
+
+ fmt.Printf("RPM: %v\n", float64(60)/averageProbeTime)
+
+ cancelOperatingCtx()
+ if *debug {
+ // Hold on to cool down.
+ time.Sleep(4 * time.Second)
+ }
}