summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ma/ma.go25
-rw-r--r--mc/mc.go47
-rw-r--r--networkQuality.go146
-rw-r--r--timeoutat/timeoutat.go30
-rw-r--r--utilities/utilities.go5
5 files changed, 197 insertions, 56 deletions
diff --git a/ma/ma.go b/ma/ma.go
index 9d52d97..0dee17f 100644
--- a/ma/ma.go
+++ b/ma/ma.go
@@ -1,6 +1,8 @@
package ma
import (
+ "math"
+
"github.com/hawkinsw/goresponsiveness/saturating"
)
@@ -8,25 +10,40 @@ import (
// in Go (1.18).
type MovingAverage struct {
intervals int
- instants []uint64
+ instants []float64
index int
divisor *saturating.SaturatingInt
}
func NewMovingAverage(intervals int) *MovingAverage {
- return &MovingAverage{instants: make([]uint64, intervals), intervals: intervals, divisor: saturating.NewSaturatingInt(intervals)}
+ return &MovingAverage{instants: make([]float64, intervals), intervals: intervals, divisor: saturating.NewSaturatingInt(intervals)}
}
-func (ma *MovingAverage) AddMeasurement(measurement uint64) {
+func (ma *MovingAverage) AddMeasurement(measurement float64) {
ma.instants[ma.index] = measurement
ma.divisor.Add(1)
ma.index = (ma.index + 1) % ma.intervals
}
func (ma *MovingAverage) CalculateAverage() float64 {
- total := uint64(0)
+ total := float64(0)
for i := 0; i < ma.intervals; i++ {
total += ma.instants[i]
}
return float64(total) / float64(ma.divisor.Value())
}
+
+func (ma *MovingAverage) ConsistentWithin(limit float64) bool {
+ previous := ma.instants[0]
+ for i := 1; i < ma.intervals; i++ {
+ current := ma.instants[i]
+ delta := math.Abs(current - previous)
+ percentChange := (float64(delta) / (float64(current+previous) / 2.0)) * float64(100)
+
+ previous = current
+ if math.Abs(percentChange) > limit {
+ return false
+ }
+ }
+ return true
+}
diff --git a/mc/mc.go b/mc/mc.go
index 0b1d69b..bc0645a 100644
--- a/mc/mc.go
+++ b/mc/mc.go
@@ -5,8 +5,11 @@ import (
"io"
"io/ioutil"
"net/http"
+ "fmt"
)
+var chunkSize int = 5000
+
type MeasurableConnection interface {
Start(context.Context) bool
Transferred() uint64
@@ -18,31 +21,33 @@ type LoadBearingDownload struct {
client *http.Client
}
-func (lbc *LoadBearingDownload) Transferred() uint64 {
- return lbc.downloaded
+func (lbd *LoadBearingDownload) Transferred() uint64 {
+ return lbd.downloaded
}
-func (lbc *LoadBearingDownload) Start(ctx context.Context) bool {
- lbc.downloaded = 0
- lbc.client = &http.Client{}
- get, err := lbc.client.Get(lbc.Path)
+func (lbd *LoadBearingDownload) Start(ctx context.Context) bool {
+ lbd.downloaded = 0
+ lbd.client = &http.Client{}
- if err != nil {
- return false
- }
- go doDownload(get, &lbc.downloaded, ctx)
+ fmt.Printf("Started a load bearing download.\n")
+ go doDownload(lbd.client, lbd.Path, &lbd.downloaded, ctx)
return true
}
-func doDownload(get *http.Response, count *uint64, ctx context.Context) {
+func doDownload(client *http.Client, path string, count *uint64, ctx context.Context) {
+ get, err := client.Get(path)
+ if err != nil {
+ return
+ }
for ctx.Err() == nil {
- n, err := io.CopyN(ioutil.Discard, get.Body, 100)
+ n, err := io.CopyN(ioutil.Discard, get.Body, int64(chunkSize))
if err != nil {
break
}
*count += uint64(n)
}
get.Body.Close()
+ fmt.Printf("Ending a load-bearing download.\n");
}
type LoadBearingUpload struct {
@@ -66,16 +71,24 @@ func (s *syntheticCountingReader) Read(p []byte) (n int, err error) {
}
err = nil
n = len(p)
+ n = chunkSize
*s.n += uint64(n)
return
}
+
+func doUpload(client *http.Client, path string, count *uint64, ctx context.Context) bool {
+ *count = 0
+ s := &syntheticCountingReader{n: count, ctx: ctx}
+ resp, _ := client.Post(path, "application/octet-stream", s)
+ resp.Body.Close()
+ fmt.Printf("Ending a load-bearing upload.\n")
+ return true
+}
+
func (lbu *LoadBearingUpload) Start(ctx context.Context) bool {
lbu.uploaded = 0
lbu.client = &http.Client{}
- s := &syntheticCountingReader{n: &lbu.uploaded, ctx: ctx}
- go func() {
- resp, _ := lbu.client.Post(lbu.Path, "application/octet-stream", s)
- resp.Body.Close()
- }()
+ fmt.Printf("Started a load bearing upload.\n")
+ go doUpload(lbu.client, lbu.Path, &lbu.uploaded, ctx)
return true
}
diff --git a/networkQuality.go b/networkQuality.go
index f254707..83346ac 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -5,13 +5,17 @@ import (
"encoding/json"
"flag"
"fmt"
- "github.com/hawkinsw/goresponsiveness/ma"
- "github.com/hawkinsw/goresponsiveness/mc"
_ "io"
"io/ioutil"
_ "log"
"net/http"
+ "os"
"time"
+
+ "github.com/hawkinsw/goresponsiveness/ma"
+ "github.com/hawkinsw/goresponsiveness/mc"
+ "github.com/hawkinsw/goresponsiveness/timeoutat"
+ "github.com/hawkinsw/goresponsiveness/utilities"
)
type ConfigUrls struct {
@@ -38,24 +42,33 @@ var (
configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.")
configPort = flag.Int("port", 4043, "port number on which to access responsiveness configuration server.")
debug = flag.Bool("debug", false, "Enable debugging.")
+ timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.")
)
-func saturate(ctx context.Context, saturated chan<- interface{}, lbcGenerator func() mc.MeasurableConnection) {
- mcs := make([]mc.MeasurableConnection, 4)
- mcsPreviousTransferred := make([]uint64, 4)
- for i := range mcs {
+func addFlows(ctx context.Context, toAdd uint64, mcs *[]mc.MeasurableConnection, mcsPreviousTransferred *[]uint64, lbcGenerator func() mc.MeasurableConnection) {
+ for i := uint64(0); i < toAdd; i++ {
//mcs[i] = &mc.LoadBearingUpload{Path: config.Urls.UploadUrl}
- mcs[i] = lbcGenerator()
- mcsPreviousTransferred[i] = 0
- if !mcs[i].Start(ctx) {
+ *mcs = append(*mcs, lbcGenerator())
+ *mcsPreviousTransferred = append(*mcsPreviousTransferred, 0)
+ if !(*mcs)[len(*mcs)-1].Start(ctx) {
fmt.Printf("Error starting %dth MC!\n", i)
return
}
}
+}
+
+func saturate(ctx context.Context, saturated chan<- float64, lbcGenerator func() mc.MeasurableConnection, debug bool) {
+ mcs := make([]mc.MeasurableConnection, 0)
+ mcsPreviousTransferred := make([]uint64, 0)
+ // Create 4 load bearing connections
+ addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator)
+
+ previousFlowIncreaseIteration := uint64(0)
previousMovingAverage := float64(0)
movingAverage := ma.NewMovingAverage(4)
- //lastFlowIncrease := uint64(0)
+ movingAverageAverage := ma.NewMovingAverage(4)
+
for currentIteration := uint64(0); true; currentIteration++ {
// If we are cancelled, then stop.
@@ -63,9 +76,10 @@ func saturate(ctx context.Context, saturated chan<- interface{}, lbcGenerator fu
return
}
+ // At each 1-second interval
time.Sleep(time.Second)
- // 1. Calculate the most recent goodput.
+ // Compute "instantaneous aggregate" goodput which is the number of bytes transferred within the last second.
totalTransfer := uint64(0)
for i := range mcs {
previousTransferred := mcsPreviousTransferred[i]
@@ -74,33 +88,59 @@ func saturate(ctx context.Context, saturated chan<- interface{}, lbcGenerator fu
mcsPreviousTransferred[i] = currentTransferred
}
- // 2. Calculate the delta
- movingAverage.AddMeasurement(totalTransfer)
+ // Compute a moving average of the last 4 "instantaneous aggregate goodput" measurements
+ movingAverage.AddMeasurement(float64(totalTransfer))
currentMovingAverage := movingAverage.CalculateAverage()
- movingAverageDelta := ((currentMovingAverage - previousMovingAverage) / (float64(currentMovingAverage+previousMovingAverage) / 2.0)) * float64(100)
+ movingAverageAverage.AddMeasurement(currentMovingAverage)
+ movingAverageDelta := utilities.PercentDifference(currentMovingAverage, previousMovingAverage)
previousMovingAverage = currentMovingAverage
- fmt.Printf("Instantaneous goodput: %f MB.\n", toMBs(float64(totalTransfer)))
- fmt.Printf("Moving average: %f MB.\n", toMBs(currentMovingAverage))
- fmt.Printf("Moving average delta: %f.\n", movingAverageDelta)
-
+ if debug {
+ fmt.Printf("Instantaneous goodput: %f MB.\n", toMBs(float64(totalTransfer)))
+ fmt.Printf("Moving average: %f MB.\n", toMBs(currentMovingAverage))
+ fmt.Printf("Moving average delta: %f.\n", movingAverageDelta)
+ }
- // 3. Are we stable or not?
- if currentIteration != 0 && movingAverageDelta < float64(5) {
- // We are stable!
- fmt.Printf("Stable\n")
- break
- } else {
- // We are unstable!
- fmt.Printf("Unstable\n")
+ // If moving average > "previous" moving average + 5%:
+ if currentIteration == 0 || movingAverageDelta > float64(5) {
+ // Network did not yet reach saturation. If no flows added within the last 4 seconds, add 4 more flows
+ if (currentIteration - previousFlowIncreaseIteration) > 4 {
+ if debug {
+ fmt.Printf("Adding flows because we are unsaturated and waited a while.\n")
+ }
+ addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator)
+ previousFlowIncreaseIteration = currentIteration
+ } else {
+ if debug {
+ fmt.Printf("We are unsaturated, but it still too early to add anything.\n")
+ }
+ }
+ } else { // Else, network reached saturation for the current flow count.
+ // If new flows added and for 4 seconds the moving average throughput did not change: network reached stable saturation
+ if (currentIteration-previousFlowIncreaseIteration) < 4 && movingAverageAverage.ConsistentWithin(float64(4)) {
+ if debug {
+ fmt.Printf("New flows added within the last four seconds and the moving-average average is consistent!\n")
+ }
+ break
+ } else {
+ // Else, add four more flows
+ if debug {
+ fmt.Printf("New flows to add to try to increase our saturation!\n")
+ }
+ addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator)
+ previousFlowIncreaseIteration = currentIteration
+ }
}
+
}
- saturated <- struct{}{}
+ saturated <- movingAverage.CalculateAverage()
}
func main() {
flag.Parse()
+ timeoutDuration := time.Second * time.Duration(*timeout)
+
configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort)
configUrl := fmt.Sprintf("https://%s/config", configHostPort)
@@ -130,22 +170,58 @@ func main() {
operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background())
- uploadSaturationChannel := make(chan interface{})
+ uploadSaturationChannel := make(chan float64)
+ downloadSaturationChannel := make(chan float64)
+ timeoutChannel := make(chan interface{})
+
+ _ = timeoutat.NewTimeoutAt(operatingCtx, time.Now().Add(timeoutDuration), timeoutChannel)
- g := func() mc.MeasurableConnection {
- return &mc.LoadBearingDownload{Path: config.Urls.LargeUrl}
+ generate_lbd := func() mc.MeasurableConnection {
+ return &mc.LoadBearingDownload{Path: config.Urls.LargeUrl}
+ }
+ generate_lbu := func() mc.MeasurableConnection {
+ return &mc.LoadBearingUpload{Path: config.Urls.UploadUrl}
}
+ go saturate(operatingCtx, downloadSaturationChannel, generate_lbd, *debug)
+ go saturate(operatingCtx, uploadSaturationChannel, generate_lbu, *debug)
- go saturate(operatingCtx, uploadSaturationChannel, g)
+ saturation_timeout := false
+ upload_saturated := false
+ download_saturated := false
- select {
- case <-uploadSaturationChannel:
- {
- fmt.Printf("upload is saturated!\n")
+ for !saturation_timeout && !(upload_saturated && download_saturated) {
+ select {
+ case saturatedDownloadRate := <-downloadSaturationChannel:
+ {
+ download_saturated = true
+ if *debug {
+ fmt.Printf("################## download is saturated (%f)!\n", toMBs(saturatedDownloadRate))
+ }
+ }
+ case saturatedUploadRate := <-uploadSaturationChannel:
+ {
+ upload_saturated = true
+ if *debug {
+ fmt.Printf("################# upload is saturated (%f)!\n", toMBs(saturatedUploadRate))
+ }
+ }
+ case <-timeoutChannel:
+ {
+ saturation_timeout = true
+ if *debug {
+ fmt.Printf("################# timeout reaching saturation!\n")
+ }
+ }
}
}
+ if saturation_timeout {
+ cancelOperatingCtx()
+ fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation in maximum time of %v.", timeoutDuration)
+ return
+ }
+
time.Sleep(10 * time.Second)
cancelOperatingCtx()
diff --git a/timeoutat/timeoutat.go b/timeoutat/timeoutat.go
new file mode 100644
index 0000000..e77804e
--- /dev/null
+++ b/timeoutat/timeoutat.go
@@ -0,0 +1,30 @@
+package timeoutat
+
+import (
+ "context"
+ "fmt"
+ "time"
+)
+
+type TimeoutAt struct {
+ when time.Time
+ response chan interface{}
+}
+
+func NewTimeoutAt(ctx context.Context, when time.Time, response chan interface{}) *TimeoutAt {
+ timeoutAt := &TimeoutAt{when: when, response: response}
+ timeoutAt.start(ctx)
+ return timeoutAt
+}
+
+func (ta *TimeoutAt) start(ctx context.Context) {
+ go func() {
+ fmt.Printf("Timeout expected to end at %v\n", ta.when)
+ select {
+ case <-time.After(ta.when.Sub(time.Now())):
+ case <-ctx.Done():
+ }
+ ta.response <- struct{}{}
+ fmt.Printf("Timeout ended at %v\n", time.Now())
+ }()
+}
diff --git a/utilities/utilities.go b/utilities/utilities.go
new file mode 100644
index 0000000..a372141
--- /dev/null
+++ b/utilities/utilities.go
@@ -0,0 +1,5 @@
+package utilities
+
+func PercentDifference(current float64, previous float64) (difference float64) {
+ return ((current - previous) / (float64(current+previous) / 2.0)) * float64(100)
+}