summaryrefslogtreecommitdiff
path: root/networkQuality.go
diff options
context:
space:
mode:
authorWill Hawkins <[email protected]>2021-12-10 22:14:26 -0500
committerWill Hawkins <[email protected]>2021-12-10 22:14:26 -0500
commit339a162877a78641a45338983de2192353fe7a5a (patch)
tree84ea105cd0533e0cc7c67904d78d495845d008b1 /networkQuality.go
parent5927d8aca8df26bf770ca061cdd3e00794847b27 (diff)
More work.
Diffstat (limited to 'networkQuality.go')
-rw-r--r--networkQuality.go92
1 files changed, 79 insertions, 13 deletions
diff --git a/networkQuality.go b/networkQuality.go
index df58bbf..f254707 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"flag"
"fmt"
+ "github.com/hawkinsw/goresponsiveness/ma"
"github.com/hawkinsw/goresponsiveness/mc"
_ "io"
"io/ioutil"
@@ -28,6 +29,10 @@ func (c *Config) String() string {
return fmt.Sprintf("Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s", c.Version, c.Urls.SmallUrl, c.Urls.LargeUrl, c.Urls.UploadUrl)
}
+func toMBs(bytes float64) float64 {
+ return float64(bytes) / float64(1024*1024)
+}
+
var (
// Variables to hold CLI arguments.
configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.")
@@ -35,6 +40,64 @@ var (
debug = flag.Bool("debug", false, "Enable debugging.")
)
+func saturate(ctx context.Context, saturated chan<- interface{}, lbcGenerator func() mc.MeasurableConnection) {
+ mcs := make([]mc.MeasurableConnection, 4)
+ mcsPreviousTransferred := make([]uint64, 4)
+ for i := range mcs {
+ //mcs[i] = &mc.LoadBearingUpload{Path: config.Urls.UploadUrl}
+ mcs[i] = lbcGenerator()
+ mcsPreviousTransferred[i] = 0
+ if !mcs[i].Start(ctx) {
+ fmt.Printf("Error starting %dth MC!\n", i)
+ return
+ }
+ }
+
+ previousMovingAverage := float64(0)
+ movingAverage := ma.NewMovingAverage(4)
+ //lastFlowIncrease := uint64(0)
+ for currentIteration := uint64(0); true; currentIteration++ {
+
+ // If we are cancelled, then stop.
+ if ctx.Err() != nil {
+ return
+ }
+
+ time.Sleep(time.Second)
+
+ // 1. Calculate the most recent goodput.
+ totalTransfer := uint64(0)
+ for i := range mcs {
+ previousTransferred := mcsPreviousTransferred[i]
+ currentTransferred := mcs[i].Transferred()
+ totalTransfer += (currentTransferred - previousTransferred)
+ mcsPreviousTransferred[i] = currentTransferred
+ }
+
+ // 2. Calculate the delta
+ movingAverage.AddMeasurement(totalTransfer)
+ currentMovingAverage := movingAverage.CalculateAverage()
+ movingAverageDelta := ((currentMovingAverage - previousMovingAverage) / (float64(currentMovingAverage+previousMovingAverage) / 2.0)) * float64(100)
+ previousMovingAverage = currentMovingAverage
+
+ fmt.Printf("Instantaneous goodput: %f MB.\n", toMBs(float64(totalTransfer)))
+ fmt.Printf("Moving average: %f MB.\n", toMBs(currentMovingAverage))
+ fmt.Printf("Moving average delta: %f.\n", movingAverageDelta)
+
+
+ // 3. Are we stable or not?
+ if currentIteration != 0 && movingAverageDelta < float64(5) {
+ // We are stable!
+ fmt.Printf("Stable\n")
+ break
+ } else {
+ // We are unstable!
+ fmt.Printf("Unstable\n")
+ }
+ }
+ saturated <- struct{}{}
+}
+
func main() {
flag.Parse()
@@ -65,24 +128,27 @@ func main() {
fmt.Printf("Configuration: %s\n", &config)
}
- time.Sleep(4 * time.Second)
+ operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background())
- mcs := make([]*mc.LoadBearingConnection, 4)
- for i := range mcs {
- mcs[i] = &mc.LoadBearingConnection{Path: config.Urls.LargeUrl}
- mcCtx := context.Background()
- if !mcs[i].Start(mcCtx) {
- fmt.Printf("Error starting %dth MC!\n", i)
- return
- }
+ uploadSaturationChannel := make(chan interface{})
+
+ g := func() mc.MeasurableConnection {
+ return &mc.LoadBearingDownload{Path: config.Urls.LargeUrl}
}
- time.Sleep(4 * time.Second)
- for i := range mcs {
- mcs[i].Stop()
- fmt.Printf("mc[%d] read: %d\n", i, mcs[i].Downloaded())
+ go saturate(operatingCtx, uploadSaturationChannel, g)
+
+ select {
+ case <-uploadSaturationChannel:
+ {
+ fmt.Printf("upload is saturated!\n")
+ }
}
+ time.Sleep(10 * time.Second)
+
+ cancelOperatingCtx()
+
time.Sleep(4 * time.Second)
}