summaryrefslogtreecommitdiff
path: root/rpm/rpm.go
diff options
context:
space:
mode:
Diffstat (limited to 'rpm/rpm.go')
-rw-r--r--rpm/rpm.go37
1 files changed, 30 insertions, 7 deletions
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 23ed9f4..b1b8ba4 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -29,6 +29,7 @@ import (
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
"github.com/network-quality/goresponsiveness/probe"
+ "github.com/network-quality/goresponsiveness/series"
"github.com/network-quality/goresponsiveness/utilities"
)
@@ -84,6 +85,11 @@ type SelfDataCollectionResult struct {
LoggingContinuation func()
}
+type ResponsivenessProbeResult struct {
+ Foreign *probe.ProbeDataPoint
+ Self *probe.ProbeDataPoint
+}
+
func ResponsivenessProber(
proberCtx context.Context,
networkActivityCtx context.Context,
@@ -95,7 +101,7 @@ func ResponsivenessProber(
keyLogger io.Writer,
captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
-) (dataPoints chan utilities.Pair[*probe.ProbeDataPoint]) {
+) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, uint]) {
if debug.IsDebug(debugging.Level) {
fmt.Printf(
"(%s) Starting to collect responsiveness information at an interval of %v!\n",
@@ -106,7 +112,7 @@ func ResponsivenessProber(
// Make a channel to send back all the generated data points
// when we are probing.
- dataPoints = make(chan utilities.Pair[*probe.ProbeDataPoint])
+ dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, uint])
go func() {
wg := sync.WaitGroup{}
@@ -141,6 +147,11 @@ func ResponsivenessProber(
)
}
+ dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{
+ Type: series.SeriesMessageReserve, Bucket: probeCount,
+ Measure: utilities.None[ResponsivenessProbeResult](),
+ }
+
// The presence of a custom TLSClientConfig in a *generic* `transport`
// means that go will default to HTTP/1.1 and cowardly avoid HTTP/2:
// https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278
@@ -262,8 +273,13 @@ func ResponsivenessProber(
dataPointsLock.Lock()
// Now we have our four data points (three in the foreign probe data point and one in the self probe data point)
if dataPoints != nil {
- dataPoints <- utilities.Pair[*probe.ProbeDataPoint]{
- First: foreignProbeDataPoint, Second: selfProbeDataPoint,
+ measurement := ResponsivenessProbeResult{
+ Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint,
+ }
+
+ dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{
+ Type: series.SeriesMessageMeasure, Bucket: probeCount,
+ Measure: utilities.Some[ResponsivenessProbeResult](measurement),
}
}
dataPointsLock.Unlock()
@@ -300,8 +316,8 @@ func LoadGenerator(
mnp int,
captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections?
debugging *debug.DebugWithPrefix, // How can we forget debugging?
-) (stabilizerCommunicationChannel chan ThroughputDataPoint) { // Send back all the instantaneous throughputs that we generate.
- stabilizerCommunicationChannel = make(chan ThroughputDataPoint)
+) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, uint64]) { // Send back all the instantaneous throughputs that we generate.
+ seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, uint64])
go func() {
flowsCreated := uint64(0)
@@ -454,7 +470,14 @@ func LoadGenerator(
len(*loadGeneratingConnectionsCollection.LGCs),
granularThroughputDatapoints,
}
- stabilizerCommunicationChannel <- throughputDataPoint
+
+ seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{
+ Type: series.SeriesMessageReserve, Bucket: currentInterval,
+ }
+ seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{
+ Type: series.SeriesMessageMeasure, Bucket: currentInterval,
+ Measure: utilities.Some[ThroughputDataPoint](throughputDataPoint),
+ }
if generateLoadCtx.Err() != nil {
// No need to add additional data points because the controller told us