summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ma/ma.go5
-rw-r--r--mc/mc.go28
-rw-r--r--networkQuality.go42
-rw-r--r--timeoutat/timeoutat.go39
-rw-r--r--utilities/utilities.go7
5 files changed, 67 insertions, 54 deletions
diff --git a/ma/ma.go b/ma/ma.go
index 0dee17f..5c3558b 100644
--- a/ma/ma.go
+++ b/ma/ma.go
@@ -4,6 +4,7 @@ import (
"math"
"github.com/hawkinsw/goresponsiveness/saturating"
+ "github.com/hawkinsw/goresponsiveness/utilities"
)
// Convert this to a Type Parameterized interface when they are available
@@ -37,9 +38,7 @@ func (ma *MovingAverage) ConsistentWithin(limit float64) bool {
previous := ma.instants[0]
for i := 1; i < ma.intervals; i++ {
current := ma.instants[i]
- delta := math.Abs(current - previous)
- percentChange := (float64(delta) / (float64(current+previous) / 2.0)) * float64(100)
-
+ percentChange := utilities.AbsPercentDifference(current, previous)
previous = current
if math.Abs(percentChange) > limit {
return false
diff --git a/mc/mc.go b/mc/mc.go
index bc0645a..a70f16e 100644
--- a/mc/mc.go
+++ b/mc/mc.go
@@ -2,16 +2,16 @@ package mc
import (
"context"
+ "fmt"
"io"
"io/ioutil"
"net/http"
- "fmt"
)
var chunkSize int = 5000
type MeasurableConnection interface {
- Start(context.Context) bool
+ Start(context.Context, bool) bool
Transferred() uint64
}
@@ -25,16 +25,18 @@ func (lbd *LoadBearingDownload) Transferred() uint64 {
return lbd.downloaded
}
-func (lbd *LoadBearingDownload) Start(ctx context.Context) bool {
+func (lbd *LoadBearingDownload) Start(ctx context.Context, debug bool) bool {
lbd.downloaded = 0
lbd.client = &http.Client{}
- fmt.Printf("Started a load bearing download.\n")
- go doDownload(lbd.client, lbd.Path, &lbd.downloaded, ctx)
+ if debug {
+ fmt.Printf("Started a load bearing download.\n")
+ }
+ go doDownload(ctx, lbd.client, lbd.Path, &lbd.downloaded, debug)
return true
}
-func doDownload(client *http.Client, path string, count *uint64, ctx context.Context) {
+func doDownload(ctx context.Context, client *http.Client, path string, count *uint64, debug bool) {
get, err := client.Get(path)
if err != nil {
return
@@ -47,7 +49,9 @@ func doDownload(client *http.Client, path string, count *uint64, ctx context.Con
*count += uint64(n)
}
get.Body.Close()
- fmt.Printf("Ending a load-bearing download.\n");
+ if debug {
+ fmt.Printf("Ending a load-bearing download.\n")
+ }
}
type LoadBearingUpload struct {
@@ -76,19 +80,21 @@ func (s *syntheticCountingReader) Read(p []byte) (n int, err error) {
return
}
-func doUpload(client *http.Client, path string, count *uint64, ctx context.Context) bool {
+func doUpload(ctx context.Context, client *http.Client, path string, count *uint64, debug bool) bool {
*count = 0
s := &syntheticCountingReader{n: count, ctx: ctx}
resp, _ := client.Post(path, "application/octet-stream", s)
resp.Body.Close()
- fmt.Printf("Ending a load-bearing upload.\n")
+ if debug {
+ fmt.Printf("Ending a load-bearing upload.\n")
+ }
return true
}
-func (lbu *LoadBearingUpload) Start(ctx context.Context) bool {
+func (lbu *LoadBearingUpload) Start(ctx context.Context, debug bool) bool {
lbu.uploaded = 0
lbu.client = &http.Client{}
fmt.Printf("Started a load bearing upload.\n")
- go doUpload(lbu.client, lbu.Path, &lbu.uploaded, ctx)
+ go doUpload(ctx, lbu.client, lbu.Path, &lbu.uploaded, debug)
return true
}
diff --git a/networkQuality.go b/networkQuality.go
index 83346ac..ae8e25b 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -45,24 +45,29 @@ var (
timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.")
)
-func addFlows(ctx context.Context, toAdd uint64, mcs *[]mc.MeasurableConnection, mcsPreviousTransferred *[]uint64, lbcGenerator func() mc.MeasurableConnection) {
+func addFlows(ctx context.Context, toAdd uint64, mcs *[]mc.MeasurableConnection, mcsPreviousTransferred *[]uint64, lbcGenerator func() mc.MeasurableConnection, debug bool) {
for i := uint64(0); i < toAdd; i++ {
//mcs[i] = &mc.LoadBearingUpload{Path: config.Urls.UploadUrl}
*mcs = append(*mcs, lbcGenerator())
*mcsPreviousTransferred = append(*mcsPreviousTransferred, 0)
- if !(*mcs)[len(*mcs)-1].Start(ctx) {
+ if !(*mcs)[len(*mcs)-1].Start(ctx, debug) {
fmt.Printf("Error starting %dth MC!\n", i)
return
}
}
}
-func saturate(ctx context.Context, saturated chan<- float64, lbcGenerator func() mc.MeasurableConnection, debug bool) {
+type SaturationResult struct {
+ RateBps float64
+ FlowCount uint64
+}
+
+func saturate(ctx context.Context, saturated chan<- SaturationResult, lbcGenerator func() mc.MeasurableConnection, debug bool) {
mcs := make([]mc.MeasurableConnection, 0)
mcsPreviousTransferred := make([]uint64, 0)
// Create 4 load bearing connections
- addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator)
+ addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug)
previousFlowIncreaseIteration := uint64(0)
previousMovingAverage := float64(0)
@@ -92,7 +97,7 @@ func saturate(ctx context.Context, saturated chan<- float64, lbcGenerator func()
movingAverage.AddMeasurement(float64(totalTransfer))
currentMovingAverage := movingAverage.CalculateAverage()
movingAverageAverage.AddMeasurement(currentMovingAverage)
- movingAverageDelta := utilities.PercentDifference(currentMovingAverage, previousMovingAverage)
+ movingAverageDelta := utilities.SignedPercentDifference(currentMovingAverage, previousMovingAverage)
previousMovingAverage = currentMovingAverage
if debug {
@@ -108,7 +113,7 @@ func saturate(ctx context.Context, saturated chan<- float64, lbcGenerator func()
if debug {
fmt.Printf("Adding flows because we are unsaturated and waited a while.\n")
}
- addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator)
+ addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug)
previousFlowIncreaseIteration = currentIteration
} else {
if debug {
@@ -127,13 +132,13 @@ func saturate(ctx context.Context, saturated chan<- float64, lbcGenerator func()
if debug {
fmt.Printf("New flows to add to try to increase our saturation!\n")
}
- addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator)
+ addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug)
previousFlowIncreaseIteration = currentIteration
}
}
}
- saturated <- movingAverage.CalculateAverage()
+ saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), FlowCount: uint64(len(mcs))}
}
func main() {
@@ -147,34 +152,35 @@ func main() {
configClient := &http.Client{}
resp, err := configClient.Get(configUrl)
if err != nil {
- fmt.Printf("Error connecting to %s: %v\n", configHostPort, err)
+ fmt.Fprintf(os.Stderr, "Error: Could not connect to configuration host %s: %v\n", configHostPort, err)
return
}
jsonConfig, err := ioutil.ReadAll(resp.Body)
if err != nil {
- fmt.Printf("Error reading content downloaded from %s: %v\n", configUrl, err)
+ fmt.Fprintf(os.Stderr, "Error: Could not read configuration content downloaded from %s: %v\n", configUrl, err)
return
}
var config Config
err = json.Unmarshal(jsonConfig, &config)
if err != nil {
- fmt.Printf("Error parsing configuration returned from %s: %v\n", configUrl, err)
+ fmt.Fprintf(os.Stderr, "Error: Could not parse configuration returned from %s: %v\n", configUrl, err)
return
}
+ // TODO: Make sure that all configuration values are present and accounted for!
+
if *debug {
fmt.Printf("Configuration: %s\n", &config)
}
operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background())
- uploadSaturationChannel := make(chan float64)
- downloadSaturationChannel := make(chan float64)
- timeoutChannel := make(chan interface{})
+ uploadSaturationChannel := make(chan SaturationResult)
+ downloadSaturationChannel := make(chan SaturationResult)
- _ = timeoutat.NewTimeoutAt(operatingCtx, time.Now().Add(timeoutDuration), timeoutChannel)
+ timeoutChannel := timeoutat.TimeoutAt(operatingCtx, time.Now().Add(timeoutDuration), *debug)
generate_lbd := func() mc.MeasurableConnection {
return &mc.LoadBearingDownload{Path: config.Urls.LargeUrl}
@@ -196,14 +202,14 @@ func main() {
{
download_saturated = true
if *debug {
- fmt.Printf("################## download is saturated (%f)!\n", toMBs(saturatedDownloadRate))
+ fmt.Printf("################# download is saturated (%fMBps, %d flows)!\n", toMBs(saturatedDownloadRate.RateBps), saturatedDownloadRate.FlowCount)
}
}
case saturatedUploadRate := <-uploadSaturationChannel:
{
upload_saturated = true
if *debug {
- fmt.Printf("################# upload is saturated (%f)!\n", toMBs(saturatedUploadRate))
+ fmt.Printf("################# upload is saturated (%fMBps, %d flows)!\n", toMBs(saturatedUploadRate.RateBps), saturatedUploadRate.FlowCount)
}
}
case <-timeoutChannel:
@@ -218,7 +224,7 @@ func main() {
if saturation_timeout {
cancelOperatingCtx()
- fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation in maximum time of %v.", timeoutDuration)
+ fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation in maximum time of %v\n.", timeoutDuration)
return
}
diff --git a/timeoutat/timeoutat.go b/timeoutat/timeoutat.go
index e77804e..74a3b70 100644
--- a/timeoutat/timeoutat.go
+++ b/timeoutat/timeoutat.go
@@ -6,25 +6,22 @@ import (
"time"
)
-type TimeoutAt struct {
- when time.Time
- response chan interface{}
-}
-
-func NewTimeoutAt(ctx context.Context, when time.Time, response chan interface{}) *TimeoutAt {
- timeoutAt := &TimeoutAt{when: when, response: response}
- timeoutAt.start(ctx)
- return timeoutAt
-}
-
-func (ta *TimeoutAt) start(ctx context.Context) {
- go func() {
- fmt.Printf("Timeout expected to end at %v\n", ta.when)
- select {
- case <-time.After(ta.when.Sub(time.Now())):
- case <-ctx.Done():
- }
- ta.response <- struct{}{}
- fmt.Printf("Timeout ended at %v\n", time.Now())
- }()
+func TimeoutAt(ctx context.Context, when time.Time, debug bool) (response chan interface{}) {
+ response = make(chan interface{})
+ go func(ctx context.Context) {
+ go func() {
+ if debug {
+ fmt.Printf("Timeout expected to end at %v\n", when)
+ }
+ select {
+ case <-time.After(when.Sub(time.Now())):
+ case <-ctx.Done():
+ }
+ response <- struct{}{}
+ if debug {
+ fmt.Printf("Timeout ended at %v\n", time.Now())
+ }
+ }()
+ }(ctx)
+ return
}
diff --git a/utilities/utilities.go b/utilities/utilities.go
index a372141..fd5c824 100644
--- a/utilities/utilities.go
+++ b/utilities/utilities.go
@@ -1,5 +1,10 @@
package utilities
-func PercentDifference(current float64, previous float64) (difference float64) {
+import "math"
+
+func SignedPercentDifference(current float64, previous float64) (difference float64) {
return ((current - previous) / (float64(current+previous) / 2.0)) * float64(100)
}
+func AbsPercentDifference(current float64, previous float64) (difference float64) {
+ return (math.Abs(current-previous) / (float64(current+previous) / 2.0)) * float64(100)
+}