summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWill Hawkins <[email protected]>2021-12-10 22:14:26 -0500
committerWill Hawkins <[email protected]>2021-12-10 22:14:26 -0500
commit339a162877a78641a45338983de2192353fe7a5a (patch)
tree84ea105cd0533e0cc7c67904d78d495845d008b1
parent5927d8aca8df26bf770ca061cdd3e00794847b27 (diff)
More work.
-rw-r--r--ma/ma.go32
-rw-r--r--mc/mc.go61
-rw-r--r--networkQuality.go92
-rw-r--r--saturating/saturating.go27
4 files changed, 179 insertions, 33 deletions
diff --git a/ma/ma.go b/ma/ma.go
new file mode 100644
index 0000000..9d52d97
--- /dev/null
+++ b/ma/ma.go
@@ -0,0 +1,32 @@
+package ma
+
+import (
+ "github.com/hawkinsw/goresponsiveness/saturating"
+)
+
+// Convert this to a Type Parameterized interface when they are available
+// in Go (1.18).
+type MovingAverage struct {
+ intervals int
+ instants []uint64
+ index int
+ divisor *saturating.SaturatingInt
+}
+
+func NewMovingAverage(intervals int) *MovingAverage {
+ return &MovingAverage{instants: make([]uint64, intervals), intervals: intervals, divisor: saturating.NewSaturatingInt(intervals)}
+}
+
+func (ma *MovingAverage) AddMeasurement(measurement uint64) {
+ ma.instants[ma.index] = measurement
+ ma.divisor.Add(1)
+ ma.index = (ma.index + 1) % ma.intervals
+}
+
+func (ma *MovingAverage) CalculateAverage() float64 {
+ total := uint64(0)
+ for i := 0; i < ma.intervals; i++ {
+ total += ma.instants[i]
+ }
+ return float64(total) / float64(ma.divisor.Value())
+}
diff --git a/mc/mc.go b/mc/mc.go
index 4d405de..0b1d69b 100644
--- a/mc/mc.go
+++ b/mc/mc.go
@@ -2,7 +2,6 @@ package mc
import (
"context"
- "fmt"
"io"
"io/ioutil"
"net/http"
@@ -10,25 +9,20 @@ import (
type MeasurableConnection interface {
Start(context.Context) bool
- Stop() bool
- Downloaded() uint64
+ Transferred() uint64
}
-type LoadBearingConnection struct {
+type LoadBearingDownload struct {
Path string
- ctx context.Context
- cancel context.CancelFunc
downloaded uint64
client *http.Client
}
-func (lbc *LoadBearingConnection) Downloaded() uint64 {
+func (lbc *LoadBearingDownload) Transferred() uint64 {
return lbc.downloaded
}
-func (lbc *LoadBearingConnection) Start(ctx context.Context) bool {
- fmt.Printf("Starting a LBC ...")
- lbc.ctx, lbc.cancel = context.WithCancel(ctx)
+func (lbc *LoadBearingDownload) Start(ctx context.Context) bool {
lbc.downloaded = 0
lbc.client = &http.Client{}
get, err := lbc.client.Get(lbc.Path)
@@ -36,25 +30,52 @@ func (lbc *LoadBearingConnection) Start(ctx context.Context) bool {
if err != nil {
return false
}
- go doDownload(get, &lbc.downloaded, lbc.ctx)
- return true
-}
-
-func (lbc *LoadBearingConnection) Stop() bool {
- lbc.cancel()
+ go doDownload(get, &lbc.downloaded, ctx)
return true
}
func doDownload(get *http.Response, count *uint64, ctx context.Context) {
for ctx.Err() == nil {
- n, err := io.CopyN(ioutil.Discard, get.Body, 1024*1024)
+ n, err := io.CopyN(ioutil.Discard, get.Body, 100)
if err != nil {
- fmt.Printf("Done reading!\n")
break
}
- fmt.Printf("Read some bytes: %d\n", n)
*count += uint64(n)
}
- fmt.Printf("Cancelling my download.\n")
get.Body.Close()
}
+
+type LoadBearingUpload struct {
+ Path string
+ uploaded uint64
+ client *http.Client
+}
+
+func (lbu *LoadBearingUpload) Transferred() uint64 {
+ return lbu.uploaded
+}
+
+type syntheticCountingReader struct {
+ n *uint64
+ ctx context.Context
+}
+
+func (s *syntheticCountingReader) Read(p []byte) (n int, err error) {
+ if s.ctx.Err() != nil {
+ return 0, io.EOF
+ }
+ err = nil
+ n = len(p)
+ *s.n += uint64(n)
+ return
+}
+func (lbu *LoadBearingUpload) Start(ctx context.Context) bool {
+ lbu.uploaded = 0
+ lbu.client = &http.Client{}
+ s := &syntheticCountingReader{n: &lbu.uploaded, ctx: ctx}
+ go func() {
+ resp, _ := lbu.client.Post(lbu.Path, "application/octet-stream", s)
+ resp.Body.Close()
+ }()
+ return true
+}
diff --git a/networkQuality.go b/networkQuality.go
index df58bbf..f254707 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"flag"
"fmt"
+ "github.com/hawkinsw/goresponsiveness/ma"
"github.com/hawkinsw/goresponsiveness/mc"
_ "io"
"io/ioutil"
@@ -28,6 +29,10 @@ func (c *Config) String() string {
return fmt.Sprintf("Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s", c.Version, c.Urls.SmallUrl, c.Urls.LargeUrl, c.Urls.UploadUrl)
}
+func toMBs(bytes float64) float64 {
+ return float64(bytes) / float64(1024*1024)
+}
+
var (
// Variables to hold CLI arguments.
configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.")
@@ -35,6 +40,64 @@ var (
debug = flag.Bool("debug", false, "Enable debugging.")
)
+func saturate(ctx context.Context, saturated chan<- interface{}, lbcGenerator func() mc.MeasurableConnection) {
+ mcs := make([]mc.MeasurableConnection, 4)
+ mcsPreviousTransferred := make([]uint64, 4)
+ for i := range mcs {
+ //mcs[i] = &mc.LoadBearingUpload{Path: config.Urls.UploadUrl}
+ mcs[i] = lbcGenerator()
+ mcsPreviousTransferred[i] = 0
+ if !mcs[i].Start(ctx) {
+ fmt.Printf("Error starting %dth MC!\n", i)
+ return
+ }
+ }
+
+ previousMovingAverage := float64(0)
+ movingAverage := ma.NewMovingAverage(4)
+ //lastFlowIncrease := uint64(0)
+ for currentIteration := uint64(0); true; currentIteration++ {
+
+ // If we are cancelled, then stop.
+ if ctx.Err() != nil {
+ return
+ }
+
+ time.Sleep(time.Second)
+
+ // 1. Calculate the most recent goodput.
+ totalTransfer := uint64(0)
+ for i := range mcs {
+ previousTransferred := mcsPreviousTransferred[i]
+ currentTransferred := mcs[i].Transferred()
+ totalTransfer += (currentTransferred - previousTransferred)
+ mcsPreviousTransferred[i] = currentTransferred
+ }
+
+ // 2. Calculate the delta
+ movingAverage.AddMeasurement(totalTransfer)
+ currentMovingAverage := movingAverage.CalculateAverage()
+ movingAverageDelta := ((currentMovingAverage - previousMovingAverage) / (float64(currentMovingAverage+previousMovingAverage) / 2.0)) * float64(100)
+ previousMovingAverage = currentMovingAverage
+
+ fmt.Printf("Instantaneous goodput: %f MB.\n", toMBs(float64(totalTransfer)))
+ fmt.Printf("Moving average: %f MB.\n", toMBs(currentMovingAverage))
+ fmt.Printf("Moving average delta: %f.\n", movingAverageDelta)
+
+
+ // 3. Are we stable or not?
+ if currentIteration != 0 && movingAverageDelta < float64(5) {
+ // We are stable!
+ fmt.Printf("Stable\n")
+ break
+ } else {
+ // We are unstable!
+ fmt.Printf("Unstable\n")
+ }
+ }
+ saturated <- struct{}{}
+}
+
func main() {
flag.Parse()
@@ -65,24 +128,27 @@ func main() {
fmt.Printf("Configuration: %s\n", &config)
}
- time.Sleep(4 * time.Second)
+ operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background())
- mcs := make([]*mc.LoadBearingConnection, 4)
- for i := range mcs {
- mcs[i] = &mc.LoadBearingConnection{Path: config.Urls.LargeUrl}
- mcCtx := context.Background()
- if !mcs[i].Start(mcCtx) {
- fmt.Printf("Error starting %dth MC!\n", i)
- return
- }
+ uploadSaturationChannel := make(chan interface{})
+
+ g := func() mc.MeasurableConnection {
+ return &mc.LoadBearingDownload{Path: config.Urls.LargeUrl}
}
- time.Sleep(4 * time.Second)
- for i := range mcs {
- mcs[i].Stop()
- fmt.Printf("mc[%d] read: %d\n", i, mcs[i].Downloaded())
+ go saturate(operatingCtx, uploadSaturationChannel, g)
+
+ select {
+ case <-uploadSaturationChannel:
+ {
+ fmt.Printf("upload is saturated!\n")
+ }
}
+ time.Sleep(10 * time.Second)
+
+ cancelOperatingCtx()
+
time.Sleep(4 * time.Second)
}
diff --git a/saturating/saturating.go b/saturating/saturating.go
new file mode 100644
index 0000000..10a0c87
--- /dev/null
+++ b/saturating/saturating.go
@@ -0,0 +1,27 @@
+package saturating
+
+type SaturatingInt struct {
+ max int
+ value int
+ saturated bool
+}
+
+func NewSaturatingInt(max int) *SaturatingInt {
+ return &SaturatingInt{max: max, value: 0, saturated: false}
+}
+
+func (s *SaturatingInt) Value() int {
+ if s.saturated {
+ return s.max
+ }
+ return s.value
+}
+
+func (s *SaturatingInt) Add(operand int) {
+ if !s.saturated {
+ s.value += operand
+ if s.value >= s.max {
+ s.saturated = true
+ }
+ }
+}