diff options
| author | Will Hawkins <[email protected]> | 2021-12-10 22:14:26 -0500 |
|---|---|---|
| committer | Will Hawkins <[email protected]> | 2021-12-10 22:14:26 -0500 |
| commit | 339a162877a78641a45338983de2192353fe7a5a (patch) | |
| tree | 84ea105cd0533e0cc7c67904d78d495845d008b1 | |
| parent | 5927d8aca8df26bf770ca061cdd3e00794847b27 (diff) | |
More work.
| -rw-r--r-- | ma/ma.go | 32 | ||||
| -rw-r--r-- | mc/mc.go | 61 | ||||
| -rw-r--r-- | networkQuality.go | 92 | ||||
| -rw-r--r-- | saturating/saturating.go | 27 |
4 files changed, 179 insertions, 33 deletions
diff --git a/ma/ma.go b/ma/ma.go new file mode 100644 index 0000000..9d52d97 --- /dev/null +++ b/ma/ma.go @@ -0,0 +1,32 @@ +package ma + +import ( + "github.com/hawkinsw/goresponsiveness/saturating" +) + +// Convert this to a Type Parameterized interface when they are available +// in Go (1.18). +type MovingAverage struct { + intervals int + instants []uint64 + index int + divisor *saturating.SaturatingInt +} + +func NewMovingAverage(intervals int) *MovingAverage { + return &MovingAverage{instants: make([]uint64, intervals), intervals: intervals, divisor: saturating.NewSaturatingInt(intervals)} +} + +func (ma *MovingAverage) AddMeasurement(measurement uint64) { + ma.instants[ma.index] = measurement + ma.divisor.Add(1) + ma.index = (ma.index + 1) % ma.intervals +} + +func (ma *MovingAverage) CalculateAverage() float64 { + total := uint64(0) + for i := 0; i < ma.intervals; i++ { + total += ma.instants[i] + } + return float64(total) / float64(ma.divisor.Value()) +} @@ -2,7 +2,6 @@ package mc import ( "context" - "fmt" "io" "io/ioutil" "net/http" @@ -10,25 +9,20 @@ import ( type MeasurableConnection interface { Start(context.Context) bool - Stop() bool - Downloaded() uint64 + Transferred() uint64 } -type LoadBearingConnection struct { +type LoadBearingDownload struct { Path string - ctx context.Context - cancel context.CancelFunc downloaded uint64 client *http.Client } -func (lbc *LoadBearingConnection) Downloaded() uint64 { +func (lbc *LoadBearingDownload) Transferred() uint64 { return lbc.downloaded } -func (lbc *LoadBearingConnection) Start(ctx context.Context) bool { - fmt.Printf("Starting a LBC ...") - lbc.ctx, lbc.cancel = context.WithCancel(ctx) +func (lbc *LoadBearingDownload) Start(ctx context.Context) bool { lbc.downloaded = 0 lbc.client = &http.Client{} get, err := lbc.client.Get(lbc.Path) @@ -36,25 +30,52 @@ func (lbc *LoadBearingConnection) Start(ctx context.Context) bool { if err != nil { return false } - go doDownload(get, &lbc.downloaded, lbc.ctx) - return true -} - -func (lbc *LoadBearingConnection) Stop() bool { - lbc.cancel() + go doDownload(get, &lbc.downloaded, ctx) return true } func doDownload(get *http.Response, count *uint64, ctx context.Context) { for ctx.Err() == nil { - n, err := io.CopyN(ioutil.Discard, get.Body, 1024*1024) + n, err := io.CopyN(ioutil.Discard, get.Body, 100) if err != nil { - fmt.Printf("Done reading!\n") break } - fmt.Printf("Read some bytes: %d\n", n) *count += uint64(n) } - fmt.Printf("Cancelling my download.\n") get.Body.Close() } + +type LoadBearingUpload struct { + Path string + uploaded uint64 + client *http.Client +} + +func (lbu *LoadBearingUpload) Transferred() uint64 { + return lbu.uploaded +} + +type syntheticCountingReader struct { + n *uint64 + ctx context.Context +} + +func (s *syntheticCountingReader) Read(p []byte) (n int, err error) { + if s.ctx.Err() != nil { + return 0, io.EOF + } + err = nil + n = len(p) + *s.n += uint64(n) + return +} +func (lbu *LoadBearingUpload) Start(ctx context.Context) bool { + lbu.uploaded = 0 + lbu.client = &http.Client{} + s := &syntheticCountingReader{n: &lbu.uploaded, ctx: ctx} + go func() { + resp, _ := lbu.client.Post(lbu.Path, "application/octet-stream", s) + resp.Body.Close() + }() + return true +} diff --git a/networkQuality.go b/networkQuality.go index df58bbf..f254707 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -5,6 +5,7 @@ import ( "encoding/json" "flag" "fmt" + "github.com/hawkinsw/goresponsiveness/ma" "github.com/hawkinsw/goresponsiveness/mc" _ "io" "io/ioutil" @@ -28,6 +29,10 @@ func (c *Config) String() string { return fmt.Sprintf("Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s", c.Version, c.Urls.SmallUrl, c.Urls.LargeUrl, c.Urls.UploadUrl) } +func toMBs(bytes float64) float64 { + return float64(bytes) / float64(1024*1024) +} + var ( // Variables to hold CLI arguments. configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.") @@ -35,6 +40,64 @@ var ( debug = flag.Bool("debug", false, "Enable debugging.") ) +func saturate(ctx context.Context, saturated chan<- interface{}, lbcGenerator func() mc.MeasurableConnection) { + mcs := make([]mc.MeasurableConnection, 4) + mcsPreviousTransferred := make([]uint64, 4) + for i := range mcs { + //mcs[i] = &mc.LoadBearingUpload{Path: config.Urls.UploadUrl} + mcs[i] = lbcGenerator() + mcsPreviousTransferred[i] = 0 + if !mcs[i].Start(ctx) { + fmt.Printf("Error starting %dth MC!\n", i) + return + } + } + + previousMovingAverage := float64(0) + movingAverage := ma.NewMovingAverage(4) + //lastFlowIncrease := uint64(0) + for currentIteration := uint64(0); true; currentIteration++ { + + // If we are cancelled, then stop. + if ctx.Err() != nil { + return + } + + time.Sleep(time.Second) + + // 1. Calculate the most recent goodput. + totalTransfer := uint64(0) + for i := range mcs { + previousTransferred := mcsPreviousTransferred[i] + currentTransferred := mcs[i].Transferred() + totalTransfer += (currentTransferred - previousTransferred) + mcsPreviousTransferred[i] = currentTransferred + } + + // 2. Calculate the delta + movingAverage.AddMeasurement(totalTransfer) + currentMovingAverage := movingAverage.CalculateAverage() + movingAverageDelta := ((currentMovingAverage - previousMovingAverage) / (float64(currentMovingAverage+previousMovingAverage) / 2.0)) * float64(100) + previousMovingAverage = currentMovingAverage + + fmt.Printf("Instantaneous goodput: %f MB.\n", toMBs(float64(totalTransfer))) + fmt.Printf("Moving average: %f MB.\n", toMBs(currentMovingAverage)) + fmt.Printf("Moving average delta: %f.\n", movingAverageDelta) + + + // 3. Are we stable or not? + if currentIteration != 0 && movingAverageDelta < float64(5) { + // We are stable! + fmt.Printf("Stable\n") + break + } else { + // We are unstable! + fmt.Printf("Unstable\n") + } + } + saturated <- struct{}{} +} + func main() { flag.Parse() @@ -65,24 +128,27 @@ func main() { fmt.Printf("Configuration: %s\n", &config) } - time.Sleep(4 * time.Second) + operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background()) - mcs := make([]*mc.LoadBearingConnection, 4) - for i := range mcs { - mcs[i] = &mc.LoadBearingConnection{Path: config.Urls.LargeUrl} - mcCtx := context.Background() - if !mcs[i].Start(mcCtx) { - fmt.Printf("Error starting %dth MC!\n", i) - return - } + uploadSaturationChannel := make(chan interface{}) + + g := func() mc.MeasurableConnection { + return &mc.LoadBearingDownload{Path: config.Urls.LargeUrl} } - time.Sleep(4 * time.Second) - for i := range mcs { - mcs[i].Stop() - fmt.Printf("mc[%d] read: %d\n", i, mcs[i].Downloaded()) + go saturate(operatingCtx, uploadSaturationChannel, g) + + select { + case <-uploadSaturationChannel: + { + fmt.Printf("upload is saturated!\n") + } } + time.Sleep(10 * time.Second) + + cancelOperatingCtx() + time.Sleep(4 * time.Second) } diff --git a/saturating/saturating.go b/saturating/saturating.go new file mode 100644 index 0000000..10a0c87 --- /dev/null +++ b/saturating/saturating.go @@ -0,0 +1,27 @@ +package saturating + +type SaturatingInt struct { + max int + value int + saturated bool +} + +func NewSaturatingInt(max int) *SaturatingInt { + return &SaturatingInt{max: max, value: 0, saturated: false} +} + +func (s *SaturatingInt) Value() int { + if s.saturated { + return s.max + } + return s.value +} + +func (s *SaturatingInt) Add(operand int) { + if !s.saturated { + s.value += operand + if s.value >= s.max { + s.saturated = true + } + } +} |
