summaryrefslogtreecommitdiff
path: root/rpm/rpm.go
diff options
context:
space:
mode:
Diffstat (limited to 'rpm/rpm.go')
-rw-r--r--rpm/rpm.go226
1 files changed, 109 insertions, 117 deletions
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 6aa68e8..29c735f 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -8,9 +8,11 @@ import (
"net/http"
"net/http/httptrace"
"os"
+ "sync"
"time"
"github.com/network-quality/goresponsiveness/constants"
+ "github.com/network-quality/goresponsiveness/datalogger"
"github.com/network-quality/goresponsiveness/debug"
"github.com/network-quality/goresponsiveness/lgc"
"github.com/network-quality/goresponsiveness/ma"
@@ -40,33 +42,51 @@ func addFlows(
}
type ProbeConfiguration struct {
- URL string
- Interval time.Duration
+ URL string
+ DataLogger datalogger.DataLogger[DataPoint]
+ Interval time.Duration
}
type DataPoint struct {
- RoundTripCount uint64
- Duration time.Duration
+ Time time.Time `Description:"Time of the generation of the data point."`
+ RoundTripCount uint64 `Description:"The number of round trips measured by this data point."`
+ Duration time.Duration `Description:"The duration for this measurement."`
}
-type LGDataCollectionResult struct {
+type SelfDataCollectionResult struct {
RateBps float64
LGCs []lgc.LoadGeneratingConnection
DataPoints []DataPoint
}
+type ProbeType int64
+
+const (
+ Self ProbeType = iota
+ Foreign
+)
+
+func (pt ProbeType) Value() string {
+ if pt == Self {
+ return "Self"
+ }
+ return "Foreign"
+}
+
func Probe(
parentProbeCtx context.Context,
+ waitGroup *sync.WaitGroup,
+ logger datalogger.DataLogger[DataPoint],
client *http.Client,
probeUrl string,
- isLGProbe bool,
+ probeType ProbeType,
result *chan DataPoint,
debugging *debug.DebugWithPrefix,
) error {
- probeTypeLabel := "New-Connection"
- if isLGProbe {
- probeTypeLabel = "Load-Generating"
+ if waitGroup != nil {
+ waitGroup.Add(1)
+ defer waitGroup.Done()
}
if client == nil {
@@ -74,7 +94,7 @@ func Probe(
}
probeId := utilities.GenerateUniqueId()
- probeTracer := NewProbeTracer(client, isLGProbe, probeId, debugging)
+ probeTracer := NewProbeTracer(client, probeType, probeId, debugging)
time_before_probe := time.Now()
probe_req, err := http.NewRequestWithContext(
httptrace.WithClientTrace(parentProbeCtx, probeTracer.trace),
@@ -104,18 +124,18 @@ func Probe(
sanity := time_after_probe.Sub(time_before_probe)
- // When the probe is run on a load-generating connection there should
+ // When the probe is run on a load-generating connection (a self probe) there should
// only be a single round trip that is measured. We will take the accumulation of all these
// values just to be sure, though. Because of how this traced connection was launched, most
// of the values will be 0 (or very small where the time that go takes for delivering callbacks
- // and doing context switches pokes through). When it is !isLGProbe then the values will
+ // and doing context switches pokes through). When it is !isSelfProbe then the values will
// be significant and we want to add them regardless!
totalDelay := probeTracer.GetTLSAndHttpHeaderDelta() + probeTracer.GetHttpDownloadDelta(
time_after_probe,
) + probeTracer.GetTCPDelta()
- // We must have reused the connection if we are a load-generating probe!
- if isLGProbe && !probeTracer.stats.ConnectionReused {
+ // We must have reused the connection if we are a self probe!
+ if probeType == Self && !probeTracer.stats.ConnectionReused {
panic(!probeTracer.stats.ConnectionReused)
}
@@ -123,14 +143,14 @@ func Probe(
fmt.Printf(
"(%s) (%s Probe %v) sanity vs total: %v vs %v\n",
debugging.Prefix,
- probeTypeLabel,
+ probeType.Value(),
probeId,
sanity,
totalDelay,
)
}
roundTripCount := uint64(1)
- if !isLGProbe {
+ if probeType == Foreign {
roundTripCount = 3
}
// TODO: Careful!!! It's possible that this channel has been closed because the Prober that
@@ -138,40 +158,44 @@ func Probe(
// matter because a panic just stops the go thread containing the paniced code and we are in
// a go thread that executes only this function.
defer func() {
- _ = recover()
- if debug.IsDebug(debugging.Level) {
+ isThreadPanicing := recover()
+ if isThreadPanicing != nil && debug.IsDebug(debugging.Level) {
fmt.Printf(
"(%s) (%s Probe %v) Probe attempted to write to the result channel after its invoker ended.\n",
debugging.Prefix,
- probeTypeLabel,
+ probeType.Value(),
probeId,
)
}
}()
- *result <- DataPoint{RoundTripCount: roundTripCount, Duration: totalDelay}
-
+ dataPoint := DataPoint{Time: time.Now(), RoundTripCount: roundTripCount, Duration: totalDelay}
+ if !utilities.IsInterfaceNil(logger) {
+ logger.LogRecord(dataPoint)
+ }
+ *result <- dataPoint
return nil
}
-func Prober(
+func ForeignProber(
proberCtx context.Context,
- ncProbeConfigurationGenerator func() ProbeConfiguration,
+ foreignProbeConfigurationGenerator func() ProbeConfiguration,
keyLogger io.Writer,
debugging *debug.DebugWithPrefix,
) (points chan DataPoint) {
points = make(chan DataPoint)
- ncProbeConfiguration := ncProbeConfigurationGenerator()
+ foreignProbeConfiguration := foreignProbeConfigurationGenerator()
go func() {
+ wg := sync.WaitGroup{}
probeCount := 0
for proberCtx.Err() == nil {
- time.Sleep(ncProbeConfiguration.Interval)
+ time.Sleep(foreignProbeConfiguration.Interval)
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) About to start new-connection probe number %d!\n",
+ "(%s) About to start foreign probe number %d!\n",
debugging.Prefix,
probeCount,
)
@@ -182,7 +206,7 @@ func Prober(
if !utilities.IsInterfaceNil(keyLogger) {
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "Using an SSL Key Logger for this new-connection probe.\n",
+ "Using an SSL Key Logger for this foreign probe.\n",
)
}
@@ -200,13 +224,19 @@ func Prober(
client := &http.Client{Transport: &transport}
probeCount++
- go Probe(proberCtx, client, ncProbeConfiguration.URL, false, &points, debugging)
+ go Probe(proberCtx, &wg, foreignProbeConfiguration.DataLogger, client, foreignProbeConfiguration.URL, Foreign, &points, debugging)
}
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) New-connection probing driver is stopping after sending %d probes.\n",
+ "(%s) Foreign probe driver is going to start waiting for its probes to finish.\n",
+ debugging.Prefix,
+ )
+ }
+ utilities.OrTimeout(func() { wg.Wait() }, 2*time.Second)
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) Foreign probe driver is done waiting for its probes to finish.\n",
debugging.Prefix,
- probeCount,
)
}
close(points)
@@ -214,24 +244,24 @@ func Prober(
return
}
-func LGProber(
+func SelfProber(
proberCtx context.Context,
defaultConnection lgc.LoadGeneratingConnection,
altConnections *[]lgc.LoadGeneratingConnection,
- lgProbeConfiguration ProbeConfiguration,
+ selfProbeConfiguration ProbeConfiguration,
debugging *debug.DebugWithPrefix,
) (points chan DataPoint) {
points = make(chan DataPoint)
- debugging = debug.NewDebugWithPrefix(debugging.Level, debugging.Prefix+" load-generating probe")
+ debugging = debug.NewDebugWithPrefix(debugging.Level, debugging.Prefix+" self probe")
go func() {
probeCount := 0
for proberCtx.Err() == nil {
- time.Sleep(lgProbeConfiguration.Interval)
+ time.Sleep(selfProbeConfiguration.Interval)
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) About to start load-generating probe number %d!\n",
+ "(%s) About to start self probe number %d!\n",
debugging.Prefix,
probeCount,
)
@@ -243,16 +273,18 @@ func LGProber(
// yet.
go Probe(
proberCtx,
+ nil,
+ selfProbeConfiguration.DataLogger,
defaultConnection.Client(),
- lgProbeConfiguration.URL,
- true,
+ selfProbeConfiguration.URL,
+ Self,
&points,
debugging,
)
}
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) Load-generating probing driver is stopping after sending %d probes.\n",
+ "(%s) self probing driver is stopping after sending %d probes.\n",
debugging.Prefix,
probeCount,
)
@@ -266,10 +298,10 @@ func LGCollectData(
lgDataCollectionCtx context.Context,
operatingCtx context.Context,
lgcGenerator func() lgc.LoadGeneratingConnection,
- lgProbeConfigurationGenerator func() ProbeConfiguration,
+ selfProbeConfigurationGenerator func() ProbeConfiguration,
debugging *debug.DebugWithPrefix,
-) (resulted chan LGDataCollectionResult) {
- resulted = make(chan LGDataCollectionResult)
+) (resulted chan SelfDataCollectionResult) {
+ resulted = make(chan SelfDataCollectionResult)
go func() {
lgcs := make([]lgc.LoadGeneratingConnection, 0)
@@ -282,11 +314,11 @@ func LGCollectData(
debugging.Level,
)
- lgProbeCtx, lgProbeCtxCancel := context.WithCancel(lgDataCollectionCtx)
- probeDataPointsChannel := LGProber(lgProbeCtx,
+ selfProbeCtx, selfProbeCtxCancel := context.WithCancel(lgDataCollectionCtx)
+ probeDataPointsChannel := SelfProber(selfProbeCtx,
lgcs[0],
&lgcs,
- lgProbeConfigurationGenerator(),
+ selfProbeConfigurationGenerator(),
debugging,
)
@@ -314,7 +346,7 @@ func LGCollectData(
// When the program stops operating, then stop. When our invoker tells
// us to stop, then stop.
if operatingCtx.Err() != nil || lgDataCollectionCtx.Err() != nil {
- lgProbeCtxCancel()
+ selfProbeCtxCancel()
return
}
@@ -462,30 +494,30 @@ func LGCollectData(
}
}
- lgProbeCtxCancel()
- probeDataPoints := make([]DataPoint, 0)
+ selfProbeCtxCancel()
+ selfProbeDataPoints := make([]DataPoint, 0)
for dataPoint := range probeDataPointsChannel {
- probeDataPoints = append(probeDataPoints, dataPoint)
+ selfProbeDataPoints = append(selfProbeDataPoints, dataPoint)
}
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) Collected %d load-generating probe data points\n",
+ "(%s) Collected %d self data points\n",
debugging.Prefix,
- len(probeDataPoints),
+ len(selfProbeDataPoints),
)
}
- resulted <- LGDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, DataPoints: probeDataPoints}
+ resulted <- SelfDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, DataPoints: selfProbeDataPoints}
}()
return
}
type ProbeTracer struct {
- client *http.Client
- stats *stats.TraceStats
- trace *httptrace.ClientTrace
- debug debug.DebugLevel
- probeid uint64
- isLG bool
+ client *http.Client
+ stats *stats.TraceStats
+ trace *httptrace.ClientTrace
+ debug debug.DebugLevel
+ probeid uint64
+ probeType ProbeType
}
func (p *ProbeTracer) String() string {
@@ -576,17 +608,17 @@ func (p *ProbeTracer) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration
func NewProbeTracer(
client *http.Client,
- isLG bool,
+ probeType ProbeType,
probeId uint64,
debugging *debug.DebugWithPrefix,
) *ProbeTracer {
probe := &ProbeTracer{
- client: client,
- stats: &stats.TraceStats{},
- trace: nil,
- debug: debugging.Level,
- probeid: probeId,
- isLG: isLG,
+ client: client,
+ stats: &stats.TraceStats{},
+ trace: nil,
+ debug: debugging.Level,
+ probeid: probeId,
+ probeType: probeType,
}
trace := traceable.GenerateHttpTimingTracer(probe, debugging.Level)
@@ -601,13 +633,9 @@ func (probe *ProbeTracer) SetDnsStartTimeInfo(
probe.stats.DnsStartTime = now
probe.stats.DnsStart = dnsStartInfo
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) DNS Start for Probe %v: %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
dnsStartInfo,
)
@@ -621,13 +649,9 @@ func (probe *ProbeTracer) SetDnsDoneTimeInfo(
probe.stats.DnsDoneTime = now
probe.stats.DnsDone = dnsDoneInfo
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) DNS Done for Probe %v: %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.DnsDone,
)
@@ -639,13 +663,9 @@ func (probe *ProbeTracer) SetConnectStartTime(
) {
probe.stats.ConnectStartTime = now
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) TCP Start for Probe %v at %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.ConnectStartTime,
)
@@ -659,13 +679,9 @@ func (probe *ProbeTracer) SetConnectDoneTimeError(
probe.stats.ConnectDoneTime = now
probe.stats.ConnectDoneError = err
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) TCP Done for Probe %v (with error %v) @ %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.ConnectDoneError,
probe.stats.ConnectDoneTime,
@@ -676,13 +692,9 @@ func (probe *ProbeTracer) SetConnectDoneTimeError(
func (probe *ProbeTracer) SetGetConnTime(now time.Time) {
probe.stats.GetConnectionStartTime = now
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) Started getting connection for Probe %v @ %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.GetConnectionStartTime,
)
@@ -696,22 +708,18 @@ func (probe *ProbeTracer) SetGotConnTimeInfo(
probe.stats.GetConnectionDoneTime = now
probe.stats.ConnInfo = gotConnInfo
probe.stats.ConnectionReused = gotConnInfo.Reused
- if probe.isLG && !gotConnInfo.Reused {
+ if probe.probeType == Self && !gotConnInfo.Reused {
fmt.Fprintf(
os.Stderr,
- "A probe sent on an load-generating connection used a new connection!\n",
+ "A self probe sent used a new connection!\n",
)
} else if debug.IsDebug(probe.debug) {
- fmt.Printf("Properly reused a connection when probing on a load-generating connection!\n")
+ fmt.Printf("Properly reused a connection when doing a self probe!\n")
}
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) Got a reused connection for Probe %v at %v with info %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.GetConnectionDoneTime,
probe.stats.ConnInfo,
@@ -724,13 +732,9 @@ func (probe *ProbeTracer) SetTLSHandshakeStartTime(
) {
probe.stats.TLSStartTime = utilities.Some(now)
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) Started TLS Handshake for Probe %v @ %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.TLSStartTime,
)
@@ -744,13 +748,9 @@ func (probe *ProbeTracer) SetTLSHandshakeDoneTimeState(
probe.stats.TLSDoneTime = utilities.Some(now)
probe.stats.TLSConnInfo = connectionState
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) Completed TLS handshake for Probe %v at %v with info %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.TLSDoneTime,
probe.stats.TLSConnInfo,
@@ -765,13 +765,9 @@ func (probe *ProbeTracer) SetHttpWroteRequestTimeInfo(
probe.stats.HttpWroteRequestTime = now
probe.stats.HttpInfo = info
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) Http finished writing request for Probe %v at %v with info %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.HttpWroteRequestTime,
probe.stats.HttpInfo,
@@ -784,13 +780,9 @@ func (probe *ProbeTracer) SetHttpResponseReadyTime(
) {
probe.stats.HttpResponseReadyTime = now
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) Http response is ready for Probe %v at %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.HttpResponseReadyTime,
)