diff options
| -rw-r--r-- | README.md | 14 | ||||
| -rw-r--r-- | constants/constants.go | 3 | ||||
| -rw-r--r-- | lbc/lbc.go | 18 | ||||
| -rw-r--r-- | ma/ma.go | 3 | ||||
| -rw-r--r-- | networkQuality.go | 105 | ||||
| -rw-r--r-- | timeoutat/timeoutat.go | 6 | ||||
| -rw-r--r-- | utilities/utilities.go | 69 |
7 files changed, 164 insertions, 54 deletions
@@ -86,6 +86,20 @@ You can also test against the Apple infrastructure using: $ ./networkQuality --config mensura.cdn-apple.com --port 443 --path /api/v1/gm/config ``` +## Contributing + +We *love* contributions. Before submitting a patch, format your code with `go fmt` *and* `golines`: + +``` +$ golines -w -m 80 --shorten-comments . +``` + +You can easily install `golines` in to your `${GOPATH}` with + +``` +$ go install github.com/segmentio/golines@latest +``` + ## References [RFC2026] https://datatracker.ietf.org/doc/html/rfc2026 diff --git a/constants/constants.go b/constants/constants.go index 436dd3b..a94c777 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -5,7 +5,8 @@ import "time" var ( // The initial number of connections on a LBC. StartingNumberOfLoadBearingConnections uint64 = 4 - // The number of intervals for which to account in a moving-average calculation. + // The number of intervals for which to account in a moving-average + // calculation. MovingAverageIntervalCount int = 4 // The number of intervals across which to consider a moving average stable. MovingAverageStabilitySpan int = 4 @@ -72,13 +72,18 @@ func (cr *countingReader) Read(p []byte) (n int, err error) { return } -func (lbd *LoadBearingConnectionDownload) Start(ctx context.Context, debug bool) bool { +func (lbd *LoadBearingConnectionDownload) Start( + ctx context.Context, + debug bool, +) bool { lbd.downloaded = 0 transport := http2.Transport{} if !utilities.IsInterfaceNil(lbd.KeyLogger) { if debug { - fmt.Printf("Using an SSL Key Logger for this load-bearing download.\n") + fmt.Printf( + "Using an SSL Key Logger for this load-bearing download.\n", + ) } transport.TLSClientConfig = &tls.Config{ KeyLogWriter: lbd.KeyLogger, @@ -171,13 +176,18 @@ func (lbu *LoadBearingConnectionUpload) doUpload(ctx context.Context) bool { return true } -func (lbu *LoadBearingConnectionUpload) Start(ctx context.Context, debug bool) bool { +func (lbu *LoadBearingConnectionUpload) Start( + ctx context.Context, + debug bool, +) bool { lbu.uploaded = 0 transport := http2.Transport{} if !utilities.IsInterfaceNil(lbu.KeyLogger) { if debug { - fmt.Printf("Using an SSL Key Logger for this load-bearing upload.\n") + fmt.Printf( + "Using an SSL Key Logger for this load-bearing upload.\n", + ) } transport.TLSClientConfig = &tls.Config{ KeyLogWriter: lbu.KeyLogger, @@ -59,7 +59,8 @@ func (ma *MovingAverage) AllSequentialIncreasesLessThan(limit float64) bool { return false } - // Invariant: ma.index always points to the oldest (see AddMeasurement above) + // Invariant: ma.index always points to the oldest (see AddMeasurement + // above) oldestIndex := ma.index previous := ma.instants[oldestIndex] for i := 1; i < ma.intervals; i++ { diff --git a/networkQuality.go b/networkQuality.go index 53bf990..d71411d 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -161,21 +161,33 @@ func (c *Config) IsValid() error { parsedUrl.Scheme != "https" { return fmt.Errorf( "Configuration url large_https_download_url is invalid: %s", - utilities.Conditional(len(c.Urls.LargeUrl) != 0, c.Urls.LargeUrl, "Missing"), + utilities.Conditional( + len(c.Urls.LargeUrl) != 0, + c.Urls.LargeUrl, + "Missing", + ), ) } if parsedUrl, err := url.ParseRequestURI(c.Urls.SmallUrl); err != nil || parsedUrl.Scheme != "https" { return fmt.Errorf( "Configuration url small_https_download_url is invalid: %s", - utilities.Conditional(len(c.Urls.SmallUrl) != 0, c.Urls.SmallUrl, "Missing"), + utilities.Conditional( + len(c.Urls.SmallUrl) != 0, + c.Urls.SmallUrl, + "Missing", + ), ) } if parsedUrl, err := url.ParseRequestURI(c.Urls.UploadUrl); err != nil || parsedUrl.Scheme != "https" { return fmt.Errorf( "Configuration url https_upload_url is invalid: %s", - utilities.Conditional(len(c.Urls.UploadUrl) != 0, c.Urls.UploadUrl, "Missing"), + utilities.Conditional( + len(c.Urls.UploadUrl) != 0, + c.Urls.UploadUrl, + "Missing", + ), ) } return nil @@ -239,8 +251,12 @@ func saturate( previousFlowIncreaseIteration := uint64(0) previousMovingAverage := float64(0) - movingAverage := ma.NewMovingAverage(constants.MovingAverageIntervalCount) - movingAverageAverage := ma.NewMovingAverage(constants.MovingAverageIntervalCount) + movingAverage := ma.NewMovingAverage( + constants.MovingAverageIntervalCount, + ) + movingAverageAverage := ma.NewMovingAverage( + constants.MovingAverageIntervalCount, + ) nextSampleStartTime := time.Now().Add(time.Second) @@ -261,7 +277,11 @@ func saturate( // At each 1-second interval if nextSampleStartTime.Sub(now) > 0 { if debug != nil { - fmt.Printf("%v: Sleeping until %v\n", debug, nextSampleStartTime) + fmt.Printf( + "%v: Sleeping until %v\n", + debug, + nextSampleStartTime, + ) } time.Sleep(nextSampleStartTime.Sub(now)) } else { @@ -269,7 +289,8 @@ func saturate( } nextSampleStartTime = time.Now().Add(time.Second) - // Compute "instantaneous aggregate" goodput which is the number of bytes transferred within the last second. + // Compute "instantaneous aggregate" goodput which is the number of + // bytes transferred within the last second. totalTransfer := uint64(0) allInvalid := true for i := range lbcs { @@ -290,7 +311,8 @@ func saturate( lbcsPreviousTransferred[i] = currentTransferred } - // For some reason, all the LBCs are invalid. This likely means that the network/server went away. + // For some reason, all the LBCs are invalid. This likely means that + // the network/server went away. if allInvalid { if debug != nil { fmt.Printf( @@ -301,7 +323,9 @@ func saturate( break } - // Compute a moving average of the last constants.MovingAverageIntervalCount "instantaneous aggregate goodput" measurements + // Compute a moving average of the last + // constants.MovingAverageIntervalCount "instantaneous aggregate + // goodput" measurements movingAverage.AddMeasurement(float64(totalTransfer)) currentMovingAverage := movingAverage.CalculateAverage() movingAverageAverage.AddMeasurement(currentMovingAverage) @@ -326,19 +350,25 @@ func saturate( debug, utilities.ToMBps(currentMovingAverage), ) - fmt.Printf("%v: Moving average delta: %f.\n", debug, movingAverageDelta) + fmt.Printf( + "%v: Moving average delta: %f.\n", + debug, + movingAverageDelta, + ) } previousMovingAverage = currentMovingAverage - // Special case: We won't make any adjustments on the first iteration. + // Special case: We won't make any adjustments on the first + // iteration. if currentIteration == 0 { continue } // If moving average > "previous" moving average + InstabilityDelta: if movingAverageDelta > constants.InstabilityDelta { - // Network did not yet reach saturation. If no flows added within the last 4 seconds, add 4 more flows + // Network did not yet reach saturation. If no flows added + // within the last 4 seconds, add 4 more flows if (currentIteration - previousFlowIncreaseIteration) > uint64( constants.MovingAverageStabilitySpan, ) { @@ -366,7 +396,8 @@ func saturate( if debug != nil { fmt.Printf("%v: Network reached saturation with current flow count.\n", debug) } - // If new flows added and for 4 seconds the moving average throughput did not change: network reached stable saturation + // If new flows added and for 4 seconds the moving average + // throughput did not change: network reached stable saturation if (currentIteration-previousFlowIncreaseIteration) < uint64(constants.MovingAverageStabilitySpan) && movingAverageAverage.AllSequentialIncreasesLessThan(float64(5)) { if debug != nil { fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debug) @@ -395,7 +426,9 @@ func main() { timeoutAbsoluteTime := time.Now().Add(timeoutDuration) configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort) operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background()) - saturationCtx, cancelSaturationCtx := context.WithCancel(context.Background()) + saturationCtx, cancelSaturationCtx := context.WithCancel( + context.Background(), + ) config := &Config{} if err := config.Get(configHostPort, *configPath); err != nil { @@ -415,7 +448,11 @@ func main() { fmt.Printf("Configuration: %s\n", config) } - timeoutChannel := timeoutat.TimeoutAt(operatingCtx, timeoutAbsoluteTime, *debug) + timeoutChannel := timeoutat.TimeoutAt( + operatingCtx, + timeoutAbsoluteTime, + *debug, + ) if *debug { fmt.Printf("Test will end earlier than %v\n", timeoutAbsoluteTime) } @@ -488,7 +525,12 @@ func main() { generate_lbd, downloadDebugging, ) - uploadSaturationChannel := saturate(saturationCtx, operatingCtx, generate_lbu, uploadDebugging) + uploadSaturationChannel := saturate( + saturationCtx, + operatingCtx, + generate_lbu, + uploadDebugging, + ) saturationTimeout := false uploadSaturated := false @@ -504,7 +546,11 @@ func main() { if *debug { fmt.Printf( "################# download is %s saturated (%fMBps, %d flows)!\n", - utilities.Conditional(saturationTimeout, "(provisionally)", ""), + utilities.Conditional( + saturationTimeout, + "(provisionally)", + "", + ), utilities.ToMBps(downloadSaturation.RateBps), len(downloadSaturation.Lbcs), ) @@ -516,7 +562,11 @@ func main() { if *debug { fmt.Printf( "################# upload is %s saturated (%fMBps, %d flows)!\n", - utilities.Conditional(saturationTimeout, "(provisionally)", ""), + utilities.Conditional( + saturationTimeout, + "(provisionally)", + "", + ), utilities.ToMBps(uploadSaturation.RateBps), len(uploadSaturation.Lbcs), ) @@ -540,13 +590,22 @@ func main() { } saturationTimeout = true - // We timed out attempting to saturate the link. So, we will shut down all the saturation xfers + // We timed out attempting to saturate the link. So, we will + // shut down all the saturation xfers cancelSaturationCtx() - // and then we will give ourselves some additional time in order to calculate a provisional saturation. - timeoutAbsoluteTime = time.Now().Add(constants.RPMCalculationTime) - timeoutChannel = timeoutat.TimeoutAt(operatingCtx, timeoutAbsoluteTime, *debug) + // and then we will give ourselves some additional time in order + // to calculate a provisional saturation. + timeoutAbsoluteTime = time.Now(). + Add(constants.RPMCalculationTime) + timeoutChannel = timeoutat.TimeoutAt( + operatingCtx, + timeoutAbsoluteTime, + *debug, + ) if *debug { - fmt.Printf("################# timeout reaching saturation!\n") + fmt.Printf( + "################# timeout reaching saturation!\n", + ) } } } diff --git a/timeoutat/timeoutat.go b/timeoutat/timeoutat.go index 263890d..0e13a9f 100644 --- a/timeoutat/timeoutat.go +++ b/timeoutat/timeoutat.go @@ -20,7 +20,11 @@ import ( "time" ) -func TimeoutAt(ctx context.Context, when time.Time, debug bool) (response chan interface{}) { +func TimeoutAt( + ctx context.Context, + when time.Time, + debug bool, +) (response chan interface{}) { response = make(chan interface{}) go func(ctx context.Context) { go func() { diff --git a/utilities/utilities.go b/utilities/utilities.go index c74e99b..8b64da4 100644 --- a/utilities/utilities.go +++ b/utilities/utilities.go @@ -29,11 +29,22 @@ func IsInterfaceNil(ifc interface{}) bool { (reflect.ValueOf(ifc).Kind() == reflect.Ptr && reflect.ValueOf(ifc).IsNil()) } -func SignedPercentDifference(current float64, previous float64) (difference float64) { - return ((current - previous) / (float64(current+previous) / 2.0)) * float64(100) +func SignedPercentDifference( + current float64, + previous float64, +) (difference float64) { + return ((current - previous) / (float64(current+previous) / 2.0)) * float64( + 100, + ) } -func AbsPercentDifference(current float64, previous float64) (difference float64) { - return (math.Abs(current-previous) / (float64(current+previous) / 2.0)) * float64(100) + +func AbsPercentDifference( + current float64, + previous float64, +) (difference float64) { + return (math.Abs(current-previous) / (float64(current+previous) / 2.0)) * float64( + 100, + ) } func Conditional(condition bool, t string, f string) string { @@ -52,36 +63,44 @@ func ToMBps(bytes float64) float64 { } type GetLatency struct { - Delay time.Duration - RTTs uint16 - Err error + Delay time.Duration + RoundTripCount uint16 + Err error } func CalculateSequentialRTTsTime( ctx context.Context, - client_a *http.Client, - client_b *http.Client, + saturated_client *http.Client, + new_client *http.Client, url string, ) chan GetLatency { responseChannel := make(chan GetLatency) go func() { + roundTripCount := uint16(0) before := time.Now() - c_a, err := client_a.Get(url) - if err != nil { - responseChannel <- GetLatency{Delay: 0, RTTs: 0, Err: err} - return - } - // TODO: Make this interruptable somehow by using _ctx_. - _, err = io.ReadAll(c_a.Body) - if err != nil { - responseChannel <- GetLatency{Delay: 0, RTTs: 0, Err: err} - return - } - c_a.Body.Close() + /* + TODO: We are not going to measure round-trip times on the load-bearing connection + right now because we are dealing with a massive amount of buffer bloat on the + Apple CDN. - c_b, err := client_b.Get(url) + c_a, err := saturated_client.Get(url) + if err != nil { + responseChannel <- GetLatency{Delay: 0, RTTs: 0, Err: err} + return + } + // TODO: Make this interruptable somehow + // by using _ctx_. + _, err = io.ReadAll(c_a.Body) + if err != nil { + responseChannel <- GetLatency{Delay: 0, RTTs: 0, Err: err} + return + } + roundTripCount += 5 + c_a.Body.Close() + */ + c_b, err := new_client.Get(url) if err != nil { - responseChannel <- GetLatency{Delay: 0, RTTs: 0, Err: err} + responseChannel <- GetLatency{Delay: 0, RoundTripCount: 0, Err: err} return } // TODO: Make this interruptable somehow by using _ctx_. @@ -91,7 +110,9 @@ func CalculateSequentialRTTsTime( return } c_b.Body.Close() - responseChannel <- GetLatency{Delay: time.Since(before), RTTs: 10, Err: nil} + // We use 1 here according to the wording in 4.2.1. + roundTripCount += 1 + responseChannel <- GetLatency{Delay: time.Since(before), RoundTripCount: roundTripCount, Err: nil} }() return responseChannel } |
