summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go.mod6
-rw-r--r--mc/mc.go25
-rw-r--r--networkQuality.go84
-rw-r--r--utilities/utilities.go50
4 files changed, 140 insertions, 25 deletions
diff --git a/go.mod b/go.mod
index ee5ae7e..2b40bb2 100644
--- a/go.mod
+++ b/go.mod
@@ -1,3 +1,7 @@
module github.com/hawkinsw/goresponsiveness
-go 1.16
+go 1.17
+
+require golang.org/x/net v0.0.0-20211209124913-491a49abca63
+
+require golang.org/x/text v0.3.6 // indirect
diff --git a/mc/mc.go b/mc/mc.go
index a70f16e..710e203 100644
--- a/mc/mc.go
+++ b/mc/mc.go
@@ -13,6 +13,7 @@ var chunkSize int = 5000
type MeasurableConnection interface {
Start(context.Context, bool) bool
Transferred() uint64
+ Client() *http.Client
}
type LoadBearingDownload struct {
@@ -25,10 +26,30 @@ func (lbd *LoadBearingDownload) Transferred() uint64 {
return lbd.downloaded
}
+func (lbd *LoadBearingDownload) Client() *http.Client {
+ return lbd.client
+}
+
func (lbd *LoadBearingDownload) Start(ctx context.Context, debug bool) bool {
lbd.downloaded = 0
lbd.client = &http.Client{}
+ // At some point this might be useful: It is a snippet of code that will enable
+ // logging of per-session TLS key material in order to make debugging easier in
+ // Wireshark.
+ /*
+ lbd.client = &http.Client{
+ Transport: &http2.Transport{
+ TLSClientConfig: &tls.Config{
+ KeyLogWriter: w,
+
+ Rand: utilities.RandZeroSource{}, // for reproducible output; don't do this.
+ InsecureSkipVerify: true, // test server certificate is not trusted.
+ },
+ },
+ }
+ */
+
if debug {
fmt.Printf("Started a load bearing download.\n")
}
@@ -64,6 +85,10 @@ func (lbu *LoadBearingUpload) Transferred() uint64 {
return lbu.uploaded
}
+func (lbd *LoadBearingUpload) Client() *http.Client {
+ return lbd.client
+}
+
type syntheticCountingReader struct {
n *uint64
ctx context.Context
diff --git a/networkQuality.go b/networkQuality.go
index 886022e..673f58b 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -8,9 +8,11 @@ import (
_ "io"
"io/ioutil"
_ "log"
+ "math/rand"
"net/http"
"net/url"
"os"
+ "strings"
"time"
"github.com/hawkinsw/goresponsiveness/ma"
@@ -53,10 +55,12 @@ func toMBs(bytes float64) float64 {
var (
// Variables to hold CLI arguments.
- configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.")
- configPort = flag.Int("port", 4043, "port number on which to access responsiveness configuration server.")
- debug = flag.Bool("debug", false, "Enable debugging.")
- timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.")
+ configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.")
+ configPort = flag.Int("port", 4043, "port number on which to access responsiveness configuration server.")
+ configPath = flag.String("path", "config", "path on the server to the configuration endpoint.")
+ debug = flag.Bool("debug", false, "Enable debugging.")
+ timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.")
+ storeSslKeys = flag.Bool("store-ssl-keys", false, "Store SSL keys from connections for debugging. (currently unused)")
)
func addFlows(ctx context.Context, toAdd uint64, mcs *[]mc.MeasurableConnection, mcsPreviousTransferred *[]uint64, lbcGenerator func() mc.MeasurableConnection, debug bool) {
@@ -72,8 +76,8 @@ func addFlows(ctx context.Context, toAdd uint64, mcs *[]mc.MeasurableConnection,
}
type SaturationResult struct {
- RateBps float64
- FlowCount uint64
+ RateBps float64
+ Mcs []mc.MeasurableConnection
}
func saturate(ctx context.Context, saturated chan<- SaturationResult, lbcGenerator func() mc.MeasurableConnection, debug bool) {
@@ -152,7 +156,7 @@ func saturate(ctx context.Context, saturated chan<- SaturationResult, lbcGenerat
}
}
- saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), FlowCount: uint64(len(mcs))}
+ saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), Mcs: mcs}
}
func main() {
@@ -161,7 +165,12 @@ func main() {
timeoutDuration := time.Second * time.Duration(*timeout)
configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort)
- configUrl := fmt.Sprintf("https://%s/config", configHostPort)
+
+ if !strings.HasPrefix(*configPath, "/") {
+ *configPath = "/" + *configPath
+ }
+
+ configUrl := fmt.Sprintf("https://%s%s", configHostPort, *configPath)
configClient := &http.Client{}
resp, err := configClient.Get(configUrl)
@@ -183,8 +192,6 @@ func main() {
return
}
- // TODO: Make sure that all configuration values are present and accounted for!
-
if err := config.IsValid(); err != nil {
fmt.Fprintf(os.Stderr, "Error: Invalid configuration returned from %s: %v\n", configUrl, err)
return
@@ -198,7 +205,6 @@ func main() {
uploadSaturationChannel := make(chan SaturationResult)
downloadSaturationChannel := make(chan SaturationResult)
-
timeoutChannel := timeoutat.TimeoutAt(operatingCtx, time.Now().Add(timeoutDuration), *debug)
generate_lbd := func() mc.MeasurableConnection {
@@ -211,29 +217,31 @@ func main() {
go saturate(operatingCtx, downloadSaturationChannel, generate_lbd, *debug)
go saturate(operatingCtx, uploadSaturationChannel, generate_lbu, *debug)
- saturation_timeout := false
+ test_timeout := false
upload_saturated := false
download_saturated := false
- for !saturation_timeout && !(upload_saturated && download_saturated) {
+ var downloadSaturation, uploadSaturation SaturationResult
+
+ for !test_timeout && !(upload_saturated && download_saturated) {
select {
- case saturatedDownloadRate := <-downloadSaturationChannel:
+ case downloadSaturation = <-downloadSaturationChannel:
{
download_saturated = true
if *debug {
- fmt.Printf("################# download is saturated (%fMBps, %d flows)!\n", toMBs(saturatedDownloadRate.RateBps), saturatedDownloadRate.FlowCount)
+ fmt.Printf("################# download is saturated (%fMBps, %d flows)!\n", toMBs(downloadSaturation.RateBps), len(downloadSaturation.Mcs))
}
}
- case saturatedUploadRate := <-uploadSaturationChannel:
+ case uploadSaturation = <-uploadSaturationChannel:
{
upload_saturated = true
if *debug {
- fmt.Printf("################# upload is saturated (%fMBps, %d flows)!\n", toMBs(saturatedUploadRate.RateBps), saturatedUploadRate.FlowCount)
+ fmt.Printf("################# upload is saturated (%fMBps, %d flows)!\n", toMBs(uploadSaturation.RateBps), len(uploadSaturation.Mcs))
}
}
case <-timeoutChannel:
{
- saturation_timeout = true
+ test_timeout = true
if *debug {
fmt.Printf("################# timeout reaching saturation!\n")
}
@@ -241,15 +249,45 @@ func main() {
}
}
- if saturation_timeout {
+ if test_timeout {
cancelOperatingCtx()
- fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation in maximum time of %v\n.", timeoutDuration)
+ fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation before test time expired.\n.", timeoutDuration)
return
}
- time.Sleep(10 * time.Second)
+ // We are guaranteed to have an upload and download saturation result!
- cancelOperatingCtx()
+ robustnessProbeIterationCount := 5
+ actualProbeCount := 0
- time.Sleep(4 * time.Second)
+ totalProbeTime := float64(0)
+ for i := 0; i < robustnessProbeIterationCount && !test_timeout; i++ {
+ // There are len(downloadSaturation.Mcs) flows with http2 connections that
+ // we can piggy back. Let's choose one at random.
+ mcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Mcs)
+ select {
+ case <-timeoutChannel:
+ {
+ test_timeout = true
+ }
+ case probeTime := <-utilities.TimedSequentialGets(operatingCtx, downloadSaturation.Mcs[mcsIndex].Client(), &http.Client{}, config.Urls.SmallUrl):
+ {
+ actualProbeCount++
+ totalProbeTime += probeTime.Delay.Seconds()
+ if *debug {
+ fmt.Printf("probeTime: %v\n", probeTime.Delay.Seconds())
+ }
+ }
+ }
+ }
+
+ averageProbeTime := totalProbeTime / (float64(actualProbeCount) * 5)
+
+ fmt.Printf("RPM: %v\n", float64(60)/averageProbeTime)
+
+ cancelOperatingCtx()
+ if *debug {
+ // Hold on to cool down.
+ time.Sleep(4 * time.Second)
+ }
}
diff --git a/utilities/utilities.go b/utilities/utilities.go
index 54f0f4a..16585d6 100644
--- a/utilities/utilities.go
+++ b/utilities/utilities.go
@@ -1,6 +1,12 @@
package utilities
-import "math"
+import (
+ "context"
+ "io"
+ "math"
+ "net/http"
+ "time"
+)
func SignedPercentDifference(current float64, previous float64) (difference float64) {
return ((current - previous) / (float64(current+previous) / 2.0)) * float64(100)
@@ -15,3 +21,45 @@ func Conditional(condition bool, t string, f string) string {
}
return f
}
+
+type GetLatency struct {
+ Delay time.Duration
+ Err error
+}
+
+func TimedSequentialGets(ctx context.Context, client_a *http.Client, client_b *http.Client, url string) chan GetLatency {
+ responseChannel := make(chan GetLatency)
+ go func() {
+ before := time.Now()
+ c_a, err := client_a.Get(url)
+ if err != nil {
+ responseChannel <- GetLatency{Delay: 0, Err: err}
+ }
+ // TODO: Make this interruptable somehow by using _ctx_.
+ _, err = io.ReadAll(c_a.Body)
+ if err != nil {
+ responseChannel <- GetLatency{Delay: 0, Err: err}
+ }
+ c_b, err := client_b.Get(url)
+ if err != nil {
+ responseChannel <- GetLatency{Delay: 0, Err: err}
+ }
+ // TODO: Make this interruptable somehow by using _ctx_.
+ _, err = io.ReadAll(c_b.Body)
+ if err != nil {
+ responseChannel <- GetLatency{Delay: 0, Err: err}
+ }
+ responseChannel <- GetLatency{Delay: time.Now().Sub(before), Err: nil}
+ }()
+ return responseChannel
+}
+
+type RandZeroSource struct{}
+
+func (RandZeroSource) Read(b []byte) (n int, err error) {
+ for i := range b {
+ b[i] = 0
+ }
+
+ return len(b), nil
+}