summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lbc/lbc.go69
-rw-r--r--ma/ma.go8
-rw-r--r--networkQuality.go205
3 files changed, 192 insertions, 90 deletions
diff --git a/lbc/lbc.go b/lbc/lbc.go
index 82fe550..d1cfdc8 100644
--- a/lbc/lbc.go
+++ b/lbc/lbc.go
@@ -6,24 +6,32 @@ import (
"io"
"io/ioutil"
"net/http"
+ "sync/atomic"
)
-var chunkSize int = 5000
+var chunkSize int = 50
type LoadBearingConnection interface {
Start(context.Context, bool) bool
Transferred() uint64
Client() *http.Client
+ IsValid() bool
}
type LoadBearingConnectionDownload struct {
Path string
downloaded uint64
client *http.Client
+ debug bool
+ valid bool
}
func (lbd *LoadBearingConnectionDownload) Transferred() uint64 {
- return lbd.downloaded
+ transferred := atomic.LoadUint64(&lbd.downloaded)
+ if lbd.debug {
+ fmt.Printf("download: Transferred: %v\n", transferred)
+ }
+ return transferred
}
func (lbd *LoadBearingConnectionDownload) Client() *http.Client {
@@ -33,6 +41,8 @@ func (lbd *LoadBearingConnectionDownload) Client() *http.Client {
func (lbd *LoadBearingConnectionDownload) Start(ctx context.Context, debug bool) bool {
lbd.downloaded = 0
lbd.client = &http.Client{}
+ lbd.debug = debug
+ lbd.valid = true
// At some point this might be useful: It is a snippet of code that will enable
// logging of per-session TLS key material in order to make debugging easier in
@@ -51,26 +61,31 @@ func (lbd *LoadBearingConnectionDownload) Start(ctx context.Context, debug bool)
*/
if debug {
- fmt.Printf("Started a load bearing download.\n")
+ fmt.Printf("Started a load-bearing download.\n")
}
- go doDownload(ctx, lbd.client, lbd.Path, &lbd.downloaded, debug)
+ go lbd.doDownload(ctx)
return true
}
+func (lbd *LoadBearingConnectionDownload) IsValid() bool {
+ return lbd.valid
+}
-func doDownload(ctx context.Context, client *http.Client, path string, count *uint64, debug bool) {
- get, err := client.Get(path)
+func (lbd *LoadBearingConnectionDownload) doDownload(ctx context.Context) {
+ get, err := lbd.client.Get(lbd.Path)
if err != nil {
+ lbd.valid = false
return
}
for ctx.Err() == nil {
n, err := io.CopyN(ioutil.Discard, get.Body, int64(chunkSize))
if err != nil {
+ lbd.valid = false
break
}
- *count += uint64(n)
+ atomic.AddUint64(&lbd.downloaded, uint64(n))
}
get.Body.Close()
- if debug {
+ if lbd.debug {
fmt.Printf("Ending a load-bearing download.\n")
}
}
@@ -79,14 +94,24 @@ type LoadBearingConnectionUpload struct {
Path string
uploaded uint64
client *http.Client
+ debug bool
+ valid bool
}
func (lbu *LoadBearingConnectionUpload) Transferred() uint64 {
- return lbu.uploaded
+ transferred := atomic.LoadUint64(&lbu.uploaded)
+ if lbu.debug {
+ fmt.Printf("upload: Transferred: %v\n", transferred)
+ }
+ return transferred
}
-func (lbd *LoadBearingConnectionUpload) Client() *http.Client {
- return lbd.client
+func (lbu *LoadBearingConnectionUpload) Client() *http.Client {
+ return lbu.client
+}
+
+func (lbu *LoadBearingConnectionUpload) IsValid() bool {
+ return lbu.valid
}
type syntheticCountingReader struct {
@@ -101,16 +126,17 @@ func (s *syntheticCountingReader) Read(p []byte) (n int, err error) {
err = nil
n = len(p)
n = chunkSize
- *s.n += uint64(n)
+ atomic.AddUint64(s.n, uint64(n))
return
}
-func doUpload(ctx context.Context, client *http.Client, path string, count *uint64, debug bool) bool {
- *count = 0
- s := &syntheticCountingReader{n: count, ctx: ctx}
- resp, _ := client.Post(path, "application/octet-stream", s)
+func (lbu *LoadBearingConnectionUpload) doUpload(ctx context.Context) bool {
+ lbu.uploaded = 0
+ s := &syntheticCountingReader{n: &lbu.uploaded, ctx: ctx}
+ resp, _ := lbu.client.Post(lbu.Path, "application/octet-stream", s)
+ lbu.valid = false
resp.Body.Close()
- if debug {
+ if lbu.debug {
fmt.Printf("Ending a load-bearing upload.\n")
}
return true
@@ -119,7 +145,12 @@ func doUpload(ctx context.Context, client *http.Client, path string, count *uint
func (lbu *LoadBearingConnectionUpload) Start(ctx context.Context, debug bool) bool {
lbu.uploaded = 0
lbu.client = &http.Client{}
- fmt.Printf("Started a load bearing upload.\n")
- go doUpload(ctx, lbu.client, lbu.Path, &lbu.uploaded, debug)
+ lbu.debug = debug
+ lbu.valid = true
+
+ if debug {
+ fmt.Printf("Started a load-bearing upload.\n")
+ }
+ go lbu.doUpload(ctx)
return true
}
diff --git a/ma/ma.go b/ma/ma.go
index 5c3558b..b591d19 100644
--- a/ma/ma.go
+++ b/ma/ma.go
@@ -1,8 +1,6 @@
package ma
import (
- "math"
-
"github.com/hawkinsw/goresponsiveness/saturating"
"github.com/hawkinsw/goresponsiveness/utilities"
)
@@ -34,13 +32,13 @@ func (ma *MovingAverage) CalculateAverage() float64 {
return float64(total) / float64(ma.divisor.Value())
}
-func (ma *MovingAverage) ConsistentWithin(limit float64) bool {
+func (ma *MovingAverage) IncreasesLessThan(limit float64) bool {
previous := ma.instants[0]
for i := 1; i < ma.intervals; i++ {
current := ma.instants[i]
- percentChange := utilities.AbsPercentDifference(current, previous)
+ percentChange := utilities.SignedPercentDifference(current, previous)
previous = current
- if math.Abs(percentChange) > limit {
+ if percentChange > limit {
return false
}
}
diff --git a/networkQuality.go b/networkQuality.go
index 44d64b1..54c4260 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -21,6 +21,19 @@ import (
"github.com/hawkinsw/goresponsiveness/utilities"
)
+var (
+ // Variables to hold CLI arguments.
+ configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.")
+ configPort = flag.Int("port", 4043, "port number on which to access responsiveness configuration server.")
+ configPath = flag.String("path", "config", "path on the server to the configuration endpoint.")
+ debug = flag.Bool("debug", false, "Enable debugging.")
+ timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.")
+ storeSslKeys = flag.Bool("store-ssl-keys", false, "Store SSL keys from connections for debugging. (currently unused)")
+
+ // Global configuration
+ cooldownPeriod int = 4
+)
+
type ConfigUrls struct {
SmallUrl string `json:"small_https_download_url"`
LargeUrl string `json:"large_https_download_url"`
@@ -28,9 +41,10 @@ type ConfigUrls struct {
}
type Config struct {
- Version int
- Urls ConfigUrls `json:"urls"`
- Source string
+ Version int
+ Urls ConfigUrls `json:"urls"`
+ Source string
+ Test_Endpoint string
}
func (c *Config) Get(configHost string, configPath string) error {
@@ -56,11 +70,30 @@ func (c *Config) Get(configHost string, configPath string) error {
if err != nil {
return fmt.Errorf("Error: Could not parse configuration returned from %s: %v\n", c.Source, err)
}
+
+ //if len(c.Test_Endpoint) != 0 {
+ if false {
+ tempUrl, err := url.Parse(c.Urls.LargeUrl)
+ if err != nil {
+ return fmt.Errorf("Error parsing large_https_download_url: %v", err)
+ }
+ c.Urls.LargeUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path
+ tempUrl, err = url.Parse(c.Urls.SmallUrl)
+ if err != nil {
+ return fmt.Errorf("Error parsing small_https_download_url: %v", err)
+ }
+ c.Urls.SmallUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path
+ tempUrl, err = url.Parse(c.Urls.UploadUrl)
+ if err != nil {
+ return fmt.Errorf("Error parsing https_upload_url: %v", err)
+ }
+ c.Urls.UploadUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path
+ }
return nil
}
func (c *Config) String() string {
- return fmt.Sprintf("Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s", c.Version, c.Urls.SmallUrl, c.Urls.LargeUrl, c.Urls.UploadUrl)
+ return fmt.Sprintf("Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s\nEndpoint: %s\n", c.Version, c.Urls.SmallUrl, c.Urls.LargeUrl, c.Urls.UploadUrl, c.Test_Endpoint)
}
func (c *Config) IsValid() error {
@@ -76,19 +109,13 @@ func (c *Config) IsValid() error {
return nil
}
-func toMBs(bytes float64) float64 {
- return float64(bytes) / float64(1024*1024)
+func toMbps(bytes float64) float64 {
+ return toMBps(bytes) * float64(8)
}
-var (
- // Variables to hold CLI arguments.
- configHost = flag.String("config", "networkquality.example.com", "name/IP of responsiveness configuration server.")
- configPort = flag.Int("port", 4043, "port number on which to access responsiveness configuration server.")
- configPath = flag.String("path", "config", "path on the server to the configuration endpoint.")
- debug = flag.Bool("debug", false, "Enable debugging.")
- timeout = flag.Int("timeout", 20, "Maximum time to spend measuring.")
- storeSslKeys = flag.Bool("store-ssl-keys", false, "Store SSL keys from connections for debugging. (currently unused)")
-)
+func toMBps(bytes float64) float64 {
+ return float64(bytes) / float64(1024*1024)
+}
func addFlows(ctx context.Context, toAdd uint64, lbcs *[]lbc.LoadBearingConnection, lbcsPreviousTransferred *[]uint64, lbcGenerator func() lbc.LoadBearingConnection, debug bool) {
for i := uint64(0); i < toAdd; i++ {
@@ -107,7 +134,19 @@ type SaturationResult struct {
Lbcs []lbc.LoadBearingConnection
}
-func saturate(ctx context.Context, lbcGenerator func() lbc.LoadBearingConnection, debug bool) (saturated chan SaturationResult) {
+type Debugging struct {
+ Prefix string
+}
+
+func NewDebugging(prefix string) *Debugging {
+ return &Debugging{Prefix: prefix}
+}
+
+func (d *Debugging) String() string {
+ return d.Prefix
+}
+
+func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGenerator func() lbc.LoadBearingConnection, debug *Debugging) (saturated chan SaturationResult) {
saturated = make(chan SaturationResult)
go func() {
@@ -115,37 +154,49 @@ func saturate(ctx context.Context, lbcGenerator func() lbc.LoadBearingConnection
lbcsPreviousTransferred := make([]uint64, 0)
// Create 4 load bearing connections
- addFlows(ctx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug)
+ addFlows(operatingCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil)
previousFlowIncreaseIteration := uint64(0)
previousMovingAverage := float64(0)
movingAverage := ma.NewMovingAverage(4)
movingAverageAverage := ma.NewMovingAverage(4)
- nextTime := time.Now().Add(time.Second)
+ nextSampleStartTime := time.Now().Add(time.Second)
for currentIteration := uint64(0); true; currentIteration++ {
- // If we are cancelled, then stop.
- if ctx.Err() != nil {
+ // When the program stops operating, then stop.
+ if operatingCtx.Err() != nil {
return
}
+ // We may be asked to stop trying to saturate the
+ // network and return our current status.
+ if saturationCtx.Err() != nil {
+ //break
+ }
+
now := time.Now()
// At each 1-second interval
- if nextTime.Second() > now.Second() {
- if debug {
- fmt.Printf("Sleeping until %v\n", nextTime)
+ if nextSampleStartTime.Sub(now) > 0 {
+ if debug != nil {
+ fmt.Printf("%v: Sleeping until %v\n", debug, nextSampleStartTime)
}
- time.Sleep(nextTime.Sub(now))
+ time.Sleep(nextSampleStartTime.Sub(now))
} else {
- fmt.Printf("Warning: Missed a one-second deadline.\n")
+ fmt.Fprintf(os.Stderr, "Warning: Missed a one-second deadline.\n")
}
- nextTime = time.Now().Add(time.Second)
+ nextSampleStartTime = time.Now().Add(time.Second)
// Compute "instantaneous aggregate" goodput which is the number of bytes transferred within the last second.
totalTransfer := uint64(0)
for i := range lbcs {
+ if !lbcs[i].IsValid() {
+ if debug != nil {
+ fmt.Printf("%v: Load-bearing connection at index %d is invalid ... skipping.\n", debug, i)
+ }
+ continue
+ }
previousTransferred := lbcsPreviousTransferred[i]
currentTransferred := lbcs[i].Transferred()
totalTransfer += (currentTransferred - previousTransferred)
@@ -157,41 +208,51 @@ func saturate(ctx context.Context, lbcGenerator func() lbc.LoadBearingConnection
currentMovingAverage := movingAverage.CalculateAverage()
movingAverageAverage.AddMeasurement(currentMovingAverage)
movingAverageDelta := utilities.SignedPercentDifference(currentMovingAverage, previousMovingAverage)
+
+ if debug != nil {
+ fmt.Printf("%v: Instantaneous goodput: %f MB.\n", debug, toMBps(float64(totalTransfer)))
+ fmt.Printf("%v: Previous moving average: %f MB.\n", debug, toMBps(previousMovingAverage))
+ fmt.Printf("%v: Current moving average: %f MB.\n", debug, toMBps(currentMovingAverage))
+ fmt.Printf("%v: Moving average delta: %f.\n", debug, movingAverageDelta)
+ }
+
previousMovingAverage = currentMovingAverage
- if debug {
- fmt.Printf("Instantaneous goodput: %f MB.\n", toMBs(float64(totalTransfer)))
- fmt.Printf("Moving average: %f MB.\n", toMBs(currentMovingAverage))
- fmt.Printf("Moving average delta: %f.\n", movingAverageDelta)
+ // Special case: We won't make any adjustments on the first iteration.
+ if currentIteration == 0 {
+ continue
}
// If moving average > "previous" moving average + 5%:
- if currentIteration == 0 || movingAverageDelta > float64(5) {
+ if movingAverageDelta > float64(5) {
// Network did not yet reach saturation. If no flows added within the last 4 seconds, add 4 more flows
if (currentIteration - previousFlowIncreaseIteration) > 4 {
- if debug {
- fmt.Printf("Adding flows because we are unsaturated and waited a while.\n")
+ if debug != nil {
+ fmt.Printf("%v: Adding flows because we are unsaturated and waited a while.\n", debug)
}
- addFlows(ctx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug)
+ addFlows(operatingCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil)
previousFlowIncreaseIteration = currentIteration
} else {
- if debug {
- fmt.Printf("We are unsaturated, but it still too early to add anything.\n")
+ if debug != nil {
+ fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debug)
}
}
} else { // Else, network reached saturation for the current flow count.
+ if debug != nil {
+ fmt.Printf("%v: Network reached saturation with current flow count.\n", debug)
+ }
// If new flows added and for 4 seconds the moving average throughput did not change: network reached stable saturation
- if (currentIteration-previousFlowIncreaseIteration) < 4 && movingAverageAverage.ConsistentWithin(float64(4)) {
- if debug {
- fmt.Printf("New flows added within the last four seconds and the moving-average average is consistent!\n")
+ if (currentIteration-previousFlowIncreaseIteration) < 4 && movingAverageAverage.IncreasesLessThan(float64(5)) {
+ if debug != nil {
+ fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debug)
}
break
} else {
// Else, add four more flows
- if debug {
- fmt.Printf("New flows to add to try to increase our saturation!\n")
+ if debug != nil {
+ fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debug)
}
- addFlows(ctx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug)
+ addFlows(operatingCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil)
previousFlowIncreaseIteration = currentIteration
}
}
@@ -208,6 +269,7 @@ func main() {
timeoutDuration := time.Second * time.Duration(*timeout)
configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort)
operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background())
+ saturationCtx, cancelSaturationCtx := context.WithCancel(context.Background())
config := &Config{}
if err := config.Get(configHostPort, *configPath); err != nil {
@@ -230,34 +292,48 @@ func main() {
generate_lbu := func() lbc.LoadBearingConnection {
return &lbc.LoadBearingConnectionUpload{Path: config.Urls.UploadUrl}
}
- downloadSaturationChannel := saturate(operatingCtx, generate_lbd, *debug)
- uploadSaturationChannel := saturate(operatingCtx, generate_lbu, *debug)
- test_timeout := false
- upload_saturated := false
- download_saturated := false
+ downloadSaturationChannel := saturate(saturationCtx, operatingCtx, generate_lbd, NewDebugging("download"))
+ uploadSaturationChannel := saturate(saturationCtx, operatingCtx, generate_lbu, NewDebugging("upload"))
+
+ saturationTimeout := false
+ uploadSaturated := false
+ downloadSaturated := false
downloadSaturation := SaturationResult{}
uploadSaturation := SaturationResult{}
- for !test_timeout && !(upload_saturated && download_saturated) {
+ for !(uploadSaturated && downloadSaturated) {
select {
case downloadSaturation = <-downloadSaturationChannel:
{
- download_saturated = true
+ downloadSaturated = true
if *debug {
- fmt.Printf("################# download is saturated (%fMBps, %d flows)!\n", toMBs(downloadSaturation.RateBps), len(downloadSaturation.Lbcs))
+ fmt.Printf("################# download is %s saturated (%fMBps, %d flows)!\n", utilities.Conditional(saturationTimeout, "(provisionally)", ""), toMBps(downloadSaturation.RateBps), len(downloadSaturation.Lbcs))
}
}
case uploadSaturation = <-uploadSaturationChannel:
{
- upload_saturated = true
+ uploadSaturated = true
if *debug {
- fmt.Printf("################# upload is saturated (%fMBps, %d flows)!\n", toMBs(uploadSaturation.RateBps), len(uploadSaturation.Lbcs))
+ fmt.Printf("################# upload is %s saturated (%fMBps, %d flows)!\n", utilities.Conditional(saturationTimeout, "(provisionally)", ""), toMBps(uploadSaturation.RateBps), len(uploadSaturation.Lbcs))
}
}
case <-timeoutChannel:
{
- test_timeout = true
+ if saturationTimeout {
+ // We already timedout on saturation. This signal means that
+ // we are timedout on getting the provisional saturation. We
+ // will exit!
+ fmt.Fprint(os.Stderr, "Error: Saturation could not be completed in time and no provisional rates could be accessed. Test failed.\n")
+ cancelOperatingCtx()
+ if *debug {
+ time.Sleep(time.Duration(cooldownPeriod) * time.Second)
+ }
+ return
+ }
+ saturationTimeout = true
+ timeoutChannel = timeoutat.TimeoutAt(operatingCtx, time.Now().Add(5*time.Second), *debug)
+ cancelSaturationCtx()
if *debug {
fmt.Printf("################# timeout reaching saturation!\n")
}
@@ -265,26 +341,21 @@ func main() {
}
}
- if test_timeout {
- cancelOperatingCtx()
- fmt.Fprintf(os.Stderr, "Error: Did not reach upload/download saturation before test time expired (%v).\n.", timeoutDuration)
- return
- }
-
robustnessProbeIterationCount := 5
- actualRTTCount := 0
+ totalRTTsCount := 0
totalRTTTime := float64(0)
+ rttTimeout := false
- for i := 0; i < robustnessProbeIterationCount && !test_timeout; i++ {
+ for i := 0; i < robustnessProbeIterationCount && !rttTimeout; i++ {
randomLbcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Lbcs)
select {
case <-timeoutChannel:
{
- test_timeout = true
+ rttTimeout = true
}
case fiveRTTsTime := <-utilities.TimedSequentialRTTs(operatingCtx, downloadSaturation.Lbcs[randomLbcsIndex].Client(), &http.Client{}, config.Urls.SmallUrl):
{
- actualRTTCount += 5
+ totalRTTsCount += 5
totalRTTTime += fiveRTTsTime.Delay.Seconds()
if *debug {
fmt.Printf("fiveRTTsTime: %v\n", fiveRTTsTime.Delay.Seconds())
@@ -293,13 +364,15 @@ func main() {
}
}
- rpm := float64(60) / (totalRTTTime / (float64(actualRTTCount) * 5))
+ rpm := float64(60) / (totalRTTTime / (float64(totalRTTsCount) * 5))
+ fmt.Printf("Download: %f MBps (%f Mbps), using %d parallel connections.\n", toMBps(downloadSaturation.RateBps), toMbps(downloadSaturation.RateBps), len(downloadSaturation.Lbcs))
+ fmt.Printf("Upload: %f MBps (%f Mbps), using %d parallel connections.\n", toMBps(uploadSaturation.RateBps), toMbps(uploadSaturation.RateBps), len(uploadSaturation.Lbcs))
+ fmt.Printf("Total RTTs measured: %d\n", totalRTTsCount)
fmt.Printf("RPM: %v\n", rpm)
cancelOperatingCtx()
if *debug {
- // Hold on to cool down.
- time.Sleep(4 * time.Second)
+ time.Sleep(time.Duration(cooldownPeriod) * time.Second)
}
}