summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lgc/lgc.go25
-rw-r--r--networkQuality.go214
-rw-r--r--rpm/rpm.go296
3 files changed, 209 insertions, 326 deletions
diff --git a/lgc/lgc.go b/lgc/lgc.go
index fb51ec1..b32721b 100644
--- a/lgc/lgc.go
+++ b/lgc/lgc.go
@@ -241,7 +241,7 @@ func (cr *countingReader) Read(p []byte) (n int, err error) {
}
func (lgd *LoadGeneratingConnectionDownload) Start(
- ctx context.Context,
+ parentCtx context.Context,
debugLevel debug.DebugLevel,
) bool {
lgd.downloaded = 0
@@ -278,9 +278,18 @@ func (lgd *LoadGeneratingConnectionDownload) Start(
lgd.clientId,
)
}
- go lgd.doDownload(ctx)
+
+ // Later, when the doDownload function attempts to add a tracer to the http request,
+ // it will be associated with the context. Multiple tracers associated with the same
+ // context will make it impossible to disambiguate the events. In other words, if there
+ // are multiple tracers associated with the same context, *all* the tracers get invoked
+ // every time that an event happens on a request with any of them! So, we will make a
+ // unique context so that there is a one-to-one correspondence between tracers and requests.
+ downloadCtx, _ := context.WithCancel(parentCtx)
+ go lgd.doDownload(downloadCtx)
return true
}
+
func (lgd *LoadGeneratingConnectionDownload) IsValid() bool {
return lgd.valid
}
@@ -394,7 +403,7 @@ func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) bool {
}
func (lgu *LoadGeneratingConnectionUpload) Start(
- ctx context.Context,
+ parentCtx context.Context,
debugLevel debug.DebugLevel,
) bool {
lgu.uploaded = 0
@@ -422,7 +431,15 @@ func (lgu *LoadGeneratingConnectionUpload) Start(
if debug.IsDebug(lgu.debug) {
fmt.Printf("Started a load-generating upload (id: %v).\n", lgu.clientId)
}
- go lgu.doUpload(ctx)
+
+ // Later, when the doUpload function attempts to add a tracer to the http request,
+ // it will be associated with the context. Multiple tracers associated with the same
+ // context will make it impossible to disambiguate the events. In other words, if there
+ // are multiple tracers associated with the same context, *all* the tracers get invoked
+ // every time that an event happens on a request with any of them! So, we will make a
+ // unique context so that there is a one-to-one correspondence between tracers and requests.
+ uploadCtx, _ := context.WithCancel(parentCtx)
+ go lgu.doUpload(uploadCtx)
return true
}
diff --git a/networkQuality.go b/networkQuality.go
index 93c42e0..d5c3523 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -16,10 +16,8 @@ package main
import (
"context"
- "crypto/tls"
"flag"
"fmt"
- "net/http"
"os"
"runtime/pprof"
"time"
@@ -33,7 +31,6 @@ import (
"github.com/network-quality/goresponsiveness/rpm"
"github.com/network-quality/goresponsiveness/timeoutat"
"github.com/network-quality/goresponsiveness/utilities"
- "golang.org/x/net/http2"
)
var (
@@ -98,7 +95,7 @@ func main() {
timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
configHostPort := fmt.Sprintf("%s:%d", *configHost, *configPort)
operatingCtx, cancelOperatingCtx := context.WithCancel(context.Background())
- saturationCtx, cancelSaturationCtx := context.WithCancel(
+ lgDataCollectionCtx, cancelLGDataCollectionCtx := context.WithCancel(
context.Background(),
)
config := &config.Config{}
@@ -187,84 +184,90 @@ func main() {
* Create (and then, ironically, name) two anonymous functions that, when invoked,
* will create load-generating connections for upload/download/
*/
- generate_lbd := func() lgc.LoadGeneratingConnection {
+ generate_lgd := func() lgc.LoadGeneratingConnection {
return &lgc.LoadGeneratingConnectionDownload{
Path: config.Urls.LargeUrl,
KeyLogger: sslKeyFileConcurrentWriter,
}
}
- generate_lbu := func() lgc.LoadGeneratingConnection {
+ generate_lgu := func() lgc.LoadGeneratingConnection {
return &lgc.LoadGeneratingConnectionUpload{
Path: config.Urls.UploadUrl,
KeyLogger: sslKeyFileConcurrentWriter,
}
}
+ generate_lg_probe_configuration := func() rpm.ProbeConfiguration {
+ return rpm.ProbeConfiguration{URL: config.Urls.SmallUrl}
+ }
+
var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download")
var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload")
- downloadSaturationChannel := rpm.Saturate(
- saturationCtx,
+ downloadDataCollectionChannel := rpm.LGCollectData(
+ lgDataCollectionCtx,
operatingCtx,
- generate_lbd,
+ generate_lgd,
+ generate_lg_probe_configuration,
downloadDebugging,
)
- uploadSaturationChannel := rpm.Saturate(
- saturationCtx,
+ uploadDataCollectionChannel := rpm.LGCollectData(
+ lgDataCollectionCtx,
operatingCtx,
- generate_lbu,
+ generate_lgu,
+ generate_lg_probe_configuration,
uploadDebugging,
)
- saturationTimeout := false
- uploadSaturated := false
- downloadSaturated := false
- downloadSaturation := rpm.SaturationResult{}
- uploadSaturation := rpm.SaturationResult{}
+ dataCollectionTimeout := false
+ uploadDataCollectionComplete := false
+ downloadDataCollectionComple := false
+ downloadDataCollectionResult := rpm.LGDataCollectionResult{}
+ uploadDataCollectionResult := rpm.LGDataCollectionResult{}
- for !(uploadSaturated && downloadSaturated) {
+ for !(uploadDataCollectionComplete && downloadDataCollectionComple) {
select {
- case downloadSaturation = <-downloadSaturationChannel:
+ case downloadDataCollectionResult = <-downloadDataCollectionChannel:
{
- downloadSaturated = true
+ downloadDataCollectionComple = true
if *debugCliFlag {
fmt.Printf(
- "################# download is %s saturated (%fMBps, %d flows)!\n",
+ "################# download load-generating data collection is %s complete (%fMBps, %d flows)!\n",
utilities.Conditional(
- saturationTimeout,
+ dataCollectionTimeout,
"(provisionally)",
"",
),
- utilities.ToMBps(downloadSaturation.RateBps),
- len(downloadSaturation.LGCs),
+ utilities.ToMBps(downloadDataCollectionResult.RateBps),
+ len(downloadDataCollectionResult.LGCs),
)
}
}
- case uploadSaturation = <-uploadSaturationChannel:
+ case uploadDataCollectionResult = <-uploadDataCollectionChannel:
{
- uploadSaturated = true
+ uploadDataCollectionComplete = true
if *debugCliFlag {
fmt.Printf(
- "################# upload is %s saturated (%fMBps, %d flows)!\n",
+ "################# upload load-generating data collection is %s complete (%fMBps, %d flows)!\n",
utilities.Conditional(
- saturationTimeout,
+ dataCollectionTimeout,
"(provisionally)",
"",
),
- utilities.ToMBps(uploadSaturation.RateBps),
- len(uploadSaturation.LGCs),
+ utilities.ToMBps(uploadDataCollectionResult.RateBps),
+ len(uploadDataCollectionResult.LGCs),
)
}
}
case <-timeoutChannel:
{
- if saturationTimeout {
- // We already timedout on saturation. This signal means that
- // we are timedout on getting the provisional saturation. We
+ if dataCollectionTimeout {
+ // We already timedout on data collection. This signal means that
+ // we are timedout on getting the provisional data collection. We
// will exit!
fmt.Fprint(
os.Stderr,
- "Error: Saturation could not be completed in time and no provisional rates could be assessed. Test failed.\n",
+ "Error: Load-Generating data collection could not be completed in time and no provisional data could be gathered. Test failed.\n",
)
cancelOperatingCtx()
if *debugCliFlag {
@@ -272,13 +275,13 @@ func main() {
}
return
}
- saturationTimeout = true
+ dataCollectionTimeout = true
- // We timed out attempting to saturate the link. So, we will
- // shut down all the saturation xfers
- cancelSaturationCtx()
+ // We timed out attempting to collect data about the link. So, we will
+ // shut down all the collection xfers
+ cancelLGDataCollectionCtx()
// and then we will give ourselves some additional time in order
- // to calculate a provisional saturation.
+ // to complete provisional data collection.
timeoutAbsoluteTime = time.Now().
Add(time.Second * time.Duration(*rpmtimeout))
timeoutChannel = timeoutat.TimeoutAt(
@@ -288,39 +291,27 @@ func main() {
)
if *debugCliFlag {
fmt.Printf(
- "################# timeout reaching saturation!\n",
+ "################# timeout collecting load-generating data!\n",
)
}
}
}
}
- // Give ourselves no more than 15 seconds to complete the RPM calculation.
- // This is conditional because (above) we may have already added the time.
- // We did it up there so that we could also limit the amount of time waiting
- // for a conditional saturation calculation.
- if !saturationTimeout {
- timeoutAbsoluteTime = time.Now().Add(time.Second * time.Duration(*rpmtimeout))
- timeoutChannel = timeoutat.TimeoutAt(
- operatingCtx,
- timeoutAbsoluteTime,
- debugLevel,
- )
- }
+ // In the new version we are no longer going to wait to send probes until after
+ // saturation. When we get here we are now only going to compute the results
+ // and/or extended statistics!
- totalMeasurements := uint64(0)
- totalMeasurementTimes := float64(0)
- measurementTimeout := false
extendedStats := extendedstats.ExtendedStats{}
- for i := 0; i < len(downloadSaturation.LGCs); i++ {
+ for i := 0; i < len(downloadDataCollectionResult.LGCs); i++ {
// Assume that extended statistics are available -- the check was done explicitly at
// program startup if the calculateExtendedStats flag was set by the user on the command line.
if *calculateExtendedStats {
if !extendedstats.ExtendedStatsAvailable() {
panic("Extended stats are not available but the user requested their calculation.")
}
- if err := extendedStats.IncorporateConnectionStats(downloadSaturation.LGCs[i].Stats().ConnInfo.Conn); err != nil {
+ if err := extendedStats.IncorporateConnectionStats(downloadDataCollectionResult.LGCs[i].Stats().ConnInfo.Conn); err != nil {
fmt.Fprintf(
os.Stderr,
"Warning: Could not add extended stats for the connection: %v",
@@ -329,116 +320,19 @@ func main() {
}
}
}
-
- for i := 0; i < constants.MeasurementProbeCount && !measurementTimeout; i++ {
- if len(downloadSaturation.LGCs) == 0 {
- continue
- }
- randomLGCsIndex := utilities.RandBetween(len(downloadSaturation.LGCs))
- if !downloadSaturation.LGCs[randomLGCsIndex].IsValid() {
- if *debugCliFlag {
- fmt.Printf(
- "%v: The randomly selected saturated connection (with id %d) was invalid. Skipping.\n",
- debugCliFlag,
- downloadSaturation.LGCs[randomLGCsIndex].ClientId(),
- )
- }
-
- // Protect against pathological cases where we continuously select
- // invalid connections and never
- // do the select below
- if time.Since(timeoutAbsoluteTime) > 0 {
- if *debugCliFlag {
- fmt.Printf(
- "Pathologically could not find valid saturated connections use for measurement.\n",
- )
- }
- break
- }
- continue
- }
-
- unsaturatedMeasurementTransport := http2.Transport{}
- unsaturatedMeasurementTransport.TLSClientConfig = &tls.Config{}
- if sslKeyFileConcurrentWriter != nil {
- unsaturatedMeasurementTransport.TLSClientConfig.KeyLogWriter = sslKeyFileConcurrentWriter
- }
- unsaturatedMeasurementTransport.TLSClientConfig.InsecureSkipVerify = true
- newClient := http.Client{Transport: &unsaturatedMeasurementTransport}
-
- unsaturatedMeasurementProbe := rpm.NewProbe(&newClient, debugLevel)
-
- saturatedMeasurementProbe := rpm.NewProbe(
- downloadSaturation.LGCs[randomLGCsIndex].Client(),
- debugLevel,
- )
-
- select {
- case <-timeoutChannel:
- {
- measurementTimeout = true
- }
- case sequentialMeasurementTimes := <-rpm.CalculateProbeMeasurements(operatingCtx, *strictFlag, saturatedMeasurementProbe, unsaturatedMeasurementProbe, config.Urls.SmallUrl, debugLevel):
- {
- if sequentialMeasurementTimes.Err != nil {
- fmt.Printf(
- "Failed to calculate a time for sequential measurements: %v\n",
- sequentialMeasurementTimes.Err,
- )
- continue
- }
-
- if debug.IsDebug(debugLevel) {
- fmt.Printf("unsaturatedMeasurementProbe: %v\n", unsaturatedMeasurementProbe)
- }
- // We know that we have a good Sequential measurement.
- totalMeasurements += uint64(sequentialMeasurementTimes.MeasurementCount)
- totalMeasurementTimes += sequentialMeasurementTimes.Delay.Seconds()
- if debug.IsDebug(debugLevel) {
- fmt.Printf(
- "most-recent sequential measurement time: %v; most-recent sequential measurement count: %v\n",
- sequentialMeasurementTimes.Delay.Seconds(),
- sequentialMeasurementTimes.MeasurementCount,
- )
- }
- }
- }
- }
-
fmt.Printf(
"Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
- utilities.ToMbps(downloadSaturation.RateBps),
- utilities.ToMBps(downloadSaturation.RateBps),
- len(downloadSaturation.LGCs),
+ utilities.ToMbps(downloadDataCollectionResult.RateBps),
+ utilities.ToMBps(downloadDataCollectionResult.RateBps),
+ len(downloadDataCollectionResult.LGCs),
)
fmt.Printf(
"Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
- utilities.ToMbps(uploadSaturation.RateBps),
- utilities.ToMBps(uploadSaturation.RateBps),
- len(uploadSaturation.LGCs),
+ utilities.ToMbps(uploadDataCollectionResult.RateBps),
+ utilities.ToMBps(uploadDataCollectionResult.RateBps),
+ len(uploadDataCollectionResult.LGCs),
)
- if totalMeasurements != 0 {
- // "... it sums the five time values for each probe, and divides by the
- // total
- // number of probes to compute an average probe duration. The
- // reciprocal of this, normalized to 60 seconds, gives the Round-trips
- // Per Minute (RPM)."
- // "average probe duration" = totalMeasurementTimes / totalMeasurements.
- // The reciprocol of this = 1 / (totalMeasurementTimes / totalMeasurements) <-
- // semantically the probes-per-second.
- // Normalized to 60 seconds: 60 * (1
- // / ((totalMeasurementTimes / totalMeasurements)))) <- semantically the number of
- // probes per minute.
- rpm := float64(
- time.Minute.Seconds(),
- ) / (totalMeasurementTimes / (float64(totalMeasurements)))
- fmt.Printf("Total measurements: %d\n", totalMeasurements)
- fmt.Printf("RPM: %5.0f\n", rpm)
- } else {
- fmt.Printf("Error occurred calculating RPM -- no probe measurements received.\n")
- }
-
if *calculateExtendedStats {
fmt.Println(extendedStats.Repr())
}
diff --git a/rpm/rpm.go b/rpm/rpm.go
index ecab7c9..07121a6 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -38,30 +38,119 @@ func addFlows(
}
}
-type SaturationResult struct {
- RateBps float64
- LGCs []lgc.LoadGeneratingConnection
+type ProbeConfiguration struct {
+ URL string
}
-func Saturate(
- saturationCtx context.Context,
+type DataPoint struct {
+ RoundTripCount uint64
+ Duration time.Duration
+}
+
+type LGDataCollectionResult struct {
+ RateBps float64
+ LGCs []lgc.LoadGeneratingConnection
+ DataPoints []DataPoint
+}
+
+func LGProbe(parentProbeCtx context.Context, connection lgc.LoadGeneratingConnection, lgProbeUrl string, result *chan DataPoint, debugging *debug.DebugWithPrefix) error {
+ probeCtx, _ := context.WithCancel(parentProbeCtx)
+ probeTracer := NewProbeTracer(connection.Client(), true, debugging)
+ time_before_probe := time.Now()
+ probe_req, err := http.NewRequestWithContext(
+ httptrace.WithClientTrace(probeCtx, probeTracer.trace),
+ "GET",
+ lgProbeUrl,
+ nil,
+ )
+ if err != nil {
+ return err
+ }
+
+ probe_resp, err := connection.Client().Do(probe_req)
+ if err != nil {
+ return err
+ }
+
+ // TODO: Make this interruptable somehow by using _ctx_.
+ _, err = io.ReadAll(probe_resp.Body)
+ if err != nil {
+ return err
+ }
+ time_after_probe := time.Now()
+
+ // Depending on whether we think that Close() requires another RTT (via TCP), we
+ // may need to move this before/after capturing the after time.
+ probe_resp.Body.Close()
+
+ sanity := time_after_probe.Sub(time_before_probe)
+
+ tlsAndHttpHeaderDelta := probeTracer.GetTLSAndHttpHeaderDelta()
+ httpDownloadDelta := probeTracer.GetHttpDownloadDelta(
+ time_after_probe,
+ ) // Combined with above, constitutes 2 time measurements, per the Spec.
+ tcpDelta := probeTracer.GetTCPDelta() // Constitutes 1 time measurement, per the Spec.
+ totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + tcpDelta
+
+ // We must have reused the connection!
+ if !probeTracer.stats.ConnectionReused {
+ panic(!probeTracer.stats.ConnectionReused)
+ }
+
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) (Probe %v) sanity vs total: %v vs %v\n",
+ debugging.Prefix,
+ probeTracer.ProbeId(),
+ sanity,
+ totalDelay,
+ )
+ }
+ *result <- DataPoint{RoundTripCount: 1, Duration: totalDelay}
+
+ return nil
+ /////////////
+}
+func LGProber(proberCtx context.Context, defaultConnection lgc.LoadGeneratingConnection, altConnections *[]lgc.LoadGeneratingConnection, url string, interval time.Duration, debugging *debug.DebugWithPrefix) (points chan DataPoint) {
+ points = make(chan DataPoint)
+
+ go func() {
+ for proberCtx.Err() == nil {
+ time.Sleep(interval)
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("(%s) About to probe!\n", debugging.Prefix)
+ }
+ go LGProbe(proberCtx, defaultConnection, url, &points, debugging)
+ }
+ }()
+
+ return
+}
+
+func LGCollectData(
+ lgDataCollectionCtx context.Context,
operatingCtx context.Context,
lgcGenerator func() lgc.LoadGeneratingConnection,
+ lgProbeConfigurationGenerator func() ProbeConfiguration,
debugging *debug.DebugWithPrefix,
-) (saturated chan SaturationResult) {
- saturated = make(chan SaturationResult)
+) (resulted chan LGDataCollectionResult) {
+ resulted = make(chan LGDataCollectionResult)
go func() {
lgcs := make([]lgc.LoadGeneratingConnection, 0)
addFlows(
- saturationCtx,
+ lgDataCollectionCtx,
constants.StartingNumberOfLoadGeneratingConnections,
&lgcs,
lgcGenerator,
debugging.Level,
)
+ lgProbeConfiguration := lgProbeConfigurationGenerator()
+
+ LGProber(lgDataCollectionCtx, lgcs[0], &lgcs, lgProbeConfiguration.URL, time.Duration(100*time.Millisecond), debugging)
+
previousFlowIncreaseInterval := uint64(0)
previousMovingAverage := float64(0)
@@ -84,13 +173,13 @@ func Saturate(
for currentInterval := uint64(0); true; currentInterval++ {
// When the program stops operating, then stop.
- if saturationCtx.Err() != nil {
+ if lgDataCollectionCtx.Err() != nil {
return
}
// We may be asked to stop trying to saturate the
// network and return our current status.
- if saturationCtx.Err() != nil {
+ if lgDataCollectionCtx.Err() != nil {
//break
}
@@ -204,7 +293,7 @@ func Saturate(
)
}
addFlows(
- saturationCtx,
+ lgDataCollectionCtx,
constants.AdditiveNumberOfLoadGeneratingConnections,
&lgcs,
lgcGenerator,
@@ -232,38 +321,39 @@ func Saturate(
if debug.IsDebug(debugging.Level) {
fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging)
}
- addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level)
+ addFlows(lgDataCollectionCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level)
previousFlowIncreaseInterval = currentInterval
}
}
}
- saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs}
+ resulted <- LGDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs}
}()
return
}
-type Probe struct {
+type ProbeTracer struct {
client *http.Client
stats *stats.TraceStats
trace *httptrace.ClientTrace
debug debug.DebugLevel
probeid uint64
+ isLG bool
}
-func (p *Probe) String() string {
+func (p *ProbeTracer) String() string {
return fmt.Sprintf("(Probe %v): stats: %v\n", p.probeid, p.stats)
}
-func (p *Probe) ProbeId() uint64 {
+func (p *ProbeTracer) ProbeId() uint64 {
return p.probeid
}
-func (p *Probe) GetTrace() *httptrace.ClientTrace {
+func (p *ProbeTracer) GetTrace() *httptrace.ClientTrace {
return p.trace
}
-func (p *Probe) GetDnsDelta() time.Duration {
+func (p *ProbeTracer) GetDnsDelta() time.Duration {
if p.stats.ConnectionReused {
return time.Duration(0)
}
@@ -274,7 +364,7 @@ func (p *Probe) GetDnsDelta() time.Duration {
return delta
}
-func (p *Probe) GetTCPDelta() time.Duration {
+func (p *ProbeTracer) GetTCPDelta() time.Duration {
if p.stats.ConnectionReused {
return time.Duration(0)
}
@@ -285,7 +375,7 @@ func (p *Probe) GetTCPDelta() time.Duration {
return delta
}
-func (p *Probe) GetTLSDelta() time.Duration {
+func (p *ProbeTracer) GetTLSDelta() time.Duration {
if utilities.IsSome(p.stats.TLSDoneTime) {
panic("There should not be TLS information, but there is.")
}
@@ -296,7 +386,7 @@ func (p *Probe) GetTLSDelta() time.Duration {
return delta
}
-func (p *Probe) GetTLSAndHttpHeaderDelta() time.Duration {
+func (p *ProbeTracer) GetTLSAndHttpHeaderDelta() time.Duration {
// Because the TLS handshake occurs *after* the TCP connection (ConnectDoneTime)
// and *before* the HTTP transaction, we know that the delta between the time
// that the first HTTP response byte is available and the time that the TCP
@@ -318,7 +408,7 @@ func (p *Probe) GetTLSAndHttpHeaderDelta() time.Duration {
return delta
}
-func (p *Probe) GetHttpHeaderDelta() time.Duration {
+func (p *ProbeTracer) GetHttpHeaderDelta() time.Duration {
panic(
"Unusable until TLS tracing support is enabled! Use GetTLSAndHttpHeaderDelta() instead.\n",
)
@@ -329,7 +419,7 @@ func (p *Probe) GetHttpHeaderDelta() time.Duration {
return delta
}
-func (p *Probe) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration {
+func (p *ProbeTracer) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration {
delta := httpDoneTime.Sub(p.stats.HttpResponseReadyTime)
if debug.IsDebug(p.debug) {
fmt.Printf("(Probe %v): Http Download Time: %v\n", p.probeid, delta)
@@ -337,21 +427,22 @@ func (p *Probe) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration {
return delta
}
-func NewProbe(client *http.Client, debugLevel debug.DebugLevel) *Probe {
- probe := &Probe{
+func NewProbeTracer(client *http.Client, isLG bool, debugging *debug.DebugWithPrefix) *ProbeTracer {
+ probe := &ProbeTracer{
client: client,
stats: &stats.TraceStats{},
trace: nil,
- debug: debugLevel,
+ debug: debugging.Level,
probeid: utilities.GenerateConnectionId(),
+ isLG: isLG,
}
- trace := traceable.GenerateHttpTimingTracer(probe, debugLevel)
+ trace := traceable.GenerateHttpTimingTracer(probe, debugging.Level)
probe.trace = trace
return probe
}
-func (probe *Probe) SetDnsStartTimeInfo(
+func (probe *ProbeTracer) SetDnsStartTimeInfo(
now time.Time,
dnsStartInfo httptrace.DNSStartInfo,
) {
@@ -366,7 +457,7 @@ func (probe *Probe) SetDnsStartTimeInfo(
}
}
-func (probe *Probe) SetDnsDoneTimeInfo(
+func (probe *ProbeTracer) SetDnsDoneTimeInfo(
now time.Time,
dnsDoneInfo httptrace.DNSDoneInfo,
) {
@@ -381,7 +472,7 @@ func (probe *Probe) SetDnsDoneTimeInfo(
}
}
-func (probe *Probe) SetConnectStartTime(
+func (probe *ProbeTracer) SetConnectStartTime(
now time.Time,
) {
probe.stats.ConnectStartTime = now
@@ -394,7 +485,7 @@ func (probe *Probe) SetConnectStartTime(
}
}
-func (probe *Probe) SetConnectDoneTimeError(
+func (probe *ProbeTracer) SetConnectDoneTimeError(
now time.Time,
err error,
) {
@@ -410,7 +501,7 @@ func (probe *Probe) SetConnectDoneTimeError(
}
}
-func (probe *Probe) SetGetConnTime(now time.Time) {
+func (probe *ProbeTracer) SetGetConnTime(now time.Time) {
probe.stats.GetConnectionStartTime = now
if debug.IsDebug(probe.debug) {
fmt.Printf(
@@ -421,21 +512,21 @@ func (probe *Probe) SetGetConnTime(now time.Time) {
}
}
-func (probe *Probe) SetGotConnTimeInfo(
+func (probe *ProbeTracer) SetGotConnTimeInfo(
now time.Time,
gotConnInfo httptrace.GotConnInfo,
) {
probe.stats.GetConnectionDoneTime = now
probe.stats.ConnInfo = gotConnInfo
probe.stats.ConnectionReused = gotConnInfo.Reused
+ if probe.isLG && !gotConnInfo.Reused {
+ fmt.Fprintf(os.Stderr, "A probe sent on an LG Connection used a new connection!\n")
+ } else if debug.IsDebug(probe.debug) {
+ fmt.Printf("Properly reused a connection when probing on an LG Connection!\n")
+ }
if debug.IsDebug(probe.debug) {
- reusedString := "(new)"
- if probe.stats.ConnectionReused {
- reusedString = "(reused)"
- }
fmt.Printf(
- "(Probe) Got %v connection for %v at %v with info %v\n",
- reusedString,
+ "(Probe) Got a reused connection for %v at %v with info %v\n",
probe.ProbeId(),
probe.stats.GetConnectionDoneTime,
probe.stats.ConnInfo,
@@ -443,7 +534,7 @@ func (probe *Probe) SetGotConnTimeInfo(
}
}
-func (probe *Probe) SetTLSHandshakeStartTime(
+func (probe *ProbeTracer) SetTLSHandshakeStartTime(
now time.Time,
) {
probe.stats.TLSStartTime = utilities.Some(now)
@@ -456,7 +547,7 @@ func (probe *Probe) SetTLSHandshakeStartTime(
}
}
-func (probe *Probe) SetTLSHandshakeDoneTimeState(
+func (probe *ProbeTracer) SetTLSHandshakeDoneTimeState(
now time.Time,
connectionState tls.ConnectionState,
) {
@@ -472,7 +563,7 @@ func (probe *Probe) SetTLSHandshakeDoneTimeState(
}
}
-func (probe *Probe) SetHttpWroteRequestTimeInfo(
+func (probe *ProbeTracer) SetHttpWroteRequestTimeInfo(
now time.Time,
info httptrace.WroteRequestInfo,
) {
@@ -488,7 +579,7 @@ func (probe *Probe) SetHttpWroteRequestTimeInfo(
}
}
-func (probe *Probe) SetHttpResponseReadyTime(
+func (probe *ProbeTracer) SetHttpResponseReadyTime(
now time.Time,
) {
probe.stats.HttpResponseReadyTime = now
@@ -500,122 +591,3 @@ func (probe *Probe) SetHttpResponseReadyTime(
)
}
}
-
-func getLatency(
- ctx context.Context,
- probe *Probe,
- url string,
- debugLevel debug.DebugLevel,
-) utilities.MeasurementResult {
- time_before_probe := time.Now()
- probe_req, err := http.NewRequestWithContext(
- httptrace.WithClientTrace(ctx, probe.GetTrace()),
- "GET",
- url,
- nil,
- )
- if err != nil {
- return utilities.MeasurementResult{Delay: 0, MeasurementCount: 0, Err: err}
- }
-
- probe_resp, err := probe.client.Do(probe_req)
- if err != nil {
- return utilities.MeasurementResult{Delay: 0, MeasurementCount: 0, Err: err}
- }
-
- // TODO: Make this interruptable somehow by using _ctx_.
- _, err = io.ReadAll(probe_resp.Body)
- if err != nil {
- return utilities.MeasurementResult{Delay: 0, Err: err}
- }
- time_after_probe := time.Now()
-
- // Depending on whether we think that Close() requires another RTT (via TCP), we
- // may need to move this before/after capturing the after time.
- probe_resp.Body.Close()
-
- sanity := time_after_probe.Sub(time_before_probe)
-
- tlsAndHttpHeaderDelta := probe.GetTLSAndHttpHeaderDelta()
- httpDownloadDelta := probe.GetHttpDownloadDelta(
- time_after_probe,
- ) // Combined with above, constitutes 2 time measurements, per the Spec.
- tcpDelta := probe.GetTCPDelta() // Constitutes 1 time measurement, per the Spec.
- totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + tcpDelta
-
- // By default, assume that there was a reused connection which
- // means that we only made 1 time measurement.
- var measurementCount uint16 = 1
- if !probe.stats.ConnectionReused {
- // If we did not reuse the connection, then we made three additional time measurements.
- // See above for details on that calculation.
- measurementCount = 3
- }
-
- if debug.IsDebug(debugLevel) {
- fmt.Printf(
- "(Probe %v) sanity vs total: %v vs %v\n",
- probe.ProbeId(),
- sanity,
- totalDelay,
- )
- }
- return utilities.MeasurementResult{
- Delay: totalDelay,
- MeasurementCount: measurementCount,
- Err: nil,
- }
-}
-
-func CalculateProbeMeasurements(
- ctx context.Context,
- strict bool,
- saturated_measurement_probe *Probe,
- unsaturated_measurement_probe *Probe,
- url string,
- debugLevel debug.DebugLevel,
-) chan utilities.MeasurementResult {
- responseChannel := make(chan utilities.MeasurementResult)
- go func() {
- /*
- * Depending on whether the user wants their measurements to be strict, we will
- * measure on the LGC.
- */
- var saturated_probe_latency utilities.MeasurementResult
- if strict {
-
- if debug.IsDebug(debugLevel) {
- fmt.Printf("Beginning saturated measurement probe.\n")
- }
- saturated_latency := getLatency(ctx, saturated_measurement_probe, url, debugLevel)
-
- if saturated_latency.Err != nil {
- fmt.Printf("Error occurred getting the saturated measurement.\n")
- responseChannel <- saturated_latency
- return
- }
- }
-
- if debug.IsDebug(debugLevel) {
- fmt.Printf("Beginning unsaturated measurement probe.\n")
- }
- unsaturated_probe_latency := getLatency(ctx, unsaturated_measurement_probe, url, debugLevel)
-
- if unsaturated_probe_latency.Err != nil {
- fmt.Printf("Error occurred getting the unsaturated measurement.\n")
- responseChannel <- unsaturated_probe_latency
- return
- }
-
- total_latency := unsaturated_probe_latency.Delay
- total_measurement_count := unsaturated_probe_latency.MeasurementCount
-
- if strict {
- total_latency += saturated_probe_latency.Delay
- total_measurement_count += saturated_probe_latency.MeasurementCount
- }
- responseChannel <- utilities.MeasurementResult{Delay: total_latency, MeasurementCount: total_measurement_count, Err: nil}
- return
- }()
- return responseChannel
-}