diff options
| -rw-r--r-- | ma/ma.go | 25 | ||||
| -rw-r--r-- | mc/mc.go | 47 | ||||
| -rw-r--r-- | networkQuality.go | 146 | ||||
| -rw-r--r-- | timeoutat/timeoutat.go | 30 | ||||
| -rw-r--r-- | utilities/utilities.go | 5 |
5 files changed, 197 insertions, 56 deletions
@@ -1,6 +1,8 @@ package ma import ( + "math" + "github.com/hawkinsw/goresponsiveness/saturating" ) @@ -8,25 +10,40 @@ import ( // in Go (1.18). type MovingAverage struct { intervals int - instants []uint64 + instants []float64 index int divisor *saturating.SaturatingInt } func NewMovingAverage(intervals int) *MovingAverage { - return &MovingAverage{instants: make([]uint64, intervals), intervals: intervals, divisor: saturating.NewSaturatingInt(intervals)} + return &MovingAverage{instants: make([]float64, intervals), intervals: intervals, divisor: saturating.NewSaturatingInt(intervals)} } -func (ma *MovingAverage) AddMeasurement(measurement uint64) { +func (ma *MovingAverage) AddMeasurement(measurement float64) { ma.instants[ma.index] = measurement ma.divisor.Add(1) ma.index = (ma.index + 1) % ma.intervals } func (ma *MovingAverage) CalculateAverage() float64 { - total := uint64(0) + total := float64(0) for i := 0; i < ma.intervals; i++ { total += ma.instants[i] } return float64(total) / float64(ma.divisor.Value()) } + +func (ma *MovingAverage) ConsistentWithin(limit float64) bool { + previous := ma.instants[0] + for i := 1; i < ma.intervals; i++ { + current := ma.instants[i] + delta := math.Abs(current - previous) + percentChange := (float64(delta) / (float64(current+previous) / 2.0)) * float64(100) + + previous = current + if math.Abs(percentChange) > limit { + return false + } + } + return true +} @@ -5,8 +5,11 @@ import ( "io" "io/ioutil" "net/http" + "fmt" ) +var chunkSize int = 5000 + type MeasurableConnection interface { Start(context.Context) bool Transferred() uint64 @@ -18,31 +21,33 @@ type LoadBearingDownload struct { client *http.Client } -func (lbc *LoadBearingDownload) Transferred() uint64 { - return lbc.downloaded +func (lbd *LoadBearingDownload) Transferred() uint64 { + return lbd.downloaded } -func (lbc *LoadBearingDownload) Start(ctx context.Context) bool { - lbc.downloaded = 0 - lbc.client = &http.Client{} - get, err := lbc.client.Get(lbc.Path) +func (lbd *LoadBearingDownload) Start(ctx context.Context) bool { + lbd.downloaded = 0 + lbd.client = &http.Client{} - if err != nil { - return false - } - go doDownload(get, &lbc.downloaded, ctx) + fmt.Printf("Started a load bearing download.\n") + go doDownload(lbd.client, lbd.Path, &lbd.downloaded, ctx) return true } -func doDownload(get *http.Response, count *uint64, ctx context.Context) { +func doDownload(client *http.Client, path string, count *uint64, ctx context.Context) { + get, err := client.Get(path) + if err != nil { + return + } for ctx.Err() == nil { - n, err := io.CopyN(ioutil.Discard, get.Body, 100) + n, err := io.CopyN(ioutil.Discard, get.Body, int64(chunkSize)) if err != nil { break } *count += uint64(n) } get.Body.Close() + fmt.Printf("Ending a load-bearing download.\n"); } type LoadBearingUpload struct { @@ -66,16 +71,24 @@ func (s *syntheticCountingReader) Read(p []byte) (n int, err error) { } err = nil n = len(p) + n = chunkSize *s.n += uint64(n) return } + +func doUpload(client *http.Client, path string, count *uint64, ctx context.Context) bool { + *count = 0 + s := &syntheticCountingReader{n: count, ctx: ctx} + resp, _ := client.Post(path, "application/octet-stream", s) + resp.Body.Close() + fmt.Printf("Ending a load-bearing upload.\n") + return true +} + func (lbu *LoadBearingUpload) Start(ctx context.Context) bool { lbu.uploaded = 0 lbu.client = &http.Client{} - s := &syntheticCountingReader{n: &lbu.uploaded, ctx: ctx} - go func() { - resp, _ := lbu.client.Post(lbu.Path, "application/octet-stream", s) - resp.Body.Close() - }() + fmt.Printf("Started a load bearing upload.\n") + go doUpload(lbu.client, lbu.Path, &lbu.uploaded, ctx) return true } diff --git a/networkQuality.go b/networkQuality.go index f254707..83346ac 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -5,13 +5,17 @@ import ( "encoding/json" "flag" "fmt" - "github.com/hawkinsw/goresponsiveness/ma" - "github.com/hawkinsw/goresponsiveness/mc" _ "io" "io/ioutil" _ "log" "net/http" + "os" "time" + + "github.com/hawkinsw/goresponsiveness/ma" + "github.com/hawkinsw/goresponsiveness/mc" + "github.com/hawkinsw/goresponsiveness/timeoutat" + "github.com/hawkinsw/goresponsiveness/utilities" ) type ConfigUrls struct { @@ -38,24 +42,33 @@ var ( configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.") configPort = flag.Int("port", 4043, "port number on which to access responsiveness configuration server.") debug = flag.Bool("debug", false, "Enable debugging.") + timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.") ) -func saturate(ctx context.Context, saturated chan<- interface{}, lbcGenerator func() mc.MeasurableConnection) { - mcs := make([]mc.MeasurableConnection, 4) - mcsPreviousTransferred := make([]uint64, 4) - for i := range mcs { +func addFlows(ctx context.Context, toAdd uint64, mcs *[]mc.MeasurableConnection, mcsPreviousTransferred *[]uint64, lbcGenerator func() mc.MeasurableConnection) { + for i := uint64(0); i < toAdd; i++ { //mcs[i] = &mc.LoadBearingUpload{Path: config.Urls.UploadUrl} - mcs[i] = lbcGenerator() - mcsPreviousTransferred[i] = 0 - if !mcs[i].Start(ctx) { + *mcs = append(*mcs, lbcGenerator()) + *mcsPreviousTransferred = append(*mcsPreviousTransferred, 0) + if !(*mcs)[len(*mcs)-1].Start(ctx) { fmt.Printf("Error starting %dth MC!\n", i) return } } +} + +func saturate(ctx context.Context, saturated chan<- float64, lbcGenerator func() mc.MeasurableConnection, debug bool) { + mcs := make([]mc.MeasurableConnection, 0) + mcsPreviousTransferred := make([]uint64, 0) + // Create 4 load bearing connections + addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator) + + previousFlowIncreaseIteration := uint64(0) previousMovingAverage := float64(0) movingAverage := ma.NewMovingAverage(4) - //lastFlowIncrease := uint64(0) + movingAverageAverage := ma.NewMovingAverage(4) + for currentIteration := uint64(0); true; currentIteration++ { // If we are cancelled, then stop. @@ -63,9 +76,10 @@ func saturate(ctx context.Context, saturated chan<- interface{}, lbcGenerator fu return } + // At each 1-second interval time.Sleep(time.Second) - // 1. Calculate the most recent goodput. + // Compute "instantaneous aggregate" goodput which is the number of bytes transferred within the last second. totalTransfer := uint64(0) for i := range mcs { previousTransferred := mcsPreviousTransferred[i] @@ -74,33 +88,59 @@ func saturate(ctx context.Context, saturated chan<- interface{}, lbcGenerator fu mcsPreviousTransferred[i] = currentTransferred } - // 2. Calculate the delta - movingAverage.AddMeasurement(totalTransfer) + // Compute a moving average of the last 4 "instantaneous aggregate goodput" measurements + movingAverage.AddMeasurement(float64(totalTransfer)) currentMovingAverage := movingAverage.CalculateAverage() - movingAverageDelta := ((currentMovingAverage - previousMovingAverage) / (float64(currentMovingAverage+previousMovingAverage) / 2.0)) * float64(100) + movingAverageAverage.AddMeasurement(currentMovingAverage) + movingAverageDelta := utilities.PercentDifference(currentMovingAverage, previousMovingAverage) previousMovingAverage = currentMovingAverage - fmt.Printf("Instantaneous goodput: %f MB.\n", toMBs(float64(totalTransfer))) - fmt.Printf("Moving average: %f MB.\n", toMBs(currentMovingAverage)) - fmt.Printf("Moving average delta: %f.\n", movingAverageDelta) - + if debug { + fmt.Printf("Instantaneous goodput: %f MB.\n", toMBs(float64(totalTransfer))) + fmt.Printf("Moving average: %f MB.\n", toMBs(currentMovingAverage)) + fmt.Printf("Moving average delta: %f.\n", movingAverageDelta) + } - // 3. Are we stable or not? - if currentIteration != 0 && movingAverageDelta < float64(5) { - // We are stable! - fmt.Printf("Stable\n") - break - } else { - // We are unstable! - fmt.Printf("Unstable\n") + // If moving average > "previous" moving average + 5%: + if currentIteration == 0 || movingAverageDelta > float64(5) { + // Network did not yet reach saturation. If no flows added within the last 4 seconds, add 4 more flows + if (currentIteration - previousFlowIncreaseIteration) > 4 { + if debug { + fmt.Printf("Adding flows because we are unsaturated and waited a while.\n") + } + addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator) + previousFlowIncreaseIteration = currentIteration + } else { + if debug { + fmt.Printf("We are unsaturated, but it still too early to add anything.\n") + } + } + } else { // Else, network reached saturation for the current flow count. + // If new flows added and for 4 seconds the moving average throughput did not change: network reached stable saturation + if (currentIteration-previousFlowIncreaseIteration) < 4 && movingAverageAverage.ConsistentWithin(float64(4)) { + if debug { + fmt.Printf("New flows added within the last four seconds and the moving-average average is consistent!\n") + } + break + } else { + // Else, add four more flows + if debug { + fmt.Printf("New flows to add to try to increase our saturation!\n") + } + addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator) + previousFlowIncreaseIteration = currentIteration + } } + } - saturated <- struct{}{} + saturated <- movingAverage.CalculateAverage() } func main() { flag.Parse() + timeoutDuration := time.Second * time.Duration(*timeout) + configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort) configUrl := fmt.Sprintf("https://%s/config", configHostPort) @@ -130,22 +170,58 @@ func main() { operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background()) - uploadSaturationChannel := make(chan interface{}) + uploadSaturationChannel := make(chan float64) + downloadSaturationChannel := make(chan float64) + timeoutChannel := make(chan interface{}) + + _ = timeoutat.NewTimeoutAt(operatingCtx, time.Now().Add(timeoutDuration), timeoutChannel) - g := func() mc.MeasurableConnection { - return &mc.LoadBearingDownload{Path: config.Urls.LargeUrl} + generate_lbd := func() mc.MeasurableConnection { + return &mc.LoadBearingDownload{Path: config.Urls.LargeUrl} + } + generate_lbu := func() mc.MeasurableConnection { + return &mc.LoadBearingUpload{Path: config.Urls.UploadUrl} } + go saturate(operatingCtx, downloadSaturationChannel, generate_lbd, *debug) + go saturate(operatingCtx, uploadSaturationChannel, generate_lbu, *debug) - go saturate(operatingCtx, uploadSaturationChannel, g) + saturation_timeout := false + upload_saturated := false + download_saturated := false - select { - case <-uploadSaturationChannel: - { - fmt.Printf("upload is saturated!\n") + for !saturation_timeout && !(upload_saturated && download_saturated) { + select { + case saturatedDownloadRate := <-downloadSaturationChannel: + { + download_saturated = true + if *debug { + fmt.Printf("################## download is saturated (%f)!\n", toMBs(saturatedDownloadRate)) + } + } + case saturatedUploadRate := <-uploadSaturationChannel: + { + upload_saturated = true + if *debug { + fmt.Printf("################# upload is saturated (%f)!\n", toMBs(saturatedUploadRate)) + } + } + case <-timeoutChannel: + { + saturation_timeout = true + if *debug { + fmt.Printf("################# timeout reaching saturation!\n") + } + } } } + if saturation_timeout { + cancelOperatingCtx() + fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation in maximum time of %v.", timeoutDuration) + return + } + time.Sleep(10 * time.Second) cancelOperatingCtx() diff --git a/timeoutat/timeoutat.go b/timeoutat/timeoutat.go new file mode 100644 index 0000000..e77804e --- /dev/null +++ b/timeoutat/timeoutat.go @@ -0,0 +1,30 @@ +package timeoutat + +import ( + "context" + "fmt" + "time" +) + +type TimeoutAt struct { + when time.Time + response chan interface{} +} + +func NewTimeoutAt(ctx context.Context, when time.Time, response chan interface{}) *TimeoutAt { + timeoutAt := &TimeoutAt{when: when, response: response} + timeoutAt.start(ctx) + return timeoutAt +} + +func (ta *TimeoutAt) start(ctx context.Context) { + go func() { + fmt.Printf("Timeout expected to end at %v\n", ta.when) + select { + case <-time.After(ta.when.Sub(time.Now())): + case <-ctx.Done(): + } + ta.response <- struct{}{} + fmt.Printf("Timeout ended at %v\n", time.Now()) + }() +} diff --git a/utilities/utilities.go b/utilities/utilities.go new file mode 100644 index 0000000..a372141 --- /dev/null +++ b/utilities/utilities.go @@ -0,0 +1,5 @@ +package utilities + +func PercentDifference(current float64, previous float64) (difference float64) { + return ((current - previous) / (float64(current+previous) / 2.0)) * float64(100) +} |
