diff options
| author | Will Hawkins <[email protected]> | 2023-07-10 13:45:50 -0400 |
|---|---|---|
| committer | Will Hawkins <[email protected]> | 2023-07-10 13:45:50 -0400 |
| commit | 78d574a74665c8bc062c26755c80a8b524bce347 (patch) | |
| tree | 7ad65f0052defaea63acb2f3445be00ef97e24d6 /rpm/rpm.go | |
| parent | fe17152a507bbf94a11cca7f49a51cbae9c0d67b (diff) | |
[Feature] Major update: Track measurements that may be delayed
Among other major feature additions, this version of the client tracks
any measurements that may be long delayed and considers their presence
or absence as part of a stability measurement.
This version of the client also more closely tracks the spec. In
particular, it performs a sinle-sided trimmed mean rather than a
double-sided trimmed mean.
Signed-off-by: Will Hawkins <[email protected]>
Diffstat (limited to 'rpm/rpm.go')
| -rw-r--r-- | rpm/rpm.go | 37 |
1 files changed, 30 insertions, 7 deletions
@@ -29,6 +29,7 @@ import ( "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" "github.com/network-quality/goresponsiveness/probe" + "github.com/network-quality/goresponsiveness/series" "github.com/network-quality/goresponsiveness/utilities" ) @@ -84,6 +85,11 @@ type SelfDataCollectionResult struct { LoggingContinuation func() } +type ResponsivenessProbeResult struct { + Foreign *probe.ProbeDataPoint + Self *probe.ProbeDataPoint +} + func ResponsivenessProber( proberCtx context.Context, networkActivityCtx context.Context, @@ -95,7 +101,7 @@ func ResponsivenessProber( keyLogger io.Writer, captureExtendedStats bool, debugging *debug.DebugWithPrefix, -) (dataPoints chan utilities.Pair[*probe.ProbeDataPoint]) { +) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, uint]) { if debug.IsDebug(debugging.Level) { fmt.Printf( "(%s) Starting to collect responsiveness information at an interval of %v!\n", @@ -106,7 +112,7 @@ func ResponsivenessProber( // Make a channel to send back all the generated data points // when we are probing. - dataPoints = make(chan utilities.Pair[*probe.ProbeDataPoint]) + dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, uint]) go func() { wg := sync.WaitGroup{} @@ -141,6 +147,11 @@ func ResponsivenessProber( ) } + dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{ + Type: series.SeriesMessageReserve, Bucket: probeCount, + Measure: utilities.None[ResponsivenessProbeResult](), + } + // The presence of a custom TLSClientConfig in a *generic* `transport` // means that go will default to HTTP/1.1 and cowardly avoid HTTP/2: // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278 @@ -262,8 +273,13 @@ func ResponsivenessProber( dataPointsLock.Lock() // Now we have our four data points (three in the foreign probe data point and one in the self probe data point) if dataPoints != nil { - dataPoints <- utilities.Pair[*probe.ProbeDataPoint]{ - First: foreignProbeDataPoint, Second: selfProbeDataPoint, + measurement := ResponsivenessProbeResult{ + Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint, + } + + dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{ + Type: series.SeriesMessageMeasure, Bucket: probeCount, + Measure: utilities.Some[ResponsivenessProbeResult](measurement), } } dataPointsLock.Unlock() @@ -300,8 +316,8 @@ func LoadGenerator( mnp int, captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections? debugging *debug.DebugWithPrefix, // How can we forget debugging? -) (stabilizerCommunicationChannel chan ThroughputDataPoint) { // Send back all the instantaneous throughputs that we generate. - stabilizerCommunicationChannel = make(chan ThroughputDataPoint) +) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, uint64]) { // Send back all the instantaneous throughputs that we generate. + seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, uint64]) go func() { flowsCreated := uint64(0) @@ -454,7 +470,14 @@ func LoadGenerator( len(*loadGeneratingConnectionsCollection.LGCs), granularThroughputDatapoints, } - stabilizerCommunicationChannel <- throughputDataPoint + + seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{ + Type: series.SeriesMessageReserve, Bucket: currentInterval, + } + seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{ + Type: series.SeriesMessageMeasure, Bucket: currentInterval, + Measure: utilities.Some[ThroughputDataPoint](throughputDataPoint), + } if generateLoadCtx.Err() != nil { // No need to add additional data points because the controller told us |
