summaryrefslogtreecommitdiff
path: root/rpm/rpm.go
diff options
context:
space:
mode:
Diffstat (limited to 'rpm/rpm.go')
-rw-r--r--rpm/rpm.go296
1 files changed, 134 insertions, 162 deletions
diff --git a/rpm/rpm.go b/rpm/rpm.go
index ecab7c9..07121a6 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -38,30 +38,119 @@ func addFlows(
}
}
-type SaturationResult struct {
- RateBps float64
- LGCs []lgc.LoadGeneratingConnection
+type ProbeConfiguration struct {
+ URL string
}
-func Saturate(
- saturationCtx context.Context,
+type DataPoint struct {
+ RoundTripCount uint64
+ Duration time.Duration
+}
+
+type LGDataCollectionResult struct {
+ RateBps float64
+ LGCs []lgc.LoadGeneratingConnection
+ DataPoints []DataPoint
+}
+
+func LGProbe(parentProbeCtx context.Context, connection lgc.LoadGeneratingConnection, lgProbeUrl string, result *chan DataPoint, debugging *debug.DebugWithPrefix) error {
+ probeCtx, _ := context.WithCancel(parentProbeCtx)
+ probeTracer := NewProbeTracer(connection.Client(), true, debugging)
+ time_before_probe := time.Now()
+ probe_req, err := http.NewRequestWithContext(
+ httptrace.WithClientTrace(probeCtx, probeTracer.trace),
+ "GET",
+ lgProbeUrl,
+ nil,
+ )
+ if err != nil {
+ return err
+ }
+
+ probe_resp, err := connection.Client().Do(probe_req)
+ if err != nil {
+ return err
+ }
+
+ // TODO: Make this interruptable somehow by using _ctx_.
+ _, err = io.ReadAll(probe_resp.Body)
+ if err != nil {
+ return err
+ }
+ time_after_probe := time.Now()
+
+ // Depending on whether we think that Close() requires another RTT (via TCP), we
+ // may need to move this before/after capturing the after time.
+ probe_resp.Body.Close()
+
+ sanity := time_after_probe.Sub(time_before_probe)
+
+ tlsAndHttpHeaderDelta := probeTracer.GetTLSAndHttpHeaderDelta()
+ httpDownloadDelta := probeTracer.GetHttpDownloadDelta(
+ time_after_probe,
+ ) // Combined with above, constitutes 2 time measurements, per the Spec.
+ tcpDelta := probeTracer.GetTCPDelta() // Constitutes 1 time measurement, per the Spec.
+ totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + tcpDelta
+
+ // We must have reused the connection!
+ if !probeTracer.stats.ConnectionReused {
+ panic(!probeTracer.stats.ConnectionReused)
+ }
+
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) (Probe %v) sanity vs total: %v vs %v\n",
+ debugging.Prefix,
+ probeTracer.ProbeId(),
+ sanity,
+ totalDelay,
+ )
+ }
+ *result <- DataPoint{RoundTripCount: 1, Duration: totalDelay}
+
+ return nil
+ /////////////
+}
+func LGProber(proberCtx context.Context, defaultConnection lgc.LoadGeneratingConnection, altConnections *[]lgc.LoadGeneratingConnection, url string, interval time.Duration, debugging *debug.DebugWithPrefix) (points chan DataPoint) {
+ points = make(chan DataPoint)
+
+ go func() {
+ for proberCtx.Err() == nil {
+ time.Sleep(interval)
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf("(%s) About to probe!\n", debugging.Prefix)
+ }
+ go LGProbe(proberCtx, defaultConnection, url, &points, debugging)
+ }
+ }()
+
+ return
+}
+
+func LGCollectData(
+ lgDataCollectionCtx context.Context,
operatingCtx context.Context,
lgcGenerator func() lgc.LoadGeneratingConnection,
+ lgProbeConfigurationGenerator func() ProbeConfiguration,
debugging *debug.DebugWithPrefix,
-) (saturated chan SaturationResult) {
- saturated = make(chan SaturationResult)
+) (resulted chan LGDataCollectionResult) {
+ resulted = make(chan LGDataCollectionResult)
go func() {
lgcs := make([]lgc.LoadGeneratingConnection, 0)
addFlows(
- saturationCtx,
+ lgDataCollectionCtx,
constants.StartingNumberOfLoadGeneratingConnections,
&lgcs,
lgcGenerator,
debugging.Level,
)
+ lgProbeConfiguration := lgProbeConfigurationGenerator()
+
+ LGProber(lgDataCollectionCtx, lgcs[0], &lgcs, lgProbeConfiguration.URL, time.Duration(100*time.Millisecond), debugging)
+
previousFlowIncreaseInterval := uint64(0)
previousMovingAverage := float64(0)
@@ -84,13 +173,13 @@ func Saturate(
for currentInterval := uint64(0); true; currentInterval++ {
// When the program stops operating, then stop.
- if saturationCtx.Err() != nil {
+ if lgDataCollectionCtx.Err() != nil {
return
}
// We may be asked to stop trying to saturate the
// network and return our current status.
- if saturationCtx.Err() != nil {
+ if lgDataCollectionCtx.Err() != nil {
//break
}
@@ -204,7 +293,7 @@ func Saturate(
)
}
addFlows(
- saturationCtx,
+ lgDataCollectionCtx,
constants.AdditiveNumberOfLoadGeneratingConnections,
&lgcs,
lgcGenerator,
@@ -232,38 +321,39 @@ func Saturate(
if debug.IsDebug(debugging.Level) {
fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debugging)
}
- addFlows(saturationCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level)
+ addFlows(lgDataCollectionCtx, constants.AdditiveNumberOfLoadGeneratingConnections, &lgcs, lgcGenerator, debugging.Level)
previousFlowIncreaseInterval = currentInterval
}
}
}
- saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs}
+ resulted <- LGDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs}
}()
return
}
-type Probe struct {
+type ProbeTracer struct {
client *http.Client
stats *stats.TraceStats
trace *httptrace.ClientTrace
debug debug.DebugLevel
probeid uint64
+ isLG bool
}
-func (p *Probe) String() string {
+func (p *ProbeTracer) String() string {
return fmt.Sprintf("(Probe %v): stats: %v\n", p.probeid, p.stats)
}
-func (p *Probe) ProbeId() uint64 {
+func (p *ProbeTracer) ProbeId() uint64 {
return p.probeid
}
-func (p *Probe) GetTrace() *httptrace.ClientTrace {
+func (p *ProbeTracer) GetTrace() *httptrace.ClientTrace {
return p.trace
}
-func (p *Probe) GetDnsDelta() time.Duration {
+func (p *ProbeTracer) GetDnsDelta() time.Duration {
if p.stats.ConnectionReused {
return time.Duration(0)
}
@@ -274,7 +364,7 @@ func (p *Probe) GetDnsDelta() time.Duration {
return delta
}
-func (p *Probe) GetTCPDelta() time.Duration {
+func (p *ProbeTracer) GetTCPDelta() time.Duration {
if p.stats.ConnectionReused {
return time.Duration(0)
}
@@ -285,7 +375,7 @@ func (p *Probe) GetTCPDelta() time.Duration {
return delta
}
-func (p *Probe) GetTLSDelta() time.Duration {
+func (p *ProbeTracer) GetTLSDelta() time.Duration {
if utilities.IsSome(p.stats.TLSDoneTime) {
panic("There should not be TLS information, but there is.")
}
@@ -296,7 +386,7 @@ func (p *Probe) GetTLSDelta() time.Duration {
return delta
}
-func (p *Probe) GetTLSAndHttpHeaderDelta() time.Duration {
+func (p *ProbeTracer) GetTLSAndHttpHeaderDelta() time.Duration {
// Because the TLS handshake occurs *after* the TCP connection (ConnectDoneTime)
// and *before* the HTTP transaction, we know that the delta between the time
// that the first HTTP response byte is available and the time that the TCP
@@ -318,7 +408,7 @@ func (p *Probe) GetTLSAndHttpHeaderDelta() time.Duration {
return delta
}
-func (p *Probe) GetHttpHeaderDelta() time.Duration {
+func (p *ProbeTracer) GetHttpHeaderDelta() time.Duration {
panic(
"Unusable until TLS tracing support is enabled! Use GetTLSAndHttpHeaderDelta() instead.\n",
)
@@ -329,7 +419,7 @@ func (p *Probe) GetHttpHeaderDelta() time.Duration {
return delta
}
-func (p *Probe) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration {
+func (p *ProbeTracer) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration {
delta := httpDoneTime.Sub(p.stats.HttpResponseReadyTime)
if debug.IsDebug(p.debug) {
fmt.Printf("(Probe %v): Http Download Time: %v\n", p.probeid, delta)
@@ -337,21 +427,22 @@ func (p *Probe) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration {
return delta
}
-func NewProbe(client *http.Client, debugLevel debug.DebugLevel) *Probe {
- probe := &Probe{
+func NewProbeTracer(client *http.Client, isLG bool, debugging *debug.DebugWithPrefix) *ProbeTracer {
+ probe := &ProbeTracer{
client: client,
stats: &stats.TraceStats{},
trace: nil,
- debug: debugLevel,
+ debug: debugging.Level,
probeid: utilities.GenerateConnectionId(),
+ isLG: isLG,
}
- trace := traceable.GenerateHttpTimingTracer(probe, debugLevel)
+ trace := traceable.GenerateHttpTimingTracer(probe, debugging.Level)
probe.trace = trace
return probe
}
-func (probe *Probe) SetDnsStartTimeInfo(
+func (probe *ProbeTracer) SetDnsStartTimeInfo(
now time.Time,
dnsStartInfo httptrace.DNSStartInfo,
) {
@@ -366,7 +457,7 @@ func (probe *Probe) SetDnsStartTimeInfo(
}
}
-func (probe *Probe) SetDnsDoneTimeInfo(
+func (probe *ProbeTracer) SetDnsDoneTimeInfo(
now time.Time,
dnsDoneInfo httptrace.DNSDoneInfo,
) {
@@ -381,7 +472,7 @@ func (probe *Probe) SetDnsDoneTimeInfo(
}
}
-func (probe *Probe) SetConnectStartTime(
+func (probe *ProbeTracer) SetConnectStartTime(
now time.Time,
) {
probe.stats.ConnectStartTime = now
@@ -394,7 +485,7 @@ func (probe *Probe) SetConnectStartTime(
}
}
-func (probe *Probe) SetConnectDoneTimeError(
+func (probe *ProbeTracer) SetConnectDoneTimeError(
now time.Time,
err error,
) {
@@ -410,7 +501,7 @@ func (probe *Probe) SetConnectDoneTimeError(
}
}
-func (probe *Probe) SetGetConnTime(now time.Time) {
+func (probe *ProbeTracer) SetGetConnTime(now time.Time) {
probe.stats.GetConnectionStartTime = now
if debug.IsDebug(probe.debug) {
fmt.Printf(
@@ -421,21 +512,21 @@ func (probe *Probe) SetGetConnTime(now time.Time) {
}
}
-func (probe *Probe) SetGotConnTimeInfo(
+func (probe *ProbeTracer) SetGotConnTimeInfo(
now time.Time,
gotConnInfo httptrace.GotConnInfo,
) {
probe.stats.GetConnectionDoneTime = now
probe.stats.ConnInfo = gotConnInfo
probe.stats.ConnectionReused = gotConnInfo.Reused
+ if probe.isLG && !gotConnInfo.Reused {
+ fmt.Fprintf(os.Stderr, "A probe sent on an LG Connection used a new connection!\n")
+ } else if debug.IsDebug(probe.debug) {
+ fmt.Printf("Properly reused a connection when probing on an LG Connection!\n")
+ }
if debug.IsDebug(probe.debug) {
- reusedString := "(new)"
- if probe.stats.ConnectionReused {
- reusedString = "(reused)"
- }
fmt.Printf(
- "(Probe) Got %v connection for %v at %v with info %v\n",
- reusedString,
+ "(Probe) Got a reused connection for %v at %v with info %v\n",
probe.ProbeId(),
probe.stats.GetConnectionDoneTime,
probe.stats.ConnInfo,
@@ -443,7 +534,7 @@ func (probe *Probe) SetGotConnTimeInfo(
}
}
-func (probe *Probe) SetTLSHandshakeStartTime(
+func (probe *ProbeTracer) SetTLSHandshakeStartTime(
now time.Time,
) {
probe.stats.TLSStartTime = utilities.Some(now)
@@ -456,7 +547,7 @@ func (probe *Probe) SetTLSHandshakeStartTime(
}
}
-func (probe *Probe) SetTLSHandshakeDoneTimeState(
+func (probe *ProbeTracer) SetTLSHandshakeDoneTimeState(
now time.Time,
connectionState tls.ConnectionState,
) {
@@ -472,7 +563,7 @@ func (probe *Probe) SetTLSHandshakeDoneTimeState(
}
}
-func (probe *Probe) SetHttpWroteRequestTimeInfo(
+func (probe *ProbeTracer) SetHttpWroteRequestTimeInfo(
now time.Time,
info httptrace.WroteRequestInfo,
) {
@@ -488,7 +579,7 @@ func (probe *Probe) SetHttpWroteRequestTimeInfo(
}
}
-func (probe *Probe) SetHttpResponseReadyTime(
+func (probe *ProbeTracer) SetHttpResponseReadyTime(
now time.Time,
) {
probe.stats.HttpResponseReadyTime = now
@@ -500,122 +591,3 @@ func (probe *Probe) SetHttpResponseReadyTime(
)
}
}
-
-func getLatency(
- ctx context.Context,
- probe *Probe,
- url string,
- debugLevel debug.DebugLevel,
-) utilities.MeasurementResult {
- time_before_probe := time.Now()
- probe_req, err := http.NewRequestWithContext(
- httptrace.WithClientTrace(ctx, probe.GetTrace()),
- "GET",
- url,
- nil,
- )
- if err != nil {
- return utilities.MeasurementResult{Delay: 0, MeasurementCount: 0, Err: err}
- }
-
- probe_resp, err := probe.client.Do(probe_req)
- if err != nil {
- return utilities.MeasurementResult{Delay: 0, MeasurementCount: 0, Err: err}
- }
-
- // TODO: Make this interruptable somehow by using _ctx_.
- _, err = io.ReadAll(probe_resp.Body)
- if err != nil {
- return utilities.MeasurementResult{Delay: 0, Err: err}
- }
- time_after_probe := time.Now()
-
- // Depending on whether we think that Close() requires another RTT (via TCP), we
- // may need to move this before/after capturing the after time.
- probe_resp.Body.Close()
-
- sanity := time_after_probe.Sub(time_before_probe)
-
- tlsAndHttpHeaderDelta := probe.GetTLSAndHttpHeaderDelta()
- httpDownloadDelta := probe.GetHttpDownloadDelta(
- time_after_probe,
- ) // Combined with above, constitutes 2 time measurements, per the Spec.
- tcpDelta := probe.GetTCPDelta() // Constitutes 1 time measurement, per the Spec.
- totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + tcpDelta
-
- // By default, assume that there was a reused connection which
- // means that we only made 1 time measurement.
- var measurementCount uint16 = 1
- if !probe.stats.ConnectionReused {
- // If we did not reuse the connection, then we made three additional time measurements.
- // See above for details on that calculation.
- measurementCount = 3
- }
-
- if debug.IsDebug(debugLevel) {
- fmt.Printf(
- "(Probe %v) sanity vs total: %v vs %v\n",
- probe.ProbeId(),
- sanity,
- totalDelay,
- )
- }
- return utilities.MeasurementResult{
- Delay: totalDelay,
- MeasurementCount: measurementCount,
- Err: nil,
- }
-}
-
-func CalculateProbeMeasurements(
- ctx context.Context,
- strict bool,
- saturated_measurement_probe *Probe,
- unsaturated_measurement_probe *Probe,
- url string,
- debugLevel debug.DebugLevel,
-) chan utilities.MeasurementResult {
- responseChannel := make(chan utilities.MeasurementResult)
- go func() {
- /*
- * Depending on whether the user wants their measurements to be strict, we will
- * measure on the LGC.
- */
- var saturated_probe_latency utilities.MeasurementResult
- if strict {
-
- if debug.IsDebug(debugLevel) {
- fmt.Printf("Beginning saturated measurement probe.\n")
- }
- saturated_latency := getLatency(ctx, saturated_measurement_probe, url, debugLevel)
-
- if saturated_latency.Err != nil {
- fmt.Printf("Error occurred getting the saturated measurement.\n")
- responseChannel <- saturated_latency
- return
- }
- }
-
- if debug.IsDebug(debugLevel) {
- fmt.Printf("Beginning unsaturated measurement probe.\n")
- }
- unsaturated_probe_latency := getLatency(ctx, unsaturated_measurement_probe, url, debugLevel)
-
- if unsaturated_probe_latency.Err != nil {
- fmt.Printf("Error occurred getting the unsaturated measurement.\n")
- responseChannel <- unsaturated_probe_latency
- return
- }
-
- total_latency := unsaturated_probe_latency.Delay
- total_measurement_count := unsaturated_probe_latency.MeasurementCount
-
- if strict {
- total_latency += saturated_probe_latency.Delay
- total_measurement_count += saturated_probe_latency.MeasurementCount
- }
- responseChannel <- utilities.MeasurementResult{Delay: total_latency, MeasurementCount: total_measurement_count, Err: nil}
- return
- }()
- return responseChannel
-}