From e184c0808b980e81ee87791264a7fa030f52e962 Mon Sep 17 00:00:00 2001
From: Will Hawkins
Date: Wed, 11 May 2022 10:25:25 -0400
Subject: Refactor RPM calculations

Refactor RPM calculations to make them more reusable. Enable (perhaps
temporarily) probe calculations on the saturated connections. This change is
to check whether this makes our RPM calculations match the ones generated by
Apple's native client.
---
 rpm/rpm.go | 113 +++++++++++++++++++++++++++++++++++++------------------------
 1 file changed, 68 insertions(+), 45 deletions(-)
(limited to 'rpm')

diff --git a/rpm/rpm.go b/rpm/rpm.go
index e01e2e8..40a7729 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -252,6 +252,55 @@ func (probe *Probe) SetHttpResponseReadyTime(
 	}
 }
 
+func getLatency(ctx context.Context, probe *Probe, url string, debugLevel debug.DebugLevel) utilities.GetLatency {
+	before := time.Now()
+	c_b_req, err := http.NewRequestWithContext(
+		httptrace.WithClientTrace(ctx, probe.GetTrace()),
+		"GET",
+		url,
+		nil,
+	)
+	if err != nil {
+		return utilities.GetLatency{Delay: 0, RoundTripCount: 0, Err: err}
+	}
+
+	c_b, err := probe.client.Do(c_b_req)
+	if err != nil {
+		return utilities.GetLatency{Delay: 0, RoundTripCount: 0, Err: err}
+	}
+
+	// TODO: Make this interruptable somehow by using _ctx_.
+	_, err = io.ReadAll(c_b.Body)
+	if err != nil {
+		return utilities.GetLatency{Delay: 0, Err: err}
+	}
+	after := time.Now()
+
+	// Depending on whether we think that Close() requires another RTT (via TCP), we
+	// may need to move this before/after capturing the after time.
+	c_b.Body.Close()
+
+	sanity := after.Sub(before)
+
+	tlsAndHttpHeaderDelta := probe.GetTLSAndHttpHeaderDelta() // Constitutes 2 RTT, per the Spec.
+	httpDownloadDelta := probe.GetHttpDownloadDelta(after)    // Constitutes 1 RTT, per the Spec.
+	dnsDelta := probe.GetDnsDelta()                           // Constitutes 1 RTT, per the Spec.
+	tcpDelta := probe.GetTCPDelta()                           // Constitutes 1 RTT, per the Spec.
+	totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + dnsDelta + tcpDelta
+
+	if debug.IsDebug(debugLevel) {
+		fmt.Printf(
+			"(Probe %v) sanity vs total: %v vs %v\n",
+			probe.ProbeId(),
+			sanity,
+			totalDelay,
+		)
+	}
+
+	roundTripCount := uint16(5) // According to addition, there are 5 RTTs that we measured.
+ return utilities.GetLatency{Delay: totalDelay, RoundTripCount: roundTripCount, Err: nil} +} + func CalculateSequentialRTTsTime( ctx context.Context, saturated_rtt_probe *Probe, @@ -261,8 +310,17 @@ func CalculateSequentialRTTsTime( ) chan utilities.GetLatency { responseChannel := make(chan utilities.GetLatency) go func() { - before := time.Now() - roundTripCount := uint16(0) + + if debug.IsDebug(debugLevel) { + fmt.Printf("Beginning saturated RTT probe.\n") + } + saturated_latency := getLatency(ctx, saturated_rtt_probe, url, debugLevel) + + if saturated_latency.Err != nil { + fmt.Printf("Error occurred getting the saturated RTT.\n") + responseChannel <- saturated_latency + return + } /* TODO: We are not going to measure round-trip times on the load-generating connection right now because we are dealing with a massive amount of buffer bloat on the @@ -289,54 +347,19 @@ func CalculateSequentialRTTsTime( roundTripCount += 5 c_a.Body.Close() */ - c_b_req, err := http.NewRequestWithContext( - httptrace.WithClientTrace(ctx, new_rtt_probe.GetTrace()), - "GET", - url, - nil, - ) - if err != nil { - responseChannel <- utilities.GetLatency{Delay: 0, RoundTripCount: 0, Err: err} - return - } - - c_b, err := new_rtt_probe.client.Do(c_b_req) - if err != nil { - responseChannel <- utilities.GetLatency{Delay: 0, RoundTripCount: 0, Err: err} - return + if debug.IsDebug(debugLevel) { + fmt.Printf("Beginning unsaturated RTT probe.\n") } + new_rtt_latency := getLatency(ctx, new_rtt_probe, url, debugLevel) - // TODO: Make this interruptable somehow by using _ctx_. - _, err = io.ReadAll(c_b.Body) - if err != nil { - responseChannel <- utilities.GetLatency{Delay: 0, Err: err} + if new_rtt_latency.Err != nil { + fmt.Printf("Error occurred getting the unsaturated RTT.\n") + responseChannel <- new_rtt_latency return } - after := time.Now() - - // Depending on whether we think that Close() requires another RTT (via TCP), we - // may need to move this before/after capturing the after time. - c_b.Body.Close() - - sanity := after.Sub(before) - - tlsAndHttpHeaderDelta := new_rtt_probe.GetTLSAndHttpHeaderDelta() // Constitutes 2 RTT, per the Spec. - httpDownloadDelta := new_rtt_probe.GetHttpDownloadDelta(after) // Constitutes 1 RTT, per the Spec. - dnsDelta := new_rtt_probe.GetDnsDelta() // Constitutes 1 RTT, per the Spec. - tcpDelta := new_rtt_probe.GetTCPDelta() // Constitutes 1 RTT, per the Spec. - totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + dnsDelta + tcpDelta - - if debug.IsDebug(debugLevel) { - fmt.Printf( - "(Probe %v) sanity vs total: %v vs %v\n", - new_rtt_probe.ProbeId(), - sanity, - totalDelay, - ) - } - roundTripCount += 5 // According to addition, there are 5 RTTs that we measured. - responseChannel <- utilities.GetLatency{Delay: totalDelay, RoundTripCount: roundTripCount, Err: nil} + responseChannel <- utilities.GetLatency{Delay: saturated_latency.Delay + new_rtt_latency.Delay, RoundTripCount: uint16(10), Err: nil} + return }() return responseChannel } -- cgit v1.2.3 From 49301ae7f6ef1c479fe3b23cba29d74a815d15ee Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Wed, 11 May 2022 14:35:26 -0400 Subject: [Bugfix] Support reused connections in calculating RPM There was an error in prior versions when calculating the RPM in the presence of reused connections because invalid time values were being compared. This patch fixes that error. 
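As background for this fix, a minimal, self-contained sketch (illustrative only; probeOnce is a hypothetical helper, not code from this series): Go's net/http/httptrace only fires the DNS and Connect callbacks when a lookup or handshake actually happens, so on a reused connection those timestamps keep their zero values and naively subtracting them yields the invalid time values described above. Guarding the deltas on GotConnInfo.Reused, which is the signal this patch keys off of, looks roughly like:

package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptrace"
	"time"
)

// probeOnce issues one GET and reports whether the connection was reused,
// guarding the DNS/TCP deltas the way the patch does.
func probeOnce(client *http.Client, url string) {
	var dnsStart, dnsDone, connStart, connDone time.Time
	reused := false
	trace := &httptrace.ClientTrace{
		DNSStart:     func(httptrace.DNSStartInfo) { dnsStart = time.Now() },
		DNSDone:      func(httptrace.DNSDoneInfo) { dnsDone = time.Now() },
		ConnectStart: func(network, addr string) { connStart = time.Now() },
		ConnectDone:  func(network, addr string, err error) { connDone = time.Now() },
		GotConn:      func(info httptrace.GotConnInfo) { reused = info.Reused },
	}
	req, err := http.NewRequestWithContext(
		httptrace.WithClientTrace(context.Background(), trace), "GET", url, nil)
	if err != nil {
		return
	}
	resp, err := client.Do(req)
	if err != nil {
		return
	}
	io.Copy(io.Discard, resp.Body)
	resp.Body.Close()

	// A reused connection performed neither a DNS lookup nor a TCP handshake;
	// without this guard we would be subtracting two zero-valued times.
	dnsDelta, tcpDelta := time.Duration(0), time.Duration(0)
	if !reused {
		dnsDelta = dnsDone.Sub(dnsStart)
		tcpDelta = connDone.Sub(connStart)
	}
	fmt.Printf("reused=%v dns=%v tcp=%v\n", reused, dnsDelta, tcpDelta)
}

func main() {
	client := &http.Client{}
	probeOnce(client, "https://example.com/") // new connection
	probeOnce(client, "https://example.com/") // typically reused
}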
---
 rpm/rpm.go     | 69 ++++++++++++++++++++++++++++++----------------------------
 stats/stats.go |  2 ++
 2 files changed, 38 insertions(+), 33 deletions(-)
(limited to 'rpm')

diff --git a/rpm/rpm.go b/rpm/rpm.go
index 40a7729..34501c8 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -36,6 +36,9 @@ func (p *Probe) GetTrace() *httptrace.ClientTrace {
 }
 
 func (p *Probe) GetDnsDelta() time.Duration {
+	if p.stats.ConnectionReused {
+		return time.Duration(0)
+	}
 	delta := p.stats.DnsDoneTime.Sub(p.stats.DnsStartTime)
 	if debug.IsDebug(p.debug) {
 		fmt.Printf("(Probe %v): DNS Time: %v\n", p.probeid, delta)
@@ -44,6 +47,9 @@ func (p *Probe) GetDnsDelta() time.Duration {
 }
 
 func (p *Probe) GetTCPDelta() time.Duration {
+	if p.stats.ConnectionReused {
+		return time.Duration(0)
+	}
 	delta := p.stats.ConnectDoneTime.Sub(p.stats.ConnectStartTime)
 	if debug.IsDebug(p.debug) {
 		fmt.Printf("(Probe %v): TCP Connection Time: %v\n", p.probeid, delta)
@@ -70,7 +76,14 @@ func (p *Probe) GetTLSAndHttpHeaderDelta() time.Duration {
 	// *and* the TLS handshake RTT, whether we can specifically measure the latter
 	// or not. Eventually when TLS handshake tracing is fixed, we can break these
 	// into separate buckets, but for now this workaround is reasonable.
-	delta := p.stats.HttpResponseReadyTime.Sub(p.stats.ConnectDoneTime)
+	before := p.stats.ConnectDoneTime
+	if p.stats.ConnectionReused {
+		// When we reuse a connection there will be no time logged for when the
+		// TCP connection was established (obviously). So, fall back to the time
+		// when we were notified about reusing a connection (as a close approximation!).
+		before = p.stats.GetConnectionDoneTime
+	}
+	delta := p.stats.HttpResponseReadyTime.Sub(before)
 	if debug.IsDebug(p.debug) {
 		fmt.Printf("(Probe %v): Http TLS and Header Time: %v\n", p.probeid, delta)
 	}
@@ -184,9 +197,15 @@ func (probe *Probe) SetGotConnTimeInfo(
 ) {
 	probe.stats.GetConnectionDoneTime = now
 	probe.stats.ConnInfo = gotConnInfo
+	probe.stats.ConnectionReused = gotConnInfo.Reused
 	if debug.IsDebug(probe.debug) {
+		reusedString := "(new)"
+		if probe.stats.ConnectionReused {
+			reusedString = "(reused)"
+		}
 		fmt.Printf(
-			"(Probe) Got connection for %v at %v with info %v\n",
+			"(Probe) Got %v connection for %v at %v with info %v\n",
+			reusedString,
 			probe.ProbeId(),
 			probe.stats.GetConnectionDoneTime,
 			probe.stats.ConnInfo,
@@ -288,6 +307,15 @@ func getLatency(ctx context.Context, probe *Probe, url string, debugLevel debug.
 	tcpDelta := probe.GetTCPDelta() // Constitutes 1 RTT, per the Spec.
 	totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + dnsDelta + tcpDelta
 
+	// By default, assume that there was a reused connection which
+	// means that we only made 2 round trips.
+	roundTripCount := uint16(2)
+	if !probe.stats.ConnectionReused {
+		// If we did not reuse the connection, then we made three additional RTTs -- one for the DNS,
+		// one for the TCP, one for the TLS.
+		roundTripCount = 5
+	}
+
 	if debug.IsDebug(debugLevel) {
 		fmt.Printf(
 			"(Probe %v) sanity vs total: %v vs %v\n",
@@ -296,8 +324,6 @@ func getLatency(ctx context.Context, probe *Probe, url string, debugLevel debug.
 			totalDelay,
 		)
 	}
-
-	roundTripCount := uint16(5) // According to addition, there are 5 RTTs that we measured.
return utilities.GetLatency{Delay: totalDelay, RoundTripCount: roundTripCount, Err: nil} } @@ -310,7 +336,11 @@ func CalculateSequentialRTTsTime( ) chan utilities.GetLatency { responseChannel := make(chan utilities.GetLatency) go func() { - + /* + TODO: We *are* measuring round-trip times on the load-generating connection + right now. However, it is not clear if Apple is doing the same in their native + client. We will have to adjust based on that. + */ if debug.IsDebug(debugLevel) { fmt.Printf("Beginning saturated RTT probe.\n") } @@ -321,32 +351,6 @@ func CalculateSequentialRTTsTime( responseChannel <- saturated_latency return } - /* - TODO: We are not going to measure round-trip times on the load-generating connection - right now because we are dealing with a massive amount of buffer bloat on the - Apple CDN. - - TODO: When this functionality is enabled, we may need to change the assertion in - the GotConn callback in the Traceable interface in traceable.go because a connection - will be reused in that case. If such a situation does come to pass, we will want to - move that assertion in to the various Traceable interface implementations that continue - to rely on this assertion. - - c_a, err := saturated_client.Get(url) - if err != nil { - responseChannel <- GetLatency{Delay: 0, RTTs: 0, Err: err} - return - } - // TODO: Make this interruptable somehow - // by using _ctx_. - _, err = io.ReadAll(c_a.Body) - if err != nil { - responseChannel <- GetLatency{Delay: 0, RTTs: 0, Err: err} - return - } - roundTripCount += 5 - c_a.Body.Close() - */ if debug.IsDebug(debugLevel) { fmt.Printf("Beginning unsaturated RTT probe.\n") } @@ -357,8 +361,7 @@ func CalculateSequentialRTTsTime( responseChannel <- new_rtt_latency return } - - responseChannel <- utilities.GetLatency{Delay: saturated_latency.Delay + new_rtt_latency.Delay, RoundTripCount: uint16(10), Err: nil} + responseChannel <- utilities.GetLatency{Delay: saturated_latency.Delay + new_rtt_latency.Delay, RoundTripCount: saturated_latency.RoundTripCount + new_rtt_latency.RoundTripCount, Err: nil} return }() return responseChannel diff --git a/stats/stats.go b/stats/stats.go index a636326..f5ae4cb 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -22,6 +22,7 @@ type TraceStats struct { TLSDoneTime utilities.Optional[time.Time] ConnectStartTime time.Time ConnectDoneTime time.Time + ConnectionReused bool GetConnectionStartTime time.Time GetConnectionDoneTime time.Time HttpWroteRequestTime time.Time @@ -44,6 +45,7 @@ func (s *TraceStats) String() string { fmt.Sprintf("TLSDoneTime: %v\n", s.TLSDoneTime) + fmt.Sprintf("ConnectStartTime: %v\n", s.ConnectStartTime) + fmt.Sprintf("ConnectDoneTime: %v\n", s.ConnectDoneTime) + + fmt.Sprintf("ConnectionReused: %v\n", s.ConnectionReused) + fmt.Sprintf("GetConnectionStartTime: %v\n", s.GetConnectionStartTime) + fmt.Sprintf("GetConnectionDoneTime: %v\n", s.GetConnectionDoneTime) + fmt.Sprintf("HttpResponseReadyTime: %v\n", s.HttpResponseReadyTime) -- cgit v1.2.3 From fdfe96b53ef8f4532e5b6f65f86ba39fe242cc5b Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Wed, 11 May 2022 16:49:09 -0400 Subject: [Refactor/Bugfix] Refactor RPM calculation and fix calculation algorithm As it turns out, I was misreading the algorithm for calculating the RPM based upon the measurements taken during execution. This patch fixes that mistake and also (starts) renames "RTT" as "measurement" (those are technically a better nomenclature according to the spec.) 
--- constants/constants.go | 3 ++ networkQuality.go | 9 ++++-- rpm/rpm.go | 86 +++++++++++++++++++++++++++++--------------------- utilities/utilities.go | 8 ++--- 4 files changed, 64 insertions(+), 42 deletions(-) (limited to 'rpm') diff --git a/constants/constants.go b/constants/constants.go index 1a060dd..2f906b6 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -30,4 +30,7 @@ var ( DefaultDebug bool = false // The default URL for the config host. DefaultConfigHost string = "networkquality.example.com" + + // The default decision about whether to run the test in strict mode. + DefaultStrict bool = false ) diff --git a/networkQuality.go b/networkQuality.go index 6b80a7e..2aba054 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -64,6 +64,11 @@ var ( constants.DefaultDebug, "Enable debugging.", ) + strictFlag = flag.Bool( + "strict", + constants.DefaultStrict, + "Whether to run the test in strict mode (measure HTTP get time on load-generating connection)", + ) timeout = flag.Int( "timeout", constants.DefaultTestTime, @@ -680,7 +685,7 @@ func main() { { rttTimeout = true } - case sequentialRTTimes := <-rpm.CalculateSequentialRTTsTime(operatingCtx, saturatedRTTProbe, newRTTProbe, config.Urls.SmallUrl, debugLevel): + case sequentialRTTimes := <-rpm.CalculateProbeMeasurements(operatingCtx, *strictFlag, saturatedRTTProbe, newRTTProbe, config.Urls.SmallUrl, debugLevel): { if sequentialRTTimes.Err != nil { fmt.Printf( @@ -694,7 +699,7 @@ func main() { fmt.Printf("rttProbe: %v\n", newRTTProbe) } // We know that we have a good Sequential RTT. - totalRTsCount += uint64(sequentialRTTimes.RoundTripCount) + totalRTsCount += uint64(sequentialRTTimes.MeasurementCount) totalRTTimes += sequentialRTTimes.Delay.Seconds() if debug.IsDebug(debugLevel) { fmt.Printf( diff --git a/rpm/rpm.go b/rpm/rpm.go index 34501c8..f41ad92 100644 --- a/rpm/rpm.go +++ b/rpm/rpm.go @@ -271,49 +271,49 @@ func (probe *Probe) SetHttpResponseReadyTime( } } -func getLatency(ctx context.Context, probe *Probe, url string, debugLevel debug.DebugLevel) utilities.GetLatency { - before := time.Now() - c_b_req, err := http.NewRequestWithContext( +func getLatency(ctx context.Context, probe *Probe, url string, debugLevel debug.DebugLevel) utilities.MeasurementResult { + time_before_probe := time.Now() + probe_req, err := http.NewRequestWithContext( httptrace.WithClientTrace(ctx, probe.GetTrace()), "GET", url, nil, ) if err != nil { - return utilities.GetLatency{Delay: 0, RoundTripCount: 0, Err: err} + return utilities.MeasurementResult{Delay: 0, MeasurementCount: 0, Err: err} } - c_b, err := probe.client.Do(c_b_req) + probe_resp, err := probe.client.Do(probe_req) if err != nil { - return utilities.GetLatency{Delay: 0, RoundTripCount: 0, Err: err} + return utilities.MeasurementResult{Delay: 0, MeasurementCount: 0, Err: err} } // TODO: Make this interruptable somehow by using _ctx_. - _, err = io.ReadAll(c_b.Body) + _, err = io.ReadAll(probe_resp.Body) if err != nil { - return utilities.GetLatency{Delay: 0, Err: err} + return utilities.MeasurementResult{Delay: 0, Err: err} } - after := time.Now() + time_after_probe := time.Now() // Depending on whether we think that Close() requires another RTT (via TCP), we // may need to move this before/after capturing the after time. - c_b.Body.Close() + probe_resp.Body.Close() - sanity := after.Sub(before) + sanity := time_after_probe.Sub(time_before_probe) - tlsAndHttpHeaderDelta := probe.GetTLSAndHttpHeaderDelta() // Constitutes 2 RTT, per the Spec. 
- httpDownloadDelta := probe.GetHttpDownloadDelta(after) // Constitutes 1 RTT, per the Spec. - dnsDelta := probe.GetDnsDelta() // Constitutes 1 RTT, per the Spec. - tcpDelta := probe.GetTCPDelta() // Constitutes 1 RTT, per the Spec. + tlsAndHttpHeaderDelta := probe.GetTLSAndHttpHeaderDelta() + httpDownloadDelta := probe.GetHttpDownloadDelta(time_after_probe) // Combined with above, constitutes 2 time measurements, per the Spec. + dnsDelta := probe.GetDnsDelta() // Constitutes 1 time measurement, per the Spec. + tcpDelta := probe.GetTCPDelta() // Constitutes 1 time measurement, per the Spec. totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + dnsDelta + tcpDelta // By default, assume that there was a reused connection which - // means that we only made 2 round trips. - roundTripCount := uint16(2) + // means that we only made 1 time measurement. + var measurementCount uint16 = 1 if !probe.stats.ConnectionReused { - // If we did not reuse the connection, then we made three additional RTTs -- one for the DNS, - // one for the TCP, one for the TLS. - roundTripCount = 5 + // If we did not reuse the connection, then we made three additional time measurements. + // See above for details on that calculation. + measurementCount = 4 } if debug.IsDebug(debugLevel) { @@ -324,44 +324,58 @@ func getLatency(ctx context.Context, probe *Probe, url string, debugLevel debug. totalDelay, ) } - return utilities.GetLatency{Delay: totalDelay, RoundTripCount: roundTripCount, Err: nil} + return utilities.MeasurementResult{Delay: totalDelay, MeasurementCount: measurementCount, Err: nil} } -func CalculateSequentialRTTsTime( +func CalculateProbeMeasurements( ctx context.Context, - saturated_rtt_probe *Probe, - new_rtt_probe *Probe, + strict bool, + saturated_measurement_probe *Probe, + new_measurement_probe *Probe, url string, debugLevel debug.DebugLevel, -) chan utilities.GetLatency { - responseChannel := make(chan utilities.GetLatency) +) chan utilities.MeasurementResult { + responseChannel := make(chan utilities.MeasurementResult) go func() { /* TODO: We *are* measuring round-trip times on the load-generating connection right now. However, it is not clear if Apple is doing the same in their native client. We will have to adjust based on that. 
		*/
-		if debug.IsDebug(debugLevel) {
-			fmt.Printf("Beginning saturated RTT probe.\n")
+		var saturated_latency utilities.MeasurementResult
+		if strict {
+
+			if debug.IsDebug(debugLevel) {
+				fmt.Printf("Beginning saturated RTT probe.\n")
+			}
+			saturated_latency = getLatency(ctx, saturated_measurement_probe, url, debugLevel)
+
+			if saturated_latency.Err != nil {
+				fmt.Printf("Error occurred getting the saturated RTT.\n")
+				responseChannel <- saturated_latency
+				return
+			}
 		}
-		saturated_latency := getLatency(ctx, saturated_rtt_probe, url, debugLevel)
 
-		if saturated_latency.Err != nil {
-			fmt.Printf("Error occurred getting the saturated RTT.\n")
-			responseChannel <- saturated_latency
-			return
-		}
 		if debug.IsDebug(debugLevel) {
 			fmt.Printf("Beginning unsaturated RTT probe.\n")
 		}
-		new_rtt_latency := getLatency(ctx, new_rtt_probe, url, debugLevel)
+		new_rtt_latency := getLatency(ctx, new_measurement_probe, url, debugLevel)
 
 		if new_rtt_latency.Err != nil {
 			fmt.Printf("Error occurred getting the unsaturated RTT.\n")
 			responseChannel <- new_rtt_latency
 			return
 		}
-		responseChannel <- utilities.GetLatency{Delay: saturated_latency.Delay + new_rtt_latency.Delay, RoundTripCount: saturated_latency.RoundTripCount + new_rtt_latency.RoundTripCount, Err: nil}
+
+		total_delay := new_rtt_latency.Delay
+		total_measurement_count := new_rtt_latency.MeasurementCount
+
+		if strict {
+			total_delay += saturated_latency.Delay
+			total_measurement_count += saturated_latency.MeasurementCount
+		}
+		responseChannel <- utilities.MeasurementResult{Delay: total_delay, MeasurementCount: total_measurement_count, Err: nil}
 		return
 	}()
 	return responseChannel
diff --git a/utilities/utilities.go b/utilities/utilities.go
index 160368b..b1b180f 100644
--- a/utilities/utilities.go
+++ b/utilities/utilities.go
@@ -61,10 +61,10 @@ func ToMBps(bytes float64) float64 {
 	return float64(bytes) / float64(1024*1024)
 }
 
-type GetLatency struct {
-	Delay          time.Duration
-	RoundTripCount uint16
-	Err            error
+type MeasurementResult struct {
+	Delay            time.Duration
+	MeasurementCount uint16
+	Err              error
 }
 
 func SeekForAppend(file *os.File) (err error) {
-- 
cgit v1.2.3


From f0dcb55643612ab05137314b46ee66ccf9308a53 Mon Sep 17 00:00:00 2001
From: Will Hawkins
Date: Thu, 12 May 2022 15:45:40 -0400
Subject: [Functionality] Match Apple's Client RPM Calculation -- Remove DNS

Per a (to-be-released) update from the spec, the time for a DNS lookup is no
longer considered to make up a portion of the RPM.
---
 rpm/rpm.go | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
(limited to 'rpm')

diff --git a/rpm/rpm.go b/rpm/rpm.go
index f41ad92..a349cee 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -303,9 +303,8 @@ func getLatency(ctx context.Context, probe *Probe, url string, debugLevel debug.
 
 	tlsAndHttpHeaderDelta := probe.GetTLSAndHttpHeaderDelta()
 	httpDownloadDelta := probe.GetHttpDownloadDelta(time_after_probe) // Combined with above, constitutes 2 time measurements, per the Spec.
-	dnsDelta := probe.GetDnsDelta()                                   // Constitutes 1 time measurement, per the Spec.
 	tcpDelta := probe.GetTCPDelta()                                   // Constitutes 1 time measurement, per the Spec.
-	totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + dnsDelta + tcpDelta
+	totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + tcpDelta
 
 	// By default, assume that there was a reused connection which
 	// means that we only made 1 time measurement.
 	var measurementCount uint16 = 1
if !probe.stats.ConnectionReused { // If we did not reuse the connection, then we made three additional time measurements. // See above for details on that calculation. - measurementCount = 4 + measurementCount = 3 } if debug.IsDebug(debugLevel) { -- cgit v1.2.3 From 89f26501e59095e1e6ac59cf158f6305e4e93389 Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Fri, 13 May 2022 11:02:09 -0400 Subject: [Refactor] Create config package and package-ify saturate() 1. Create a separate package to handle the config information. 2. Move the saturate functionality into the rpm package. 3. Do general renaming/refactoring so that we are consistently saying measurement and not RTT (this nomenclature is more consistent with the standard). --- config/config.go | 134 +++++++++++++++ constants/constants.go | 2 +- networkQuality.go | 443 ++++++------------------------------------------- rpm/rpm.go | 221 ++++++++++++++++++++++++ utilities/utilities.go | 5 + 5 files changed, 412 insertions(+), 393 deletions(-) create mode 100644 config/config.go (limited to 'rpm') diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..05ec61d --- /dev/null +++ b/config/config.go @@ -0,0 +1,134 @@ +package config + +import ( + "crypto/tls" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strings" + + "github.com/network-quality/goresponsiveness/utilities" + "golang.org/x/net/http2" +) + +type ConfigUrls struct { + SmallUrl string `json:"small_https_download_url"` + LargeUrl string `json:"large_https_download_url"` + UploadUrl string `json:"https_upload_url"` +} + +type Config struct { + Version int + Urls ConfigUrls `json:"urls"` + Source string + Test_Endpoint string +} + +func (c *Config) Get(configHost string, configPath string) error { + configTransport := http2.Transport{} + configTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + configClient := &http.Client{Transport: &configTransport} + // Extraneous /s in URLs is normally okay, but the Apple CDN does not + // like them. Make sure that we put exactly one (1) / between the host + // and the path. 
+ if !strings.HasPrefix(configPath, "/") { + configPath = "/" + configPath + } + c.Source = fmt.Sprintf("https://%s%s", configHost, configPath) + resp, err := configClient.Get(c.Source) + if err != nil { + return fmt.Errorf( + "Error: Could not connect to configuration host %s: %v\n", + configHost, + err, + ) + } + + jsonConfig, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf( + "Error: Could not read configuration content downloaded from %s: %v\n", + c.Source, + err, + ) + } + + err = json.Unmarshal(jsonConfig, c) + if err != nil { + return fmt.Errorf( + "Error: Could not parse configuration returned from %s: %v\n", + c.Source, + err, + ) + } + + //if len(c.Test_Endpoint) != 0 { + if false { + tempUrl, err := url.Parse(c.Urls.LargeUrl) + if err != nil { + return fmt.Errorf("Error parsing large_https_download_url: %v", err) + } + c.Urls.LargeUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path + tempUrl, err = url.Parse(c.Urls.SmallUrl) + if err != nil { + return fmt.Errorf("Error parsing small_https_download_url: %v", err) + } + c.Urls.SmallUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path + tempUrl, err = url.Parse(c.Urls.UploadUrl) + if err != nil { + return fmt.Errorf("Error parsing https_upload_url: %v", err) + } + c.Urls.UploadUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path + } + return nil +} + +func (c *Config) String() string { + return fmt.Sprintf( + "Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s\nEndpoint: %s\n", + c.Version, + c.Urls.SmallUrl, + c.Urls.LargeUrl, + c.Urls.UploadUrl, + c.Test_Endpoint, + ) +} + +func (c *Config) IsValid() error { + if parsedUrl, err := url.ParseRequestURI(c.Urls.LargeUrl); err != nil || + parsedUrl.Scheme != "https" { + return fmt.Errorf( + "Configuration url large_https_download_url is invalid: %s", + utilities.Conditional( + len(c.Urls.LargeUrl) != 0, + c.Urls.LargeUrl, + "Missing", + ), + ) + } + if parsedUrl, err := url.ParseRequestURI(c.Urls.SmallUrl); err != nil || + parsedUrl.Scheme != "https" { + return fmt.Errorf( + "Configuration url small_https_download_url is invalid: %s", + utilities.Conditional( + len(c.Urls.SmallUrl) != 0, + c.Urls.SmallUrl, + "Missing", + ), + ) + } + if parsedUrl, err := url.ParseRequestURI(c.Urls.UploadUrl); err != nil || + parsedUrl.Scheme != "https" { + return fmt.Errorf( + "Configuration url https_upload_url is invalid: %s", + utilities.Conditional( + len(c.Urls.UploadUrl) != 0, + c.Urls.UploadUrl, + "Missing", + ), + ) + } + return nil +} diff --git a/constants/constants.go b/constants/constants.go index 2f906b6..147b643 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -18,7 +18,7 @@ var ( // The amount of time that the client will cooldown if it is in debug mode. CooldownPeriod time.Duration = 4 * time.Second // The number of probes to send when calculating RTT. - RPMProbeCount int = 5 + MeasurementProbeCount int = 5 // The amount of time that we give ourselves to calculate the RPM. 
RPMCalculationTime time.Duration = 10 * time.Second diff --git a/networkQuality.go b/networkQuality.go index 2aba054..ab9517e 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -17,25 +17,20 @@ package main import ( "context" "crypto/tls" - "encoding/json" "flag" "fmt" _ "io" - "io/ioutil" _ "log" - "math/rand" "net/http" - "net/url" "os" "runtime/pprof" - "strings" "time" "github.com/network-quality/goresponsiveness/ccw" + "github.com/network-quality/goresponsiveness/config" "github.com/network-quality/goresponsiveness/constants" "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/lgc" - "github.com/network-quality/goresponsiveness/ma" "github.com/network-quality/goresponsiveness/rpm" "github.com/network-quality/goresponsiveness/timeoutat" "github.com/network-quality/goresponsiveness/utilities" @@ -86,343 +81,6 @@ var ( ) ) -type ConfigUrls struct { - SmallUrl string `json:"small_https_download_url"` - LargeUrl string `json:"large_https_download_url"` - UploadUrl string `json:"https_upload_url"` -} - -type Config struct { - Version int - Urls ConfigUrls `json:"urls"` - Source string - Test_Endpoint string -} - -func (c *Config) Get(configHost string, configPath string) error { - configTransport := http2.Transport{} - configTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} - configClient := &http.Client{Transport: &configTransport} - // Extraneous /s in URLs is normally okay, but the Apple CDN does not - // like them. Make sure that we put exactly one (1) / between the host - // and the path. - if !strings.HasPrefix(configPath, "/") { - configPath = "/" + configPath - } - c.Source = fmt.Sprintf("https://%s%s", configHost, configPath) - resp, err := configClient.Get(c.Source) - if err != nil { - return fmt.Errorf( - "Error: Could not connect to configuration host %s: %v\n", - configHost, - err, - ) - } - - jsonConfig, err := ioutil.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf( - "Error: Could not read configuration content downloaded from %s: %v\n", - c.Source, - err, - ) - } - - err = json.Unmarshal(jsonConfig, c) - if err != nil { - return fmt.Errorf( - "Error: Could not parse configuration returned from %s: %v\n", - c.Source, - err, - ) - } - - //if len(c.Test_Endpoint) != 0 { - if false { - tempUrl, err := url.Parse(c.Urls.LargeUrl) - if err != nil { - return fmt.Errorf("Error parsing large_https_download_url: %v", err) - } - c.Urls.LargeUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path - tempUrl, err = url.Parse(c.Urls.SmallUrl) - if err != nil { - return fmt.Errorf("Error parsing small_https_download_url: %v", err) - } - c.Urls.SmallUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path - tempUrl, err = url.Parse(c.Urls.UploadUrl) - if err != nil { - return fmt.Errorf("Error parsing https_upload_url: %v", err) - } - c.Urls.UploadUrl = tempUrl.Scheme + "://" + c.Test_Endpoint + "/" + tempUrl.Path - } - return nil -} - -func (c *Config) String() string { - return fmt.Sprintf( - "Version: %d\nSmall URL: %s\nLarge URL: %s\nUpload URL: %s\nEndpoint: %s\n", - c.Version, - c.Urls.SmallUrl, - c.Urls.LargeUrl, - c.Urls.UploadUrl, - c.Test_Endpoint, - ) -} - -func (c *Config) IsValid() error { - if parsedUrl, err := url.ParseRequestURI(c.Urls.LargeUrl); err != nil || - parsedUrl.Scheme != "https" { - return fmt.Errorf( - "Configuration url large_https_download_url is invalid: %s", - utilities.Conditional( - len(c.Urls.LargeUrl) != 0, - c.Urls.LargeUrl, - "Missing", - ), - ) - } - if 
parsedUrl, err := url.ParseRequestURI(c.Urls.SmallUrl); err != nil || - parsedUrl.Scheme != "https" { - return fmt.Errorf( - "Configuration url small_https_download_url is invalid: %s", - utilities.Conditional( - len(c.Urls.SmallUrl) != 0, - c.Urls.SmallUrl, - "Missing", - ), - ) - } - if parsedUrl, err := url.ParseRequestURI(c.Urls.UploadUrl); err != nil || - parsedUrl.Scheme != "https" { - return fmt.Errorf( - "Configuration url https_upload_url is invalid: %s", - utilities.Conditional( - len(c.Urls.UploadUrl) != 0, - c.Urls.UploadUrl, - "Missing", - ), - ) - } - return nil -} - -func addFlows( - ctx context.Context, - toAdd uint64, - lgcs *[]lgc.LoadGeneratingConnection, - lgcsPreviousTransferred *[]uint64, - lgcGenerator func() lgc.LoadGeneratingConnection, - debug debug.DebugLevel, -) { - for i := uint64(0); i < toAdd; i++ { - *lgcs = append(*lgcs, lgcGenerator()) - *lgcsPreviousTransferred = append(*lgcsPreviousTransferred, 0) - if !(*lgcs)[len(*lgcs)-1].Start(ctx, debug) { - fmt.Printf( - "Error starting lgc with id %d!\n", - (*lgcs)[len(*lgcs)-1].ClientId(), - ) - return - } - } -} - -type SaturationResult struct { - RateBps float64 - lgcs []lgc.LoadGeneratingConnection -} - -func saturate( - saturationCtx context.Context, - operatingCtx context.Context, - lgcGenerator func() lgc.LoadGeneratingConnection, - debugging *debug.DebugWithPrefix, -) (saturated chan SaturationResult) { - saturated = make(chan SaturationResult) - go func() { - - lgcs := make([]lgc.LoadGeneratingConnection, 0) - lgcsPreviousTransferred := make([]uint64, 0) - - addFlows( - saturationCtx, - constants.StartingNumberOfLoadGeneratingConnections, - &lgcs, - &lgcsPreviousTransferred, - lgcGenerator, - debugging.Level, - ) - - previousFlowIncreaseIteration := uint64(0) - previousMovingAverage := float64(0) - movingAverage := ma.NewMovingAverage( - constants.MovingAverageIntervalCount, - ) - movingAverageAverage := ma.NewMovingAverage( - constants.MovingAverageIntervalCount, - ) - - nextSampleStartTime := time.Now().Add(time.Second) - - for currentIteration := uint64(0); true; currentIteration++ { - - // When the program stops operating, then stop. - if saturationCtx.Err() != nil { - return - } - - // We may be asked to stop trying to saturate the - // network and return our current status. - if saturationCtx.Err() != nil { - //break - } - - now := time.Now() - // At each 1-second interval - if nextSampleStartTime.Sub(now) > 0 { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Sleeping until %v\n", - debugging, - nextSampleStartTime, - ) - } - time.Sleep(nextSampleStartTime.Sub(now)) - } else { - fmt.Fprintf(os.Stderr, "Warning: Missed a one-second deadline.\n") - } - nextSampleStartTime = time.Now().Add(time.Second) - - // Compute "instantaneous aggregate" goodput which is the number of - // bytes transferred within the last second. - totalTransfer := uint64(0) - allInvalid := true - for i := range lgcs { - if !lgcs[i].IsValid() { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Load-generating connection with id %d is invalid ... skipping.\n", - debugging, - lgcs[i].ClientId(), - ) - } - continue - } - allInvalid = false - previousTransferred := lgcsPreviousTransferred[i] - currentTransferred := lgcs[i].Transferred() - totalTransfer += (currentTransferred - previousTransferred) - lgcsPreviousTransferred[i] = currentTransferred - } - - // For some reason, all the lgcs are invalid. This likely means that - // the network/server went away. 
- if allInvalid { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: All lgcs were invalid. Assuming that network/server went away.\n", - debugging, - ) - } - break - } - - // Compute a moving average of the last - // constants.MovingAverageIntervalCount "instantaneous aggregate - // goodput" measurements - movingAverage.AddMeasurement(float64(totalTransfer)) - currentMovingAverage := movingAverage.CalculateAverage() - movingAverageAverage.AddMeasurement(currentMovingAverage) - movingAverageDelta := utilities.SignedPercentDifference( - currentMovingAverage, - previousMovingAverage, - ) - - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Instantaneous goodput: %f MB.\n", - debugging, - utilities.ToMBps(float64(totalTransfer)), - ) - fmt.Printf( - "%v: Previous moving average: %f MB.\n", - debugging, - utilities.ToMBps(previousMovingAverage), - ) - fmt.Printf( - "%v: Current moving average: %f MB.\n", - debugging, - utilities.ToMBps(currentMovingAverage), - ) - fmt.Printf( - "%v: Moving average delta: %f.\n", - debugging, - movingAverageDelta, - ) - } - - previousMovingAverage = currentMovingAverage - - // Special case: We won't make any adjustments on the first - // iteration. - if currentIteration == 0 { - continue - } - - // If moving average > "previous" moving average + InstabilityDelta: - if movingAverageDelta > constants.InstabilityDelta { - // Network did not yet reach saturation. If no flows added - // within the last 4 seconds, add 4 more flows - if (currentIteration - previousFlowIncreaseIteration) > uint64( - constants.MovingAverageStabilitySpan, - ) { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Adding flows because we are unsaturated and waited a while.\n", - debugging, - ) - } - addFlows( - saturationCtx, - constants.AdditiveNumberOfLoadGeneratingConnections, - &lgcs, - &lgcsPreviousTransferred, - lgcGenerator, - debugging.Level, - ) - previousFlowIncreaseIteration = currentIteration - } else { - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debugging) - } - } - } else { // Else, network reached saturation for the current flow count. 
- if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: Network reached saturation with current flow count.\n", debugging) - } - // If new flows added and for 4 seconds the moving average - // throughput did not change: network reached stable saturation - if (currentIteration-previousFlowIncreaseIteration) < uint64(constants.MovingAverageStabilitySpan) && movingAverageAverage.AllSequentialIncreasesLessThan(float64(5)) { - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging) - } - break - } else { - // Else, add four more flows - if debug.IsDebug(debugging.Level) { - fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging) - } - addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, &lgcsPreviousTransferred, lgcGenerator, debugging.Level) - previousFlowIncreaseIteration = currentIteration - } - } - - } - saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), lgcs: lgcs} - }() - return -} - func main() { flag.Parse() @@ -433,7 +91,7 @@ func main() { saturationCtx, cancelSaturationCtx := context.WithCancel( context.Background(), ) - config := &Config{} + config := &config.Config{} var debugLevel debug.DebugLevel = debug.Error if *debugCliFlag { @@ -508,6 +166,10 @@ func main() { } } + /* + * Create (and then, ironically, name) two anonymous functions that, when invoked, + * will create load-generating connections for upload/download/ + */ generate_lbd := func() lgc.LoadGeneratingConnection { return &lgc.LoadGeneratingConnectionDownload{ Path: config.Urls.LargeUrl, @@ -524,13 +186,13 @@ func main() { var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download") var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload") - downloadSaturationChannel := saturate( + downloadSaturationChannel := rpm.Saturate( saturationCtx, operatingCtx, generate_lbd, downloadDebugging, ) - uploadSaturationChannel := saturate( + uploadSaturationChannel := rpm.Saturate( saturationCtx, operatingCtx, generate_lbu, @@ -540,8 +202,8 @@ func main() { saturationTimeout := false uploadSaturated := false downloadSaturated := false - downloadSaturation := SaturationResult{} - uploadSaturation := SaturationResult{} + downloadSaturation := rpm.SaturationResult{} + uploadSaturation := rpm.SaturationResult{} for !(uploadSaturated && downloadSaturated) { select { @@ -557,7 +219,7 @@ func main() { "", ), utilities.ToMBps(downloadSaturation.RateBps), - len(downloadSaturation.lgcs), + len(downloadSaturation.LGCs), ) } } @@ -573,7 +235,7 @@ func main() { "", ), utilities.ToMBps(uploadSaturation.RateBps), - len(uploadSaturation.lgcs), + len(uploadSaturation.LGCs), ) } } @@ -585,7 +247,7 @@ func main() { // will exit! fmt.Fprint( os.Stderr, - "Error: Saturation could not be completed in time and no provisional rates could be accessed. Test failed.\n", + "Error: Saturation could not be completed in time and no provisional rates could be assessed. 
Test failed.\n",
 		)
 		cancelOperatingCtx()
 		if *debugCliFlag {
@@ -629,25 +291,21 @@ func main() {
 		)
 	}
 
-	totalRTsCount := uint64(0)
-	totalRTTimes := float64(0)
-	rttTimeout := false
+	totalMeasurements := uint64(0)
+	totalMeasurementTimes := float64(0)
+	measurementTimeout := false
 
-	for i := 0; i < constants.RPMProbeCount && !rttTimeout; i++ {
-		if len(downloadSaturation.lgcs) == 0 {
+	for i := 0; i < constants.MeasurementProbeCount && !measurementTimeout; i++ {
+		if len(downloadSaturation.LGCs) == 0 {
 			continue
 		}
-		randomlgcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).
-			Int() %
-			len(
-				downloadSaturation.lgcs,
-			)
-		if !downloadSaturation.lgcs[randomlgcsIndex].IsValid() {
+		randomLGCsIndex := utilities.RandBetween(len(downloadSaturation.LGCs))
+		if !downloadSaturation.LGCs[randomLGCsIndex].IsValid() {
 			if *debugCliFlag {
 				fmt.Printf(
-					"%v: The randomly selected download lgc (with id %d) was invalid. Skipping.\n",
+					"%v: The randomly selected saturated connection (with id %d) was invalid. Skipping.\n",
 					debugCliFlag,
-					downloadSaturation.lgcs[randomlgcsIndex].ClientId(),
+					downloadSaturation.LGCs[randomLGCsIndex].ClientId(),
 				)
 			}
 
@@ -657,7 +315,7 @@ func main() {
 			if time.Since(timeoutAbsoluteTime) > 0 {
 				if *debugCliFlag {
 					fmt.Printf(
-						"Pathologically could not find valid lgcs to use for measurement.\n",
+						"Pathologically could not find valid saturated connections to use for measurement.\n",
 					)
 				}
 				break
@@ -665,46 +323,47 @@ func main() {
 			continue
 		}
 
-		newTransport := http2.Transport{}
-		newTransport.TLSClientConfig = &tls.Config{}
+		unsaturatedMeasurementTransport := http2.Transport{}
+		unsaturatedMeasurementTransport.TLSClientConfig = &tls.Config{}
 
 		if sslKeyFileConcurrentWriter != nil {
-			newTransport.TLSClientConfig.KeyLogWriter = sslKeyFileConcurrentWriter
+			unsaturatedMeasurementTransport.TLSClientConfig.KeyLogWriter = sslKeyFileConcurrentWriter
 		}
-		newTransport.TLSClientConfig.InsecureSkipVerify = true
-		newClient := http.Client{Transport: &newTransport}
+		unsaturatedMeasurementTransport.TLSClientConfig.InsecureSkipVerify = true
+		newClient := http.Client{Transport: &unsaturatedMeasurementTransport}
 
-		newRTTProbe := rpm.NewProbe(&newClient, debugLevel)
+		unsaturatedMeasurementProbe := rpm.NewProbe(&newClient, debugLevel)
 
-		saturatedRTTProbe := rpm.NewProbe(
-			downloadSaturation.lgcs[randomlgcsIndex].Client(),
+		saturatedMeasurementProbe := rpm.NewProbe(
+			downloadSaturation.LGCs[randomLGCsIndex].Client(),
 			debugLevel,
 		)
 
 		select {
 		case <-timeoutChannel:
 			{
-				rttTimeout = true
+				measurementTimeout = true
 			}
-		case sequentialRTTimes := <-rpm.CalculateProbeMeasurements(operatingCtx, *strictFlag, saturatedRTTProbe, newRTTProbe, config.Urls.SmallUrl, debugLevel):
+		case sequentialMeasurementTimes := <-rpm.CalculateProbeMeasurements(operatingCtx, *strictFlag, saturatedMeasurementProbe, unsaturatedMeasurementProbe, config.Urls.SmallUrl, debugLevel):
 			{
-				if sequentialRTTimes.Err != nil {
+				if sequentialMeasurementTimes.Err != nil {
 					fmt.Printf(
-						"Failed to calculate a time for sequential RTTs: %v\n",
-						sequentialRTTimes.Err,
+						"Failed to calculate a time for sequential measurements: %v\n",
+						sequentialMeasurementTimes.Err,
 					)
 					continue
 				}
 				if debug.IsDebug(debugLevel) {
-					fmt.Printf("rttProbe: %v\n", newRTTProbe)
+					fmt.Printf("unsaturatedMeasurementProbe: %v\n", unsaturatedMeasurementProbe)
 				}
-				// We know that we have a good Sequential RTT.
-				totalRTsCount += uint64(sequentialRTTimes.MeasurementCount)
-				totalRTTimes += sequentialRTTimes.Delay.Seconds()
+				// We know that we have a good Sequential measurement.
+				totalMeasurements += uint64(sequentialMeasurementTimes.MeasurementCount)
+				totalMeasurementTimes += sequentialMeasurementTimes.Delay.Seconds()
 				if debug.IsDebug(debugLevel) {
 					fmt.Printf(
-						"sequentialRTTsTime: %v\n",
-						sequentialRTTimes.Delay.Seconds(),
+						"most-recent sequential measurement time: %v; most-recent sequential measurement count: %v\n",
+						sequentialMeasurementTimes.Delay.Seconds(),
+						sequentialMeasurementTimes.MeasurementCount,
 					)
 				}
 			}
@@ -715,31 +374,31 @@ func main() {
 		"Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
 		utilities.ToMbps(downloadSaturation.RateBps),
 		utilities.ToMBps(downloadSaturation.RateBps),
-		len(downloadSaturation.lgcs),
+		len(downloadSaturation.LGCs),
 	)
 	fmt.Printf(
 		"Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
 		utilities.ToMbps(uploadSaturation.RateBps),
 		utilities.ToMBps(uploadSaturation.RateBps),
-		len(uploadSaturation.lgcs),
+		len(uploadSaturation.LGCs),
 	)
 
-	if totalRTsCount != 0 {
+	if totalMeasurements != 0 {
 		// "... it sums the five time values for each probe, and divides by the
 		// total
 		// number of probes to compute an average probe duration. The
 		// reciprocal of this, normalized to 60 seconds, gives the Round-trips
 		// Per Minute (RPM)."
-		// "average probe duration" = totalRTTimes / totalRTsCount.
-		// The reciprocol of this = 1 / (totalRTTimes / totalRTsCount) <-
+		// "average probe duration" = totalMeasurementTimes / totalMeasurements.
+		// The reciprocal of this = 1 / (totalMeasurementTimes / totalMeasurements) <-
 		// semantically the probes-per-second.
 		// Normalized to 60 seconds: 60 * (1
-		// / (totalRTTimes / totalRTsCount))) <- semantically the number of
+		// / ((totalMeasurementTimes / totalMeasurements)))) <- semantically the number of
 		// probes per minute.
rpm := float64( time.Minute.Seconds(), - ) / (totalRTTimes / (float64(totalRTsCount))) - fmt.Printf("Total RTTs measured: %d\n", totalRTsCount) + ) / (totalMeasurementTimes / (float64(totalMeasurements))) + fmt.Printf("Total measurements: %d\n", totalMeasurements) fmt.Printf("RPM: %5.0f\n", rpm) } else { fmt.Printf("Error occurred calculating RPM -- no probe measurements received.\n") diff --git a/rpm/rpm.go b/rpm/rpm.go index a349cee..8f431b6 100644 --- a/rpm/rpm.go +++ b/rpm/rpm.go @@ -7,14 +7,235 @@ import ( "io" "net/http" "net/http/httptrace" + "os" "time" + "github.com/network-quality/goresponsiveness/constants" "github.com/network-quality/goresponsiveness/debug" + "github.com/network-quality/goresponsiveness/lgc" + "github.com/network-quality/goresponsiveness/ma" "github.com/network-quality/goresponsiveness/stats" "github.com/network-quality/goresponsiveness/traceable" "github.com/network-quality/goresponsiveness/utilities" ) +func addFlows( + ctx context.Context, + toAdd uint64, + lgcs *[]lgc.LoadGeneratingConnection, + lgcsPreviousTransferred *[]uint64, + lgcGenerator func() lgc.LoadGeneratingConnection, + debug debug.DebugLevel, +) { + for i := uint64(0); i < toAdd; i++ { + *lgcs = append(*lgcs, lgcGenerator()) + *lgcsPreviousTransferred = append(*lgcsPreviousTransferred, 0) + if !(*lgcs)[len(*lgcs)-1].Start(ctx, debug) { + fmt.Printf( + "Error starting lgc with id %d!\n", + (*lgcs)[len(*lgcs)-1].ClientId(), + ) + return + } + } +} + +type SaturationResult struct { + RateBps float64 + LGCs []lgc.LoadGeneratingConnection +} + +func Saturate( + saturationCtx context.Context, + operatingCtx context.Context, + lgcGenerator func() lgc.LoadGeneratingConnection, + debugging *debug.DebugWithPrefix, +) (saturated chan SaturationResult) { + saturated = make(chan SaturationResult) + go func() { + + lgcs := make([]lgc.LoadGeneratingConnection, 0) + lgcsPreviousTransferred := make([]uint64, 0) + + addFlows( + saturationCtx, + constants.StartingNumberOfLoadGeneratingConnections, + &lgcs, + &lgcsPreviousTransferred, + lgcGenerator, + debugging.Level, + ) + + previousFlowIncreaseIteration := uint64(0) + previousMovingAverage := float64(0) + movingAverage := ma.NewMovingAverage( + constants.MovingAverageIntervalCount, + ) + movingAverageAverage := ma.NewMovingAverage( + constants.MovingAverageIntervalCount, + ) + + nextSampleStartTime := time.Now().Add(time.Second) + + for currentIteration := uint64(0); true; currentIteration++ { + + // When the program stops operating, then stop. + if saturationCtx.Err() != nil { + return + } + + // We may be asked to stop trying to saturate the + // network and return our current status. + if saturationCtx.Err() != nil { + //break + } + + now := time.Now() + // At each 1-second interval + if nextSampleStartTime.Sub(now) > 0 { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Sleeping until %v\n", + debugging, + nextSampleStartTime, + ) + } + time.Sleep(nextSampleStartTime.Sub(now)) + } else { + fmt.Fprintf(os.Stderr, "Warning: Missed a one-second deadline.\n") + } + nextSampleStartTime = time.Now().Add(time.Second) + + // Compute "instantaneous aggregate" goodput which is the number of + // bytes transferred within the last second. + totalTransfer := uint64(0) + allInvalid := true + for i := range lgcs { + if !lgcs[i].IsValid() { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Load-generating connection with id %d is invalid ... 
skipping.\n", + debugging, + lgcs[i].ClientId(), + ) + } + continue + } + allInvalid = false + previousTransferred := lgcsPreviousTransferred[i] + currentTransferred := lgcs[i].Transferred() + totalTransfer += (currentTransferred - previousTransferred) + lgcsPreviousTransferred[i] = currentTransferred + } + + // For some reason, all the lgcs are invalid. This likely means that + // the network/server went away. + if allInvalid { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: All lgcs were invalid. Assuming that network/server went away.\n", + debugging, + ) + } + break + } + + // Compute a moving average of the last + // constants.MovingAverageIntervalCount "instantaneous aggregate + // goodput" measurements + movingAverage.AddMeasurement(float64(totalTransfer)) + currentMovingAverage := movingAverage.CalculateAverage() + movingAverageAverage.AddMeasurement(currentMovingAverage) + movingAverageDelta := utilities.SignedPercentDifference( + currentMovingAverage, + previousMovingAverage, + ) + + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Instantaneous goodput: %f MB.\n", + debugging, + utilities.ToMBps(float64(totalTransfer)), + ) + fmt.Printf( + "%v: Previous moving average: %f MB.\n", + debugging, + utilities.ToMBps(previousMovingAverage), + ) + fmt.Printf( + "%v: Current moving average: %f MB.\n", + debugging, + utilities.ToMBps(currentMovingAverage), + ) + fmt.Printf( + "%v: Moving average delta: %f.\n", + debugging, + movingAverageDelta, + ) + } + + previousMovingAverage = currentMovingAverage + + // Special case: We won't make any adjustments on the first + // iteration. + if currentIteration == 0 { + continue + } + + // If moving average > "previous" moving average + InstabilityDelta: + if movingAverageDelta > constants.InstabilityDelta { + // Network did not yet reach saturation. If no flows added + // within the last 4 seconds, add 4 more flows + if (currentIteration - previousFlowIncreaseIteration) > uint64( + constants.MovingAverageStabilitySpan, + ) { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Adding flows because we are unsaturated and waited a while.\n", + debugging, + ) + } + addFlows( + saturationCtx, + constants.AdditiveNumberOfLoadGeneratingConnections, + &lgcs, + &lgcsPreviousTransferred, + lgcGenerator, + debugging.Level, + ) + previousFlowIncreaseIteration = currentIteration + } else { + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debugging) + } + } + } else { // Else, network reached saturation for the current flow count. 
+ if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: Network reached saturation with current flow count.\n", debugging) + } + // If new flows added and for 4 seconds the moving average + // throughput did not change: network reached stable saturation + if (currentIteration-previousFlowIncreaseIteration) < uint64(constants.MovingAverageStabilitySpan) && movingAverageAverage.AllSequentialIncreasesLessThan(float64(5)) { + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging) + } + break + } else { + // Else, add four more flows + if debug.IsDebug(debugging.Level) { + fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging) + } + addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, &lgcsPreviousTransferred, lgcGenerator, debugging.Level) + previousFlowIncreaseIteration = currentIteration + } + } + + } + saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs} + }() + return +} + type Probe struct { client *http.Client stats *stats.TraceStats diff --git a/utilities/utilities.go b/utilities/utilities.go index 4b114ba..a143d31 100644 --- a/utilities/utilities.go +++ b/utilities/utilities.go @@ -17,6 +17,7 @@ package utilities import ( "fmt" "math" + "math/rand" "os" "reflect" "sync/atomic" @@ -117,3 +118,7 @@ func (optional Optional[S]) String() string { return "None" } } + +func RandBetween(max int) int { + return rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % max +} -- cgit v1.2.3 From 39e17236e4f759fdfa26660c9476528c82556faa Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Fri, 13 May 2022 19:50:05 -0400 Subject: [Refactor] Additional (general) refactoring. --- constants/constants.go | 2 +- rpm/rpm.go | 57 +++++++++++++++++++++++++++++--------------------- 2 files changed, 34 insertions(+), 25 deletions(-) (limited to 'rpm') diff --git a/constants/constants.go b/constants/constants.go index 147b643..a015f14 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -9,7 +9,7 @@ var ( // calculation. MovingAverageIntervalCount int = 4 // The number of intervals across which to consider a moving average stable. - MovingAverageStabilitySpan int = 4 + MovingAverageStabilitySpan uint64 = 4 // The number of connections to add to a LBC when unsaturated. AdditiveNumberOfLoadGeneratingConnections uint64 = 4 // The cutoff of the percent difference that defines instability. diff --git a/rpm/rpm.go b/rpm/rpm.go index 8f431b6..8956509 100644 --- a/rpm/rpm.go +++ b/rpm/rpm.go @@ -66,18 +66,26 @@ func Saturate( debugging.Level, ) - previousFlowIncreaseIteration := uint64(0) + previousFlowIncreaseInterval := uint64(0) previousMovingAverage := float64(0) + + // The moving average will contain the average for the last + // constants.MovingAverageIntervalCount throughputs. + // ie, ma[i] = (throughput[i-3] + throughput[i-2] + throughput[i-1] + throughput[i])/4 movingAverage := ma.NewMovingAverage( constants.MovingAverageIntervalCount, ) + + // The moving average average will be the average of the last + // constants.MovingAverageIntervalCount moving averages. 
+ // ie, maa[i] = (ma[i-3] + ma[i-2] + ma[i-1] + ma[i])/4 movingAverageAverage := ma.NewMovingAverage( constants.MovingAverageIntervalCount, ) nextSampleStartTime := time.Now().Add(time.Second) - for currentIteration := uint64(0); true; currentIteration++ { + for currentInterval := uint64(0); true; currentInterval++ { // When the program stops operating, then stop. if saturationCtx.Err() != nil { @@ -124,7 +132,8 @@ func Saturate( allInvalid = false previousTransferred := lgcsPreviousTransferred[i] currentTransferred := lgcs[i].Transferred() - totalTransfer += (currentTransferred - previousTransferred) + instantaneousTransferred := currentTransferred - previousTransferred + totalTransfer += instantaneousTransferred lgcsPreviousTransferred[i] = currentTransferred } @@ -176,9 +185,11 @@ func Saturate( previousMovingAverage = currentMovingAverage + intervalsSinceLastFlowIncrease := currentInterval - previousFlowIncreaseInterval + // Special case: We won't make any adjustments on the first // iteration. - if currentIteration == 0 { + if currentInterval == 0 { continue } @@ -186,9 +197,7 @@ func Saturate( if movingAverageDelta > constants.InstabilityDelta { // Network did not yet reach saturation. If no flows added // within the last 4 seconds, add 4 more flows - if (currentIteration - previousFlowIncreaseIteration) > uint64( - constants.MovingAverageStabilitySpan, - ) { + if intervalsSinceLastFlowIncrease > constants.MovingAverageStabilitySpan { if debug.IsDebug(debugging.Level) { fmt.Printf( "%v: Adding flows because we are unsaturated and waited a while.\n", @@ -203,7 +212,7 @@ func Saturate( lgcGenerator, debugging.Level, ) - previousFlowIncreaseIteration = currentIteration + previousFlowIncreaseInterval = currentInterval } else { if debug.IsDebug(debugging.Level) { fmt.Printf("%v: We are unsaturated, but it still too early to add anything.\n", debugging) @@ -215,7 +224,7 @@ func Saturate( } // If new flows added and for 4 seconds the moving average // throughput did not change: network reached stable saturation - if (currentIteration-previousFlowIncreaseIteration) < uint64(constants.MovingAverageStabilitySpan) && movingAverageAverage.AllSequentialIncreasesLessThan(float64(5)) { + if intervalsSinceLastFlowIncrease < constants.MovingAverageStabilitySpan && movingAverageAverage.AllSequentialIncreasesLessThan(constants.InstabilityDelta) { if debug.IsDebug(debugging.Level) { fmt.Printf("%v: New flows added within the last four seconds and the moving-average average is consistent!\n", debugging) } @@ -226,7 +235,7 @@ func Saturate( fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging) } addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, &lgcsPreviousTransferred, lgcGenerator, debugging.Level) - previousFlowIncreaseIteration = currentIteration + previousFlowIncreaseInterval = currentInterval } } @@ -551,7 +560,7 @@ func CalculateProbeMeasurements( ctx context.Context, strict bool, saturated_measurement_probe *Probe, - new_measurement_probe *Probe, + unsaturated_measurement_probe *Probe, url string, debugLevel debug.DebugLevel, ) chan utilities.MeasurementResult { @@ -562,40 +571,40 @@ func CalculateProbeMeasurements( right now. However, it is not clear if Apple is doing the same in their native client. We will have to adjust based on that. 
*/
- var saturated_latency utilities.MeasurementResult
+ var saturated_probe_latency utilities.MeasurementResult
 if strict {
 if debug.IsDebug(debugLevel) {
- fmt.Printf("Beginning saturated RTT probe.\n")
+ fmt.Printf("Beginning saturated measurement probe.\n")
 }
- saturated_latency := getLatency(ctx, saturated_measurement_probe, url, debugLevel)
+ saturated_probe_latency = getLatency(ctx, saturated_measurement_probe, url, debugLevel)

- if saturated_latency.Err != nil {
- fmt.Printf("Error occurred getting the saturated RTT.\n")
- responseChannel <- saturated_latency
+ if saturated_probe_latency.Err != nil {
+ fmt.Printf("Error occurred getting the saturated measurement.\n")
+ responseChannel <- saturated_probe_latency
 return
 }
 }

 if debug.IsDebug(debugLevel) {
- fmt.Printf("Beginning unsaturated RTT probe.\n")
+ fmt.Printf("Beginning unsaturated measurement probe.\n")
 }
- new_rtt_latency := getLatency(ctx, new_measurement_probe, url, debugLevel)
+ unsaturated_probe_latency := getLatency(ctx, unsaturated_measurement_probe, url, debugLevel)

- if new_rtt_latency.Err != nil {
- fmt.Printf("Error occurred getting the unsaturated RTT.\n")
- responseChannel <- new_rtt_latency
+ if unsaturated_probe_latency.Err != nil {
+ fmt.Printf("Error occurred getting the unsaturated measurement.\n")
+ responseChannel <- unsaturated_probe_latency
 return
 }

- total_delay := new_rtt_latency.Delay
- total_measurement_count := new_rtt_latency.MeasurementCount
+ total_latency := unsaturated_probe_latency.Delay
+ total_measurement_count := unsaturated_probe_latency.MeasurementCount

 if strict {
- total_delay += saturated_latency.Delay
- total_measurement_count += saturated_latency.MeasurementCount
+ total_latency += saturated_probe_latency.Delay
+ total_measurement_count += saturated_probe_latency.MeasurementCount
 }

- responseChannel <- utilities.MeasurementResult{Delay: total_delay, MeasurementCount: total_measurement_count, Err: nil}
+ responseChannel <- utilities.MeasurementResult{Delay: total_latency, MeasurementCount: total_measurement_count, Err: nil}
 return
 }()
 return responseChannel
-- cgit v1.2.3

From 140c64907408d1cf998f1616d6aec4bc2904ce95 Mon Sep 17 00:00:00 2001
From: Will Hawkins
Date: Fri, 13 May 2022 23:49:56 -0400
Subject: [Improvement] Increase granularity of transfer rate calculation

Previously the code relied on the Go runtime waking the saturation
goroutine at exactly one-second intervals, as requested. With this
change, the code accounts for any minor fluctuations in that wakeup
time when calculating the throughput within an interval.

I hope that this change is an improvement.
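The technique this patch describes can be sketched compactly. What follows is an illustrative, self-contained Go program, not the project's code: the counter type and its field names are invented for this sketch, but the mechanism (atomically swapping the byte counter and timing the real interval length) mirrors the TransferredInInterval approach in the diff below.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// counter is a hypothetical stand-in for a load-generating connection's
// transfer counter.
type counter struct {
	transferred     uint64    // bytes moved since the last sample (updated atomically)
	start           time.Time // when the transfer began
	lastIntervalEnd int64     // offset (ns since start) of the previous sample
}

// sample returns the bytes moved since the last call together with the real
// length of that interval, so the caller can normalize to a per-second rate.
func (c *counter) sample() (uint64, time.Duration) {
	bytes := atomic.SwapUint64(&c.transferred, 0)
	now := time.Since(c.start).Nanoseconds()
	prev := atomic.SwapInt64(&c.lastIntervalEnd, now)
	return bytes, time.Duration(now - prev)
}

func main() {
	c := &counter{start: time.Now()}
	atomic.AddUint64(&c.transferred, 1500000) // pretend we transferred 1.5 MB
	time.Sleep(1100 * time.Millisecond)       // the runtime woke us ~100ms late
	bytes, interval := c.sample()
	// Dividing by the measured interval, rather than an assumed one second,
	// keeps the rate honest when the wakeup drifts.
	fmt.Printf("%.0f bytes/sec over %v\n", float64(bytes)/interval.Seconds(), interval)
}

Because the byte count is divided by the measured interval rather than a fixed one second, a late wakeup (1.1s in the sketch) no longer overstates the throughput.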
--- lgc/lgc.go | 70 +++++++++++++++++++++++++++++++++++++++----------------------- rpm/rpm.go | 16 +++++--------- 2 files changed, 49 insertions(+), 37 deletions(-) (limited to 'rpm') diff --git a/lgc/lgc.go b/lgc/lgc.go index 1f6332d..59b623d 100644 --- a/lgc/lgc.go +++ b/lgc/lgc.go @@ -34,22 +34,25 @@ import ( type LoadGeneratingConnection interface { Start(context.Context, debug.DebugLevel) bool - Transferred() uint64 + TransferredInInterval() (uint64, time.Duration) Client() *http.Client IsValid() bool ClientId() uint64 } type LoadGeneratingConnectionDownload struct { - Path string - downloaded uint64 - client *http.Client - debug debug.DebugLevel - valid bool - KeyLogger io.Writer - clientId uint64 - tracer *httptrace.ClientTrace - stats stats.TraceStats + Path string + downloaded uint64 + downloadStartTime time.Time + lastDownloaded uint64 + lastIntervalEnd int64 + client *http.Client + debug debug.DebugLevel + valid bool + KeyLogger io.Writer + clientId uint64 + tracer *httptrace.ClientTrace + stats stats.TraceStats } func (lgd *LoadGeneratingConnectionDownload) SetDnsStartTimeInfo( @@ -204,12 +207,15 @@ func (lgd *LoadGeneratingConnectionDownload) ClientId() uint64 { return lgd.clientId } -func (lgd *LoadGeneratingConnectionDownload) Transferred() uint64 { - transferred := atomic.LoadUint64(&lgd.downloaded) +func (lgd *LoadGeneratingConnectionDownload) TransferredInInterval() (uint64, time.Duration) { + transferred := atomic.SwapUint64(&lgd.downloaded, 0) + newIntervalEnd := (time.Now().Sub(lgd.downloadStartTime)).Nanoseconds() + previousIntervalEnd := atomic.SwapInt64(&lgd.lastIntervalEnd, newIntervalEnd) + intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd) if debug.IsDebug(lgd.debug) { - fmt.Printf("download: Transferred: %v\n", transferred) + fmt.Printf("download: Transferred: %v bytes in %v.\n", transferred, intervalLength) } - return transferred + return transferred, intervalLength } func (lgd *LoadGeneratingConnectionDownload) Client() *http.Client { @@ -291,6 +297,9 @@ func (lbd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) { return } + lbd.downloadStartTime = time.Now() + lbd.lastIntervalEnd = 0 + if get, err = lbd.client.Do(request); err != nil { lbd.valid = false return @@ -304,25 +313,31 @@ func (lbd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) { } type LoadGeneratingConnectionUpload struct { - Path string - uploaded uint64 - client *http.Client - debug debug.DebugLevel - valid bool - KeyLogger io.Writer - clientId uint64 + Path string + uploaded uint64 + uploadStartTime time.Time + lastUploaded uint64 + lastIntervalEnd int64 + client *http.Client + debug debug.DebugLevel + valid bool + KeyLogger io.Writer + clientId uint64 } func (lgu *LoadGeneratingConnectionUpload) ClientId() uint64 { return lgu.clientId } -func (lgu *LoadGeneratingConnectionUpload) Transferred() uint64 { - transferred := atomic.LoadUint64(&lgu.uploaded) - if debug.IsDebug(lgu.debug) { - fmt.Printf("upload: Transferred: %v\n", transferred) +func (lgd *LoadGeneratingConnectionUpload) TransferredInInterval() (uint64, time.Duration) { + transferred := atomic.SwapUint64(&lgd.uploaded, 0) + newIntervalEnd := (time.Now().Sub(lgd.uploadStartTime)).Nanoseconds() + previousIntervalEnd := atomic.SwapInt64(&lgd.lastIntervalEnd, newIntervalEnd) + intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd) + if debug.IsDebug(lgd.debug) { + fmt.Printf("upload: Transferred: %v bytes in %v.\n", transferred, intervalLength) } - return transferred 
+ return transferred, intervalLength } func (lgu *LoadGeneratingConnectionUpload) Client() *http.Client { @@ -355,6 +370,9 @@ func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) bool { var resp *http.Response = nil var err error + lgu.uploadStartTime = time.Now() + lgu.lastIntervalEnd = 0 + if resp, err = lgu.client.Post(lgu.Path, "application/octet-stream", s); err != nil { lgu.valid = false return false diff --git a/rpm/rpm.go b/rpm/rpm.go index 8956509..3425566 100644 --- a/rpm/rpm.go +++ b/rpm/rpm.go @@ -23,13 +23,11 @@ func addFlows( ctx context.Context, toAdd uint64, lgcs *[]lgc.LoadGeneratingConnection, - lgcsPreviousTransferred *[]uint64, lgcGenerator func() lgc.LoadGeneratingConnection, debug debug.DebugLevel, ) { for i := uint64(0); i < toAdd; i++ { *lgcs = append(*lgcs, lgcGenerator()) - *lgcsPreviousTransferred = append(*lgcsPreviousTransferred, 0) if !(*lgcs)[len(*lgcs)-1].Start(ctx, debug) { fmt.Printf( "Error starting lgc with id %d!\n", @@ -55,13 +53,11 @@ func Saturate( go func() { lgcs := make([]lgc.LoadGeneratingConnection, 0) - lgcsPreviousTransferred := make([]uint64, 0) addFlows( saturationCtx, constants.StartingNumberOfLoadGeneratingConnections, &lgcs, - &lgcsPreviousTransferred, lgcGenerator, debugging.Level, ) @@ -116,7 +112,7 @@ func Saturate( // Compute "instantaneous aggregate" goodput which is the number of // bytes transferred within the last second. - totalTransfer := uint64(0) + var totalTransfer float64 = 0 allInvalid := true for i := range lgcs { if !lgcs[i].IsValid() { @@ -130,11 +126,10 @@ func Saturate( continue } allInvalid = false - previousTransferred := lgcsPreviousTransferred[i] - currentTransferred := lgcs[i].Transferred() - instantaneousTransferred := currentTransferred - previousTransferred + currentTransferred, currentInterval := lgcs[i].TransferredInInterval() + // normalize to a second-long interval! + instantaneousTransferred := float64(currentTransferred) / float64(currentInterval.Seconds()) totalTransfer += instantaneousTransferred - lgcsPreviousTransferred[i] = currentTransferred } // For some reason, all the lgcs are invalid. This likely means that @@ -208,7 +203,6 @@ func Saturate( saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, - &lgcsPreviousTransferred, lgcGenerator, debugging.Level, ) @@ -234,7 +228,7 @@ func Saturate( if debug.IsDebug(debugging.Level) { fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging) } - addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, &lgcsPreviousTransferred, lgcGenerator, debugging.Level) + addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level) previousFlowIncreaseInterval = currentInterval } } -- cgit v1.2.3 From eb8f1e7826c375b1ad8ea16e842451677d74df11 Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Tue, 17 May 2022 16:53:27 -0400 Subject: [Comments] Fix a few comments 1. A misleading comment was corrected. 2. A typo was fixed. --- rpm/rpm.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'rpm') diff --git a/rpm/rpm.go b/rpm/rpm.go index 3425566..28496cd 100644 --- a/rpm/rpm.go +++ b/rpm/rpm.go @@ -304,7 +304,7 @@ func (p *Probe) GetTLSAndHttpHeaderDelta() time.Duration { if p.stats.ConnectionReused { // When we reuse a connection there will be no time logged for when the // TCP connection was established (obviously). 
So, fall back to the time - // when were notified about reusing a connection (as a close approximation!). + // when we were notified about reusing a connection (as a close approximation!). before = p.stats.GetConnectionDoneTime } delta := p.stats.HttpResponseReadyTime.Sub(before) @@ -561,10 +561,9 @@ func CalculateProbeMeasurements( responseChannel := make(chan utilities.MeasurementResult) go func() { /* - TODO: We *are* measuring round-trip times on the load-generating connection - right now. However, it is not clear if Apple is doing the same in their native - client. We will have to adjust based on that. - */ + * Depending on whether the user wants their measurements to be strict, we will + * measure on the LGC. + */ var saturated_probe_latency utilities.MeasurementResult if strict { -- cgit v1.2.3
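For readers following the reused-connection comment fix above, here is a hedged sketch of the fallback it documents. The traceStats type and its field names are stand-ins invented for illustration; only the shape of the logic follows GetTLSAndHttpHeaderDelta.

package main

import (
	"fmt"
	"time"
)

// traceStats is a hypothetical stand-in for the timestamps that httptrace
// callbacks populate; the real structure may differ.
type traceStats struct {
	ConnectionReused      bool
	TLSDoneTime           time.Time // stays the zero value when the connection was reused
	GetConnectionDoneTime time.Time // when GotConn reported the (reused) connection
	HttpResponseReadyTime time.Time
}

// tlsAndHttpHeaderDelta picks a valid "before" timestamp and subtracts it
// from the response-ready time.
func tlsAndHttpHeaderDelta(s traceStats) time.Duration {
	before := s.TLSDoneTime
	if s.ConnectionReused {
		// No handshake happened on a reused connection, so fall back to the
		// time we were notified about the reuse (a close approximation).
		before = s.GetConnectionDoneTime
	}
	return s.HttpResponseReadyTime.Sub(before)
}

func main() {
	now := time.Now()
	reused := traceStats{
		ConnectionReused:      true,
		GetConnectionDoneTime: now,
		HttpResponseReadyTime: now.Add(30 * time.Millisecond),
	}
	// Without the fallback, "before" would be the zero time.Time and the
	// delta would be nonsensically large.
	fmt.Println(tlsAndHttpHeaderDelta(reused))
}

The fallback matters because subtracting an unset handshake timestamp would yield an invalid, enormous delta instead of the ~30ms the sketch prints.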