diff options
| -rw-r--r-- | lbc/lbc.go | 10 | ||||
| -rw-r--r-- | ma/ma.go | 6 | ||||
| -rw-r--r-- | networkQuality.go | 247 | ||||
| -rw-r--r-- | utilities/utilities.go | 10 |
4 files changed, 226 insertions, 47 deletions
@@ -80,7 +80,10 @@ func (lbd *LoadBearingConnectionDownload) Start(ctx context.Context, debug bool) if debug { fmt.Printf("Using an SSL Key Logger for this load-bearing download.\n") } - transport.TLSClientConfig = &tls.Config{KeyLogWriter: lbd.KeyLogger, InsecureSkipVerify: true} + transport.TLSClientConfig = &tls.Config{ + KeyLogWriter: lbd.KeyLogger, + InsecureSkipVerify: true, + } } lbd.client = &http.Client{Transport: &transport} @@ -176,7 +179,10 @@ func (lbu *LoadBearingConnectionUpload) Start(ctx context.Context, debug bool) b if debug { fmt.Printf("Using an SSL Key Logger for this load-bearing upload.\n") } - transport.TLSClientConfig = &tls.Config{KeyLogWriter: lbu.KeyLogger, InsecureSkipVerify: true} + transport.TLSClientConfig = &tls.Config{ + KeyLogWriter: lbu.KeyLogger, + InsecureSkipVerify: true, + } } lbu.client = &http.Client{Transport: &transport} @@ -29,7 +29,11 @@ type MovingAverage struct { } func NewMovingAverage(intervals int) *MovingAverage { - return &MovingAverage{instants: make([]float64, intervals), intervals: intervals, divisor: saturating.NewSaturatingInt(intervals)} + return &MovingAverage{ + instants: make([]float64, intervals), + intervals: intervals, + divisor: saturating.NewSaturatingInt(intervals), + } } func (ma *MovingAverage) AddMeasurement(measurement float64) { diff --git a/networkQuality.go b/networkQuality.go index 5dd0c15..7e4d294 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -40,13 +40,37 @@ import ( var ( // Variables to hold CLI arguments. - configHost = flag.String("config", constants.DefaultConfigHost, "name/IP of responsiveness configuration server.") - configPort = flag.Int("port", constants.DefaultPortNumber, "port number on which to access responsiveness configuration server.") - configPath = flag.String("path", "config", "path on the server to the configuration endpoint.") - debug = flag.Bool("debug", constants.DefaultDebug, "Enable debugging.") - timeout = flag.Int("timeout", constants.DefaultTestTime, "Maximum time to spend measuring.") - sslKeyFileName = flag.String("ssl-key-file", "", "Store the per-session SSL key files in this file.") - profile = flag.String("profile", "", "Enable client runtime profiling and specify storage location. Disabled by default.") + configHost = flag.String( + "config", + constants.DefaultConfigHost, + "name/IP of responsiveness configuration server.", + ) + configPort = flag.Int( + "port", + constants.DefaultPortNumber, + "port number on which to access responsiveness configuration server.", + ) + configPath = flag.String( + "path", + "config", + "path on the server to the configuration endpoint.", + ) + debug = flag.Bool("debug", constants.DefaultDebug, "Enable debugging.") + timeout = flag.Int( + "timeout", + constants.DefaultTestTime, + "Maximum time to spend measuring.", + ) + sslKeyFileName = flag.String( + "ssl-key-file", + "", + "Store the per-session SSL key files in this file.", + ) + profile = flag.String( + "profile", + "", + "Enable client runtime profiling and specify storage location. Disabled by default.", + ) ) type ConfigUrls struct { @@ -73,17 +97,29 @@ func (c *Config) Get(configHost string, configPath string) error { c.Source = fmt.Sprintf("https://%s%s", configHost, configPath) resp, err := configClient.Get(c.Source) if err != nil { - return fmt.Errorf("Error: Could not connect to configuration host %s: %v\n", configHost, err) + return fmt.Errorf( + "Error: Could not connect to configuration host %s: %v\n", + configHost, + err, + ) } jsonConfig, err := ioutil.ReadAll(resp.Body) if err != nil { - return fmt.Errorf("Error: Could not read configuration content downloaded from %s: %v\n", c.Source, err) + return fmt.Errorf( + "Error: Could not read configuration content downloaded from %s: %v\n", + c.Source, + err, + ) } err = json.Unmarshal(jsonConfig, c) if err != nil { - return fmt.Errorf("Error: Could not parse configuration returned from %s: %v\n", c.Source, err) + return fmt.Errorf( + "Error: Could not parse configuration returned from %s: %v\n", + c.Source, + err, + ) } //if len(c.Test_Endpoint) != 0 { @@ -108,23 +144,49 @@ func (c *Config) Get(configHost string, configPath string) error { } func (c *Config) String() string { - return fmt.Sprintf("Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s\nEndpoint: %s\n", c.Version, c.Urls.SmallUrl, c.Urls.LargeUrl, c.Urls.UploadUrl, c.Test_Endpoint) + return fmt.Sprintf( + "Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s\nEndpoint: %s\n", + c.Version, + c.Urls.SmallUrl, + c.Urls.LargeUrl, + c.Urls.UploadUrl, + c.Test_Endpoint, + ) } func (c *Config) IsValid() error { - if parsedUrl, err := url.ParseRequestURI(c.Urls.LargeUrl); err != nil || parsedUrl.Scheme != "https" { - return fmt.Errorf("Configuration url large_https_download_url is invalid: %s", utilities.Conditional(len(c.Urls.LargeUrl) != 0, c.Urls.LargeUrl, "Missing")) + if parsedUrl, err := url.ParseRequestURI(c.Urls.LargeUrl); err != nil || + parsedUrl.Scheme != "https" { + return fmt.Errorf( + "Configuration url large_https_download_url is invalid: %s", + utilities.Conditional(len(c.Urls.LargeUrl) != 0, c.Urls.LargeUrl, "Missing"), + ) } - if parsedUrl, err := url.ParseRequestURI(c.Urls.SmallUrl); err != nil || parsedUrl.Scheme != "https" { - return fmt.Errorf("Configuration url small_https_download_url is invalid: %s", utilities.Conditional(len(c.Urls.SmallUrl) != 0, c.Urls.SmallUrl, "Missing")) + if parsedUrl, err := url.ParseRequestURI(c.Urls.SmallUrl); err != nil || + parsedUrl.Scheme != "https" { + return fmt.Errorf( + "Configuration url small_https_download_url is invalid: %s", + utilities.Conditional(len(c.Urls.SmallUrl) != 0, c.Urls.SmallUrl, "Missing"), + ) } - if parsedUrl, err := url.ParseRequestURI(c.Urls.UploadUrl); err != nil || parsedUrl.Scheme != "https" { - return fmt.Errorf("Configuration url https_upload_url is invalid: %s", utilities.Conditional(len(c.Urls.UploadUrl) != 0, c.Urls.UploadUrl, "Missing")) + if parsedUrl, err := url.ParseRequestURI(c.Urls.UploadUrl); err != nil || + parsedUrl.Scheme != "https" { + return fmt.Errorf( + "Configuration url https_upload_url is invalid: %s", + utilities.Conditional(len(c.Urls.UploadUrl) != 0, c.Urls.UploadUrl, "Missing"), + ) } return nil } -func addFlows(ctx context.Context, toAdd uint64, lbcs *[]lbc.LoadBearingConnection, lbcsPreviousTransferred *[]uint64, lbcGenerator func() lbc.LoadBearingConnection, debug bool) { +func addFlows( + ctx context.Context, + toAdd uint64, + lbcs *[]lbc.LoadBearingConnection, + lbcsPreviousTransferred *[]uint64, + lbcGenerator func() lbc.LoadBearingConnection, + debug bool, +) { for i := uint64(0); i < toAdd; i++ { *lbcs = append(*lbcs, lbcGenerator()) *lbcsPreviousTransferred = append(*lbcsPreviousTransferred, 0) @@ -152,14 +214,26 @@ func (d *Debugging) String() string { return d.Prefix } -func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGenerator func() lbc.LoadBearingConnection, debug *Debugging) (saturated chan SaturationResult) { +func saturate( + saturationCtx context.Context, + operatingCtx context.Context, + lbcGenerator func() lbc.LoadBearingConnection, + debug *Debugging, +) (saturated chan SaturationResult) { saturated = make(chan SaturationResult) go func() { lbcs := make([]lbc.LoadBearingConnection, 0) lbcsPreviousTransferred := make([]uint64, 0) - addFlows(saturationCtx, constants.StartingNumberOfLoadBearingConnections, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil) + addFlows( + saturationCtx, + constants.StartingNumberOfLoadBearingConnections, + &lbcs, + &lbcsPreviousTransferred, + lbcGenerator, + debug != nil, + ) previousFlowIncreaseIteration := uint64(0) previousMovingAverage := float64(0) @@ -199,7 +273,11 @@ func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGe for i := range lbcs { if !lbcs[i].IsValid() { if debug != nil { - fmt.Printf("%v: Load-bearing connection at index %d is invalid ... skipping.\n", debug, i) + fmt.Printf( + "%v: Load-bearing connection at index %d is invalid ... skipping.\n", + debug, + i, + ) } continue } @@ -213,7 +291,10 @@ func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGe // For some reason, all the LBCs are invalid. This likely means that the network/server went away. if allInvalid { if debug != nil { - fmt.Printf("%v: All LBCs were invalid. Assuming that network/server went away.\n", debug) + fmt.Printf( + "%v: All LBCs were invalid. Assuming that network/server went away.\n", + debug, + ) } break } @@ -222,12 +303,27 @@ func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGe movingAverage.AddMeasurement(float64(totalTransfer)) currentMovingAverage := movingAverage.CalculateAverage() movingAverageAverage.AddMeasurement(currentMovingAverage) - movingAverageDelta := utilities.SignedPercentDifference(currentMovingAverage, previousMovingAverage) + movingAverageDelta := utilities.SignedPercentDifference( + currentMovingAverage, + previousMovingAverage, + ) if debug != nil { - fmt.Printf("%v: Instantaneous goodput: %f MB.\n", debug, utilities.ToMBps(float64(totalTransfer))) - fmt.Printf("%v: Previous moving average: %f MB.\n", debug, utilities.ToMBps(previousMovingAverage)) - fmt.Printf("%v: Current moving average: %f MB.\n", debug, utilities.ToMBps(currentMovingAverage)) + fmt.Printf( + "%v: Instantaneous goodput: %f MB.\n", + debug, + utilities.ToMBps(float64(totalTransfer)), + ) + fmt.Printf( + "%v: Previous moving average: %f MB.\n", + debug, + utilities.ToMBps(previousMovingAverage), + ) + fmt.Printf( + "%v: Current moving average: %f MB.\n", + debug, + utilities.ToMBps(currentMovingAverage), + ) fmt.Printf("%v: Moving average delta: %f.\n", debug, movingAverageDelta) } @@ -241,11 +337,23 @@ func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGe // If moving average > "previous" moving average + InstabilityDelta: if movingAverageDelta > constants.InstabilityDelta { // Network did not yet reach saturation. If no flows added within the last 4 seconds, add 4 more flows - if (currentIteration - previousFlowIncreaseIteration) > uint64(constants.MovingAverageStabilitySpan) { + if (currentIteration - previousFlowIncreaseIteration) > uint64( + constants.MovingAverageStabilitySpan, + ) { if debug != nil { - fmt.Printf("%v: Adding flows because we are unsaturated and waited a while.\n", debug) + fmt.Printf( + "%v: Adding flows because we are unsaturated and waited a while.\n", + debug, + ) } - addFlows(saturationCtx, constants.AdditiveNumberOfLoadBearingConnections, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil) + addFlows( + saturationCtx, + constants.AdditiveNumberOfLoadBearingConnections, + &lbcs, + &lbcsPreviousTransferred, + lbcGenerator, + debug != nil, + ) previousFlowIncreaseIteration = currentIteration } else { if debug != nil { @@ -293,7 +401,12 @@ func main() { return } if err := config.IsValid(); err != nil { - fmt.Fprintf(os.Stderr, "Error: Invalid configuration returned from %s: %v\n", config.Source, err) + fmt.Fprintf( + os.Stderr, + "Error: Invalid configuration returned from %s: %v\n", + config.Source, + err, + ) return } if *debug { @@ -308,7 +421,12 @@ func main() { if len(*profile) != 0 { f, err := os.Create(*profile) if err != nil { - fmt.Fprintf(os.Stderr, "Error: Profiling requested with storage in %s but that file could not be opened: %v\n", *profile, err) + fmt.Fprintf( + os.Stderr, + "Error: Profiling requested with storage in %s but that file could not be opened: %v\n", + *profile, + err, + ) return } pprof.StartCPUProfile(f) @@ -335,10 +453,16 @@ func main() { } generate_lbd := func() lbc.LoadBearingConnection { - return &lbc.LoadBearingConnectionDownload{Path: config.Urls.LargeUrl, KeyLogger: sslKeyFileConcurrentWriter} + return &lbc.LoadBearingConnectionDownload{ + Path: config.Urls.LargeUrl, + KeyLogger: sslKeyFileConcurrentWriter, + } } generate_lbu := func() lbc.LoadBearingConnection { - return &lbc.LoadBearingConnectionUpload{Path: config.Urls.UploadUrl, KeyLogger: sslKeyFileConcurrentWriter} + return &lbc.LoadBearingConnectionUpload{ + Path: config.Urls.UploadUrl, + KeyLogger: sslKeyFileConcurrentWriter, + } } var downloadDebugging *Debugging = nil @@ -348,7 +472,12 @@ func main() { uploadDebugging = &Debugging{Prefix: "upload"} } - downloadSaturationChannel := saturate(saturationCtx, operatingCtx, generate_lbd, downloadDebugging) + downloadSaturationChannel := saturate( + saturationCtx, + operatingCtx, + generate_lbd, + downloadDebugging, + ) uploadSaturationChannel := saturate(saturationCtx, operatingCtx, generate_lbu, uploadDebugging) saturationTimeout := false @@ -363,14 +492,24 @@ func main() { { downloadSaturated = true if *debug { - fmt.Printf("################# download is %s saturated (%fMBps, %d flows)!\n", utilities.Conditional(saturationTimeout, "(provisionally)", ""), utilities.ToMBps(downloadSaturation.RateBps), len(downloadSaturation.Lbcs)) + fmt.Printf( + "################# download is %s saturated (%fMBps, %d flows)!\n", + utilities.Conditional(saturationTimeout, "(provisionally)", ""), + utilities.ToMBps(downloadSaturation.RateBps), + len(downloadSaturation.Lbcs), + ) } } case uploadSaturation = <-uploadSaturationChannel: { uploadSaturated = true if *debug { - fmt.Printf("################# upload is %s saturated (%fMBps, %d flows)!\n", utilities.Conditional(saturationTimeout, "(provisionally)", ""), utilities.ToMBps(uploadSaturation.RateBps), len(uploadSaturation.Lbcs)) + fmt.Printf( + "################# upload is %s saturated (%fMBps, %d flows)!\n", + utilities.Conditional(saturationTimeout, "(provisionally)", ""), + utilities.ToMBps(uploadSaturation.RateBps), + len(uploadSaturation.Lbcs), + ) } } case <-timeoutChannel: @@ -379,7 +518,10 @@ func main() { // We already timedout on saturation. This signal means that // we are timedout on getting the provisional saturation. We // will exit! - fmt.Fprint(os.Stderr, "Error: Saturation could not be completed in time and no provisional rates could be accessed. Test failed.\n") + fmt.Fprint( + os.Stderr, + "Error: Saturation could not be completed in time and no provisional rates could be accessed. Test failed.\n", + ) cancelOperatingCtx() if *debug { time.Sleep(constants.CooldownPeriod) @@ -423,10 +565,18 @@ func main() { if len(downloadSaturation.Lbcs) == 0 { continue } - randomLbcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Lbcs) + randomLbcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))). + Int() % + len( + downloadSaturation.Lbcs, + ) if !downloadSaturation.Lbcs[randomLbcsIndex].IsValid() { if *debug { - fmt.Printf("%v: The randomly selected download LBC (at index %d) was invalid. Skipping.\n", debug, randomLbcsIndex) + fmt.Printf( + "%v: The randomly selected download LBC (at index %d) was invalid. Skipping.\n", + debug, + randomLbcsIndex, + ) } // Protect against pathological cases where we continuously select invalid connections and never @@ -447,7 +597,10 @@ func main() { case sequentialRTTsTime := <-utilities.CalculateSequentialRTTsTime(operatingCtx, downloadSaturation.Lbcs[randomLbcsIndex].Client(), &http.Client{}, config.Urls.SmallUrl): { if sequentialRTTsTime.Err != nil { - fmt.Printf("Failed to calculate a time for sequential RTTs: %v\n", sequentialRTTsTime.Err) + fmt.Printf( + "Failed to calculate a time for sequential RTTs: %v\n", + sequentialRTTsTime.Err, + ) continue } // We know that we have a good Sequential RTT. @@ -460,8 +613,18 @@ func main() { } } - fmt.Printf("Download: %f MBps (%f Mbps), using %d parallel connections.\n", utilities.ToMBps(downloadSaturation.RateBps), utilities.ToMbps(downloadSaturation.RateBps), len(downloadSaturation.Lbcs)) - fmt.Printf("Upload: %f MBps (%f Mbps), using %d parallel connections.\n", utilities.ToMBps(uploadSaturation.RateBps), utilities.ToMbps(uploadSaturation.RateBps), len(uploadSaturation.Lbcs)) + fmt.Printf( + "Download: %f MBps (%f Mbps), using %d parallel connections.\n", + utilities.ToMBps(downloadSaturation.RateBps), + utilities.ToMbps(downloadSaturation.RateBps), + len(downloadSaturation.Lbcs), + ) + fmt.Printf( + "Upload: %f MBps (%f Mbps), using %d parallel connections.\n", + utilities.ToMBps(uploadSaturation.RateBps), + utilities.ToMbps(uploadSaturation.RateBps), + len(uploadSaturation.Lbcs), + ) if totalRTTsCount != 0 { rpm := float64(time.Minute.Seconds()) / (totalRTTTime / (float64(totalRTTsCount))) diff --git a/utilities/utilities.go b/utilities/utilities.go index 337aa5e..c74e99b 100644 --- a/utilities/utilities.go +++ b/utilities/utilities.go @@ -25,7 +25,8 @@ import ( ) func IsInterfaceNil(ifc interface{}) bool { - return ifc == nil || (reflect.ValueOf(ifc).Kind() == reflect.Ptr && reflect.ValueOf(ifc).IsNil()) + return ifc == nil || + (reflect.ValueOf(ifc).Kind() == reflect.Ptr && reflect.ValueOf(ifc).IsNil()) } func SignedPercentDifference(current float64, previous float64) (difference float64) { @@ -56,7 +57,12 @@ type GetLatency struct { Err error } -func CalculateSequentialRTTsTime(ctx context.Context, client_a *http.Client, client_b *http.Client, url string) chan GetLatency { +func CalculateSequentialRTTsTime( + ctx context.Context, + client_a *http.Client, + client_b *http.Client, + url string, +) chan GetLatency { responseChannel := make(chan GetLatency) go func() { before := time.Now() |
