summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rpm/rpm.go22
1 files changed, 14 insertions, 8 deletions
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 9a92f9f..3c2fa7f 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -322,10 +322,11 @@ func LoadGenerator(
loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load.
rampupInterval time.Duration,
lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection.
- throughputDataLogger datalogger.DataLogger[ThroughputDataPoint], // For logging data!
+ loadGeneratingConnections *lgc.LoadGeneratingConnectionCollection,
debugging *debug.DebugWithPrefix, // How can we forget debugging?
) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to use for self probes.
throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate.
+ lgcs []lgc.LoadGeneratingConnection, // The caller will want to look at this if they are interested in doing extended stats.
) {
throughputCalculations = make(chan ThroughputDataPoint)
@@ -333,6 +334,7 @@ func LoadGenerator(
// be read by the caller. We don't want to wait around until they are ready before we start doing our work.
// So, we'll make it buffered.
probeConnectionCommunicationChannel = make(chan lgc.LoadGeneratingConnection, 1)
+ lgcs = make([]lgc.LoadGeneratingConnection, 0)
go func() {
@@ -342,18 +344,20 @@ func LoadGenerator(
flowsCreated := uint64(0)
+ loadGeneratingConnections.Lock.Lock()
addFlows(
networkActivityCtx,
constants.StartingNumberOfLoadGeneratingConnections,
- &lgcs,
+ loadGeneratingConnections.LGCs,
lgcGenerator,
debugging.Level,
)
+ loadGeneratingConnections.Lock.Unlock()
flowsCreated += constants.StartingNumberOfLoadGeneratingConnections
// We have at least a single load-generating channel. This channel will be the one that
// the self probes use. Let's send it back to the caller so that they can pass it on if they need to.
- probeConnectionCommunicationChannel <- lgcs[0]
+ probeConnectionCommunicationChannel <- (*loadGeneratingConnections.LGCs)[0]
nextSampleStartTime := time.Now().Add(rampupInterval)
@@ -393,19 +397,19 @@ func LoadGenerator(
// bytes transferred within the last second.
var instantaneousTotalThroughput float64 = 0
allInvalid := true
- for i := range lgcs {
- if !lgcs[i].IsValid() {
+ for i := range *loadGeneratingConnections.LGCs {
+ if !(*loadGeneratingConnections.LGCs)[i].IsValid() {
if debug.IsDebug(debugging.Level) {
fmt.Printf(
"%v: Load-generating connection with id %d is invalid ... skipping.\n",
debugging,
- lgcs[i].ClientId(),
+ (*loadGeneratingConnections.LGCs)[i].ClientId(),
)
}
continue
}
allInvalid = false
- currentTransferred, currentInterval := lgcs[i].TransferredInInterval()
+ currentTransferred, currentInterval := (*loadGeneratingConnections.LGCs)[i].TransferredInInterval()
// normalize to a second-long interval!
instantaneousConnectionThroughput := float64(
currentTransferred,
@@ -442,13 +446,15 @@ func LoadGenerator(
}
// Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now!
+ loadGeneratingConnections.Lock.Lock()
addFlows(
networkActivityCtx,
constants.AdditiveNumberOfLoadGeneratingConnections,
- &lgcs,
+ loadGeneratingConnections.LGCs,
lgcGenerator,
debugging.Level,
)
+ loadGeneratingConnections.Lock.Unlock()
flowsCreated += constants.AdditiveNumberOfLoadGeneratingConnections
}