summaryrefslogtreecommitdiff
path: root/rpm/rpm.go
diff options
context:
space:
mode:
authorWill Hawkins <[email protected]>2023-07-10 13:45:50 -0400
committerWill Hawkins <[email protected]>2023-07-10 13:45:50 -0400
commit78d574a74665c8bc062c26755c80a8b524bce347 (patch)
tree7ad65f0052defaea63acb2f3445be00ef97e24d6 /rpm/rpm.go
parentfe17152a507bbf94a11cca7f49a51cbae9c0d67b (diff)
[Feature] Major update: Track measurements that may be delayed
Among other major feature additions, this version of the client tracks any measurements that may be long delayed and considers their presence or absence as part of a stability measurement. This version of the client also more closely tracks the spec. In particular, it performs a sinle-sided trimmed mean rather than a double-sided trimmed mean. Signed-off-by: Will Hawkins <[email protected]>
Diffstat (limited to 'rpm/rpm.go')
-rw-r--r--rpm/rpm.go37
1 files changed, 30 insertions, 7 deletions
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 23ed9f4..b1b8ba4 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -29,6 +29,7 @@ import (
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
"github.com/network-quality/goresponsiveness/probe"
+ "github.com/network-quality/goresponsiveness/series"
"github.com/network-quality/goresponsiveness/utilities"
)
@@ -84,6 +85,11 @@ type SelfDataCollectionResult struct {
LoggingContinuation func()
}
+type ResponsivenessProbeResult struct {
+ Foreign *probe.ProbeDataPoint
+ Self *probe.ProbeDataPoint
+}
+
func ResponsivenessProber(
proberCtx context.Context,
networkActivityCtx context.Context,
@@ -95,7 +101,7 @@ func ResponsivenessProber(
keyLogger io.Writer,
captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
-) (dataPoints chan utilities.Pair[*probe.ProbeDataPoint]) {
+) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, uint]) {
if debug.IsDebug(debugging.Level) {
fmt.Printf(
"(%s) Starting to collect responsiveness information at an interval of %v!\n",
@@ -106,7 +112,7 @@ func ResponsivenessProber(
// Make a channel to send back all the generated data points
// when we are probing.
- dataPoints = make(chan utilities.Pair[*probe.ProbeDataPoint])
+ dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, uint])
go func() {
wg := sync.WaitGroup{}
@@ -141,6 +147,11 @@ func ResponsivenessProber(
)
}
+ dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{
+ Type: series.SeriesMessageReserve, Bucket: probeCount,
+ Measure: utilities.None[ResponsivenessProbeResult](),
+ }
+
// The presence of a custom TLSClientConfig in a *generic* `transport`
// means that go will default to HTTP/1.1 and cowardly avoid HTTP/2:
// https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278
@@ -262,8 +273,13 @@ func ResponsivenessProber(
dataPointsLock.Lock()
// Now we have our four data points (three in the foreign probe data point and one in the self probe data point)
if dataPoints != nil {
- dataPoints <- utilities.Pair[*probe.ProbeDataPoint]{
- First: foreignProbeDataPoint, Second: selfProbeDataPoint,
+ measurement := ResponsivenessProbeResult{
+ Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint,
+ }
+
+ dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{
+ Type: series.SeriesMessageMeasure, Bucket: probeCount,
+ Measure: utilities.Some[ResponsivenessProbeResult](measurement),
}
}
dataPointsLock.Unlock()
@@ -300,8 +316,8 @@ func LoadGenerator(
mnp int,
captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections?
debugging *debug.DebugWithPrefix, // How can we forget debugging?
-) (stabilizerCommunicationChannel chan ThroughputDataPoint) { // Send back all the instantaneous throughputs that we generate.
- stabilizerCommunicationChannel = make(chan ThroughputDataPoint)
+) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, uint64]) { // Send back all the instantaneous throughputs that we generate.
+ seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, uint64])
go func() {
flowsCreated := uint64(0)
@@ -454,7 +470,14 @@ func LoadGenerator(
len(*loadGeneratingConnectionsCollection.LGCs),
granularThroughputDatapoints,
}
- stabilizerCommunicationChannel <- throughputDataPoint
+
+ seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{
+ Type: series.SeriesMessageReserve, Bucket: currentInterval,
+ }
+ seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{
+ Type: series.SeriesMessageMeasure, Bucket: currentInterval,
+ Measure: utilities.Some[ThroughputDataPoint](throughputDataPoint),
+ }
if generateLoadCtx.Err() != nil {
// No need to add additional data points because the controller told us