diff options
Diffstat (limited to 'rpm')
| -rw-r--r-- | rpm/rpm.go | 36 |
1 files changed, 21 insertions, 15 deletions
@@ -90,18 +90,19 @@ type ResponsivenessProbeResult struct { Self *probe.ProbeDataPoint } -func ResponsivenessProber( +func ResponsivenessProber[BucketType utilities.Number]( proberCtx context.Context, networkActivityCtx context.Context, foreignProbeConfigurationGenerator func() probe.ProbeConfiguration, selfProbeConfigurationGenerator func() probe.ProbeConfiguration, selfProbeConnectionCollection *lgc.LoadGeneratingConnectionCollection, + bucketGenerator *series.NumericBucketGenerator[BucketType], probeDirection lgc.LgcDirection, probeInterval time.Duration, keyLogger io.Writer, captureExtendedStats bool, debugging *debug.DebugWithPrefix, -) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, uint]) { +) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, BucketType]) { if debug.IsDebug(debugging.Level) { fmt.Printf( "(%s) Starting to collect responsiveness information at an interval of %v!\n", @@ -112,7 +113,7 @@ func ResponsivenessProber( // Make a channel to send back all the generated data points // when we are probing. - dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, uint]) + dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, BucketType]) go func() { wg := sync.WaitGroup{} @@ -147,8 +148,10 @@ func ResponsivenessProber( ) } - dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{ - Type: series.SeriesMessageReserve, Bucket: probeCount, + currentBucketId := bucketGenerator.Generate() + + dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, BucketType]{ + Type: series.SeriesMessageReserve, Bucket: currentBucketId, Measure: utilities.None[ResponsivenessProbeResult](), } @@ -277,8 +280,8 @@ func ResponsivenessProber( Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint, } - dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{ - Type: series.SeriesMessageMeasure, Bucket: probeCount, + dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, BucketType]{ + Type: series.SeriesMessageMeasure, Bucket: currentBucketId, Measure: utilities.Some[ResponsivenessProbeResult](measurement), } } @@ -306,18 +309,19 @@ func ResponsivenessProber( return } -func LoadGenerator( +func LoadGenerator[BucketType utilities.Number]( throughputCtx context.Context, // Stop our activity when we no longer need any throughput networkActivityCtx context.Context, // Create all network connections in this context. generateLoadCtx context.Context, // Stop adding additional throughput when we are stable. rampupInterval time.Duration, lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection. loadGeneratingConnectionsCollection *lgc.LoadGeneratingConnectionCollection, + bucketGenerator *series.NumericBucketGenerator[BucketType], mnp int, captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections? debugging *debug.DebugWithPrefix, // How can we forget debugging? -) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, uint64]) { // Send back all the instantaneous throughputs that we generate. - seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, uint64]) +) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, BucketType]) { // Send back all the instantaneous throughputs that we generate. + seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, BucketType]) go func() { flowsCreated := uint64(0) @@ -332,7 +336,7 @@ func LoadGenerator( nextSampleStartTime := time.Now().Add(rampupInterval) - for currentInterval := uint64(0); true; currentInterval++ { + for currentIntervalId := uint64(0); true; currentIntervalId++ { // If the throughputCtx is canceled, then that means our work here is done ... if throughputCtx.Err() != nil { @@ -471,11 +475,13 @@ func LoadGenerator( granularThroughputDatapoints, } - seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{ - Type: series.SeriesMessageReserve, Bucket: currentInterval, + currentBucketId := bucketGenerator.Generate() + + seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, BucketType]{ + Type: series.SeriesMessageReserve, Bucket: currentBucketId, } - seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{ - Type: series.SeriesMessageMeasure, Bucket: currentInterval, + seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, BucketType]{ + Type: series.SeriesMessageMeasure, Bucket: currentBucketId, Measure: utilities.Some[ThroughputDataPoint](throughputDataPoint), } |
