diff options
| -rw-r--r-- | ma/ma.go | 5 | ||||
| -rw-r--r-- | mc/mc.go | 28 | ||||
| -rw-r--r-- | networkQuality.go | 42 | ||||
| -rw-r--r-- | timeoutat/timeoutat.go | 39 | ||||
| -rw-r--r-- | utilities/utilities.go | 7 |
5 files changed, 67 insertions, 54 deletions
@@ -4,6 +4,7 @@ import ( "math" "github.com/hawkinsw/goresponsiveness/saturating" + "github.com/hawkinsw/goresponsiveness/utilities" ) // Convert this to a Type Parameterized interface when they are available @@ -37,9 +38,7 @@ func (ma *MovingAverage) ConsistentWithin(limit float64) bool { previous := ma.instants[0] for i := 1; i < ma.intervals; i++ { current := ma.instants[i] - delta := math.Abs(current - previous) - percentChange := (float64(delta) / (float64(current+previous) / 2.0)) * float64(100) - + percentChange := utilities.AbsPercentDifference(current, previous) previous = current if math.Abs(percentChange) > limit { return false @@ -2,16 +2,16 @@ package mc import ( "context" + "fmt" "io" "io/ioutil" "net/http" - "fmt" ) var chunkSize int = 5000 type MeasurableConnection interface { - Start(context.Context) bool + Start(context.Context, bool) bool Transferred() uint64 } @@ -25,16 +25,18 @@ func (lbd *LoadBearingDownload) Transferred() uint64 { return lbd.downloaded } -func (lbd *LoadBearingDownload) Start(ctx context.Context) bool { +func (lbd *LoadBearingDownload) Start(ctx context.Context, debug bool) bool { lbd.downloaded = 0 lbd.client = &http.Client{} - fmt.Printf("Started a load bearing download.\n") - go doDownload(lbd.client, lbd.Path, &lbd.downloaded, ctx) + if debug { + fmt.Printf("Started a load bearing download.\n") + } + go doDownload(ctx, lbd.client, lbd.Path, &lbd.downloaded, debug) return true } -func doDownload(client *http.Client, path string, count *uint64, ctx context.Context) { +func doDownload(ctx context.Context, client *http.Client, path string, count *uint64, debug bool) { get, err := client.Get(path) if err != nil { return @@ -47,7 +49,9 @@ func doDownload(client *http.Client, path string, count *uint64, ctx context.Con *count += uint64(n) } get.Body.Close() - fmt.Printf("Ending a load-bearing download.\n"); + if debug { + fmt.Printf("Ending a load-bearing download.\n") + } } type LoadBearingUpload struct { @@ -76,19 +80,21 @@ func (s *syntheticCountingReader) Read(p []byte) (n int, err error) { return } -func doUpload(client *http.Client, path string, count *uint64, ctx context.Context) bool { +func doUpload(ctx context.Context, client *http.Client, path string, count *uint64, debug bool) bool { *count = 0 s := &syntheticCountingReader{n: count, ctx: ctx} resp, _ := client.Post(path, "application/octet-stream", s) resp.Body.Close() - fmt.Printf("Ending a load-bearing upload.\n") + if debug { + fmt.Printf("Ending a load-bearing upload.\n") + } return true } -func (lbu *LoadBearingUpload) Start(ctx context.Context) bool { +func (lbu *LoadBearingUpload) Start(ctx context.Context, debug bool) bool { lbu.uploaded = 0 lbu.client = &http.Client{} fmt.Printf("Started a load bearing upload.\n") - go doUpload(lbu.client, lbu.Path, &lbu.uploaded, ctx) + go doUpload(ctx, lbu.client, lbu.Path, &lbu.uploaded, debug) return true } diff --git a/networkQuality.go b/networkQuality.go index 83346ac..ae8e25b 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -45,24 +45,29 @@ var ( timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.") ) -func addFlows(ctx context.Context, toAdd uint64, mcs *[]mc.MeasurableConnection, mcsPreviousTransferred *[]uint64, lbcGenerator func() mc.MeasurableConnection) { +func addFlows(ctx context.Context, toAdd uint64, mcs *[]mc.MeasurableConnection, mcsPreviousTransferred *[]uint64, lbcGenerator func() mc.MeasurableConnection, debug bool) { for i := uint64(0); i < toAdd; i++ { //mcs[i] = &mc.LoadBearingUpload{Path: config.Urls.UploadUrl} *mcs = append(*mcs, lbcGenerator()) *mcsPreviousTransferred = append(*mcsPreviousTransferred, 0) - if !(*mcs)[len(*mcs)-1].Start(ctx) { + if !(*mcs)[len(*mcs)-1].Start(ctx, debug) { fmt.Printf("Error starting %dth MC!\n", i) return } } } -func saturate(ctx context.Context, saturated chan<- float64, lbcGenerator func() mc.MeasurableConnection, debug bool) { +type SaturationResult struct { + RateBps float64 + FlowCount uint64 +} + +func saturate(ctx context.Context, saturated chan<- SaturationResult, lbcGenerator func() mc.MeasurableConnection, debug bool) { mcs := make([]mc.MeasurableConnection, 0) mcsPreviousTransferred := make([]uint64, 0) // Create 4 load bearing connections - addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator) + addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug) previousFlowIncreaseIteration := uint64(0) previousMovingAverage := float64(0) @@ -92,7 +97,7 @@ func saturate(ctx context.Context, saturated chan<- float64, lbcGenerator func() movingAverage.AddMeasurement(float64(totalTransfer)) currentMovingAverage := movingAverage.CalculateAverage() movingAverageAverage.AddMeasurement(currentMovingAverage) - movingAverageDelta := utilities.PercentDifference(currentMovingAverage, previousMovingAverage) + movingAverageDelta := utilities.SignedPercentDifference(currentMovingAverage, previousMovingAverage) previousMovingAverage = currentMovingAverage if debug { @@ -108,7 +113,7 @@ func saturate(ctx context.Context, saturated chan<- float64, lbcGenerator func() if debug { fmt.Printf("Adding flows because we are unsaturated and waited a while.\n") } - addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator) + addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug) previousFlowIncreaseIteration = currentIteration } else { if debug { @@ -127,13 +132,13 @@ func saturate(ctx context.Context, saturated chan<- float64, lbcGenerator func() if debug { fmt.Printf("New flows to add to try to increase our saturation!\n") } - addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator) + addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug) previousFlowIncreaseIteration = currentIteration } } } - saturated <- movingAverage.CalculateAverage() + saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), FlowCount: uint64(len(mcs))} } func main() { @@ -147,34 +152,35 @@ func main() { configClient := &http.Client{} resp, err := configClient.Get(configUrl) if err != nil { - fmt.Printf("Error connecting to %s: %v\n", configHostPort, err) + fmt.Fprintf(os.Stderr, "Error: Could not connect to configuration host %s: %v\n", configHostPort, err) return } jsonConfig, err := ioutil.ReadAll(resp.Body) if err != nil { - fmt.Printf("Error reading content downloaded from %s: %v\n", configUrl, err) + fmt.Fprintf(os.Stderr, "Error: Could not read configuration content downloaded from %s: %v\n", configUrl, err) return } var config Config err = json.Unmarshal(jsonConfig, &config) if err != nil { - fmt.Printf("Error parsing configuration returned from %s: %v\n", configUrl, err) + fmt.Fprintf(os.Stderr, "Error: Could not parse configuration returned from %s: %v\n", configUrl, err) return } + // TODO: Make sure that all configuration values are present and accounted for! + if *debug { fmt.Printf("Configuration: %s\n", &config) } operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background()) - uploadSaturationChannel := make(chan float64) - downloadSaturationChannel := make(chan float64) - timeoutChannel := make(chan interface{}) + uploadSaturationChannel := make(chan SaturationResult) + downloadSaturationChannel := make(chan SaturationResult) - _ = timeoutat.NewTimeoutAt(operatingCtx, time.Now().Add(timeoutDuration), timeoutChannel) + timeoutChannel := timeoutat.TimeoutAt(operatingCtx, time.Now().Add(timeoutDuration), *debug) generate_lbd := func() mc.MeasurableConnection { return &mc.LoadBearingDownload{Path: config.Urls.LargeUrl} @@ -196,14 +202,14 @@ func main() { { download_saturated = true if *debug { - fmt.Printf("################## download is saturated (%f)!\n", toMBs(saturatedDownloadRate)) + fmt.Printf("################# download is saturated (%fMBps, %d flows)!\n", toMBs(saturatedDownloadRate.RateBps), saturatedDownloadRate.FlowCount) } } case saturatedUploadRate := <-uploadSaturationChannel: { upload_saturated = true if *debug { - fmt.Printf("################# upload is saturated (%f)!\n", toMBs(saturatedUploadRate)) + fmt.Printf("################# upload is saturated (%fMBps, %d flows)!\n", toMBs(saturatedUploadRate.RateBps), saturatedUploadRate.FlowCount) } } case <-timeoutChannel: @@ -218,7 +224,7 @@ func main() { if saturation_timeout { cancelOperatingCtx() - fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation in maximum time of %v.", timeoutDuration) + fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation in maximum time of %v\n.", timeoutDuration) return } diff --git a/timeoutat/timeoutat.go b/timeoutat/timeoutat.go index e77804e..74a3b70 100644 --- a/timeoutat/timeoutat.go +++ b/timeoutat/timeoutat.go @@ -6,25 +6,22 @@ import ( "time" ) -type TimeoutAt struct { - when time.Time - response chan interface{} -} - -func NewTimeoutAt(ctx context.Context, when time.Time, response chan interface{}) *TimeoutAt { - timeoutAt := &TimeoutAt{when: when, response: response} - timeoutAt.start(ctx) - return timeoutAt -} - -func (ta *TimeoutAt) start(ctx context.Context) { - go func() { - fmt.Printf("Timeout expected to end at %v\n", ta.when) - select { - case <-time.After(ta.when.Sub(time.Now())): - case <-ctx.Done(): - } - ta.response <- struct{}{} - fmt.Printf("Timeout ended at %v\n", time.Now()) - }() +func TimeoutAt(ctx context.Context, when time.Time, debug bool) (response chan interface{}) { + response = make(chan interface{}) + go func(ctx context.Context) { + go func() { + if debug { + fmt.Printf("Timeout expected to end at %v\n", when) + } + select { + case <-time.After(when.Sub(time.Now())): + case <-ctx.Done(): + } + response <- struct{}{} + if debug { + fmt.Printf("Timeout ended at %v\n", time.Now()) + } + }() + }(ctx) + return } diff --git a/utilities/utilities.go b/utilities/utilities.go index a372141..fd5c824 100644 --- a/utilities/utilities.go +++ b/utilities/utilities.go @@ -1,5 +1,10 @@ package utilities -func PercentDifference(current float64, previous float64) (difference float64) { +import "math" + +func SignedPercentDifference(current float64, previous float64) (difference float64) { return ((current - previous) / (float64(current+previous) / 2.0)) * float64(100) } +func AbsPercentDifference(current float64, previous float64) (difference float64) { + return (math.Abs(current-previous) / (float64(current+previous) / 2.0)) * float64(100) +} |
