summaryrefslogtreecommitdiff
path: root/rpm/rpm.go
diff options
context:
space:
mode:
Diffstat (limited to 'rpm/rpm.go')
-rw-r--r--rpm/rpm.go36
1 files changed, 21 insertions, 15 deletions
diff --git a/rpm/rpm.go b/rpm/rpm.go
index b1b8ba4..0f51461 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -90,18 +90,19 @@ type ResponsivenessProbeResult struct {
Self *probe.ProbeDataPoint
}
-func ResponsivenessProber(
+func ResponsivenessProber[BucketType utilities.Number](
proberCtx context.Context,
networkActivityCtx context.Context,
foreignProbeConfigurationGenerator func() probe.ProbeConfiguration,
selfProbeConfigurationGenerator func() probe.ProbeConfiguration,
selfProbeConnectionCollection *lgc.LoadGeneratingConnectionCollection,
+ bucketGenerator *series.NumericBucketGenerator[BucketType],
probeDirection lgc.LgcDirection,
probeInterval time.Duration,
keyLogger io.Writer,
captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
-) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, uint]) {
+) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, BucketType]) {
if debug.IsDebug(debugging.Level) {
fmt.Printf(
"(%s) Starting to collect responsiveness information at an interval of %v!\n",
@@ -112,7 +113,7 @@ func ResponsivenessProber(
// Make a channel to send back all the generated data points
// when we are probing.
- dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, uint])
+ dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, BucketType])
go func() {
wg := sync.WaitGroup{}
@@ -147,8 +148,10 @@ func ResponsivenessProber(
)
}
- dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{
- Type: series.SeriesMessageReserve, Bucket: probeCount,
+ currentBucketId := bucketGenerator.Generate()
+
+ dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, BucketType]{
+ Type: series.SeriesMessageReserve, Bucket: currentBucketId,
Measure: utilities.None[ResponsivenessProbeResult](),
}
@@ -277,8 +280,8 @@ func ResponsivenessProber(
Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint,
}
- dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{
- Type: series.SeriesMessageMeasure, Bucket: probeCount,
+ dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, BucketType]{
+ Type: series.SeriesMessageMeasure, Bucket: currentBucketId,
Measure: utilities.Some[ResponsivenessProbeResult](measurement),
}
}
@@ -306,18 +309,19 @@ func ResponsivenessProber(
return
}
-func LoadGenerator(
+func LoadGenerator[BucketType utilities.Number](
throughputCtx context.Context, // Stop our activity when we no longer need any throughput
networkActivityCtx context.Context, // Create all network connections in this context.
generateLoadCtx context.Context, // Stop adding additional throughput when we are stable.
rampupInterval time.Duration,
lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection.
loadGeneratingConnectionsCollection *lgc.LoadGeneratingConnectionCollection,
+ bucketGenerator *series.NumericBucketGenerator[BucketType],
mnp int,
captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections?
debugging *debug.DebugWithPrefix, // How can we forget debugging?
-) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, uint64]) { // Send back all the instantaneous throughputs that we generate.
- seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, uint64])
+) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, BucketType]) { // Send back all the instantaneous throughputs that we generate.
+ seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, BucketType])
go func() {
flowsCreated := uint64(0)
@@ -332,7 +336,7 @@ func LoadGenerator(
nextSampleStartTime := time.Now().Add(rampupInterval)
- for currentInterval := uint64(0); true; currentInterval++ {
+ for currentIntervalId := uint64(0); true; currentIntervalId++ {
// If the throughputCtx is canceled, then that means our work here is done ...
if throughputCtx.Err() != nil {
@@ -471,11 +475,13 @@ func LoadGenerator(
granularThroughputDatapoints,
}
- seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{
- Type: series.SeriesMessageReserve, Bucket: currentInterval,
+ currentBucketId := bucketGenerator.Generate()
+
+ seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, BucketType]{
+ Type: series.SeriesMessageReserve, Bucket: currentBucketId,
}
- seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{
- Type: series.SeriesMessageMeasure, Bucket: currentInterval,
+ seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, BucketType]{
+ Type: series.SeriesMessageMeasure, Bucket: currentBucketId,
Measure: utilities.Some[ThroughputDataPoint](throughputDataPoint),
}