diff options
| -rw-r--r-- | go.mod | 6 | ||||
| -rw-r--r-- | mc/mc.go | 25 | ||||
| -rw-r--r-- | networkQuality.go | 84 | ||||
| -rw-r--r-- | utilities/utilities.go | 50 |
4 files changed, 140 insertions, 25 deletions
@@ -1,3 +1,7 @@ module github.com/hawkinsw/goresponsiveness -go 1.16 +go 1.17 + +require golang.org/x/net v0.0.0-20211209124913-491a49abca63 + +require golang.org/x/text v0.3.6 // indirect @@ -13,6 +13,7 @@ var chunkSize int = 5000 type MeasurableConnection interface { Start(context.Context, bool) bool Transferred() uint64 + Client() *http.Client } type LoadBearingDownload struct { @@ -25,10 +26,30 @@ func (lbd *LoadBearingDownload) Transferred() uint64 { return lbd.downloaded } +func (lbd *LoadBearingDownload) Client() *http.Client { + return lbd.client +} + func (lbd *LoadBearingDownload) Start(ctx context.Context, debug bool) bool { lbd.downloaded = 0 lbd.client = &http.Client{} + // At some point this might be useful: It is a snippet of code that will enable + // logging of per-session TLS key material in order to make debugging easier in + // Wireshark. + /* + lbd.client = &http.Client{ + Transport: &http2.Transport{ + TLSClientConfig: &tls.Config{ + KeyLogWriter: w, + + Rand: utilities.RandZeroSource{}, // for reproducible output; don't do this. + InsecureSkipVerify: true, // test server certificate is not trusted. + }, + }, + } + */ + if debug { fmt.Printf("Started a load bearing download.\n") } @@ -64,6 +85,10 @@ func (lbu *LoadBearingUpload) Transferred() uint64 { return lbu.uploaded } +func (lbd *LoadBearingUpload) Client() *http.Client { + return lbd.client +} + type syntheticCountingReader struct { n *uint64 ctx context.Context diff --git a/networkQuality.go b/networkQuality.go index 886022e..673f58b 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -8,9 +8,11 @@ import ( _ "io" "io/ioutil" _ "log" + "math/rand" "net/http" "net/url" "os" + "strings" "time" "github.com/hawkinsw/goresponsiveness/ma" @@ -53,10 +55,12 @@ func toMBs(bytes float64) float64 { var ( // Variables to hold CLI arguments. - configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.") - configPort = flag.Int("port", 4043, "port number on which to access responsiveness configuration server.") - debug = flag.Bool("debug", false, "Enable debugging.") - timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.") + configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.") + configPort = flag.Int("port", 4043, "port number on which to access responsiveness configuration server.") + configPath = flag.String("path", "config", "path on the server to the configuration endpoint.") + debug = flag.Bool("debug", false, "Enable debugging.") + timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.") + storeSslKeys = flag.Bool("store-ssl-keys", false, "Store SSL keys from connections for debugging. (currently unused)") ) func addFlows(ctx context.Context, toAdd uint64, mcs *[]mc.MeasurableConnection, mcsPreviousTransferred *[]uint64, lbcGenerator func() mc.MeasurableConnection, debug bool) { @@ -72,8 +76,8 @@ func addFlows(ctx context.Context, toAdd uint64, mcs *[]mc.MeasurableConnection, } type SaturationResult struct { - RateBps float64 - FlowCount uint64 + RateBps float64 + Mcs []mc.MeasurableConnection } func saturate(ctx context.Context, saturated chan<- SaturationResult, lbcGenerator func() mc.MeasurableConnection, debug bool) { @@ -152,7 +156,7 @@ func saturate(ctx context.Context, saturated chan<- SaturationResult, lbcGenerat } } - saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), FlowCount: uint64(len(mcs))} + saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), Mcs: mcs} } func main() { @@ -161,7 +165,12 @@ func main() { timeoutDuration := time.Second * time.Duration(*timeout) configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort) - configUrl := fmt.Sprintf("https://%s/config", configHostPort) + + if !strings.HasPrefix(*configPath, "/") { + *configPath = "/" + *configPath + } + + configUrl := fmt.Sprintf("https://%s%s", configHostPort, *configPath) configClient := &http.Client{} resp, err := configClient.Get(configUrl) @@ -183,8 +192,6 @@ func main() { return } - // TODO: Make sure that all configuration values are present and accounted for! - if err := config.IsValid(); err != nil { fmt.Fprintf(os.Stderr, "Error: Invalid configuration returned from %s: %v\n", configUrl, err) return @@ -198,7 +205,6 @@ func main() { uploadSaturationChannel := make(chan SaturationResult) downloadSaturationChannel := make(chan SaturationResult) - timeoutChannel := timeoutat.TimeoutAt(operatingCtx, time.Now().Add(timeoutDuration), *debug) generate_lbd := func() mc.MeasurableConnection { @@ -211,29 +217,31 @@ func main() { go saturate(operatingCtx, downloadSaturationChannel, generate_lbd, *debug) go saturate(operatingCtx, uploadSaturationChannel, generate_lbu, *debug) - saturation_timeout := false + test_timeout := false upload_saturated := false download_saturated := false - for !saturation_timeout && !(upload_saturated && download_saturated) { + var downloadSaturation, uploadSaturation SaturationResult + + for !test_timeout && !(upload_saturated && download_saturated) { select { - case saturatedDownloadRate := <-downloadSaturationChannel: + case downloadSaturation = <-downloadSaturationChannel: { download_saturated = true if *debug { - fmt.Printf("################# download is saturated (%fMBps, %d flows)!\n", toMBs(saturatedDownloadRate.RateBps), saturatedDownloadRate.FlowCount) + fmt.Printf("################# download is saturated (%fMBps, %d flows)!\n", toMBs(downloadSaturation.RateBps), len(downloadSaturation.Mcs)) } } - case saturatedUploadRate := <-uploadSaturationChannel: + case uploadSaturation = <-uploadSaturationChannel: { upload_saturated = true if *debug { - fmt.Printf("################# upload is saturated (%fMBps, %d flows)!\n", toMBs(saturatedUploadRate.RateBps), saturatedUploadRate.FlowCount) + fmt.Printf("################# upload is saturated (%fMBps, %d flows)!\n", toMBs(uploadSaturation.RateBps), len(uploadSaturation.Mcs)) } } case <-timeoutChannel: { - saturation_timeout = true + test_timeout = true if *debug { fmt.Printf("################# timeout reaching saturation!\n") } @@ -241,15 +249,45 @@ func main() { } } - if saturation_timeout { + if test_timeout { cancelOperatingCtx() - fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation in maximum time of %v\n.", timeoutDuration) + fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation before test time expired.\n.", timeoutDuration) return } - time.Sleep(10 * time.Second) + // We are guaranteed to have an upload and download saturation result! - cancelOperatingCtx() + robustnessProbeIterationCount := 5 + actualProbeCount := 0 - time.Sleep(4 * time.Second) + totalProbeTime := float64(0) + for i := 0; i < robustnessProbeIterationCount && !test_timeout; i++ { + // There are len(downloadSaturation.Mcs) flows with http2 connections that + // we can piggy back. Let's choose one at random. + mcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Mcs) + select { + case <-timeoutChannel: + { + test_timeout = true + } + case probeTime := <-utilities.TimedSequentialGets(operatingCtx, downloadSaturation.Mcs[mcsIndex].Client(), &http.Client{}, config.Urls.SmallUrl): + { + actualProbeCount++ + totalProbeTime += probeTime.Delay.Seconds() + if *debug { + fmt.Printf("probeTime: %v\n", probeTime.Delay.Seconds()) + } + } + } + } + + averageProbeTime := totalProbeTime / (float64(actualProbeCount) * 5) + + fmt.Printf("RPM: %v\n", float64(60)/averageProbeTime) + + cancelOperatingCtx() + if *debug { + // Hold on to cool down. + time.Sleep(4 * time.Second) + } } diff --git a/utilities/utilities.go b/utilities/utilities.go index 54f0f4a..16585d6 100644 --- a/utilities/utilities.go +++ b/utilities/utilities.go @@ -1,6 +1,12 @@ package utilities -import "math" +import ( + "context" + "io" + "math" + "net/http" + "time" +) func SignedPercentDifference(current float64, previous float64) (difference float64) { return ((current - previous) / (float64(current+previous) / 2.0)) * float64(100) @@ -15,3 +21,45 @@ func Conditional(condition bool, t string, f string) string { } return f } + +type GetLatency struct { + Delay time.Duration + Err error +} + +func TimedSequentialGets(ctx context.Context, client_a *http.Client, client_b *http.Client, url string) chan GetLatency { + responseChannel := make(chan GetLatency) + go func() { + before := time.Now() + c_a, err := client_a.Get(url) + if err != nil { + responseChannel <- GetLatency{Delay: 0, Err: err} + } + // TODO: Make this interruptable somehow by using _ctx_. + _, err = io.ReadAll(c_a.Body) + if err != nil { + responseChannel <- GetLatency{Delay: 0, Err: err} + } + c_b, err := client_b.Get(url) + if err != nil { + responseChannel <- GetLatency{Delay: 0, Err: err} + } + // TODO: Make this interruptable somehow by using _ctx_. + _, err = io.ReadAll(c_b.Body) + if err != nil { + responseChannel <- GetLatency{Delay: 0, Err: err} + } + responseChannel <- GetLatency{Delay: time.Now().Sub(before), Err: nil} + }() + return responseChannel +} + +type RandZeroSource struct{} + +func (RandZeroSource) Read(b []byte) (n int, err error) { + for i := range b { + b[i] = 0 + } + + return len(b), nil +} |
