diff options
| -rw-r--r-- | rpm/rpm.go | 22 |
1 files changed, 14 insertions, 8 deletions
@@ -322,10 +322,11 @@ func LoadGenerator( loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load. rampupInterval time.Duration, lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection. - throughputDataLogger datalogger.DataLogger[ThroughputDataPoint], // For logging data! + loadGeneratingConnections *lgc.LoadGeneratingConnectionCollection, debugging *debug.DebugWithPrefix, // How can we forget debugging? ) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to use for self probes. throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate. + lgcs []lgc.LoadGeneratingConnection, // The caller will want to look at this if they are interested in doing extended stats. ) { throughputCalculations = make(chan ThroughputDataPoint) @@ -333,6 +334,7 @@ func LoadGenerator( // be read by the caller. We don't want to wait around until they are ready before we start doing our work. // So, we'll make it buffered. probeConnectionCommunicationChannel = make(chan lgc.LoadGeneratingConnection, 1) + lgcs = make([]lgc.LoadGeneratingConnection, 0) go func() { @@ -342,18 +344,20 @@ func LoadGenerator( flowsCreated := uint64(0) + loadGeneratingConnections.Lock.Lock() addFlows( networkActivityCtx, constants.StartingNumberOfLoadGeneratingConnections, - &lgcs, + loadGeneratingConnections.LGCs, lgcGenerator, debugging.Level, ) + loadGeneratingConnections.Lock.Unlock() flowsCreated += constants.StartingNumberOfLoadGeneratingConnections // We have at least a single load-generating channel. This channel will be the one that // the self probes use. Let's send it back to the caller so that they can pass it on if they need to. - probeConnectionCommunicationChannel <- lgcs[0] + probeConnectionCommunicationChannel <- (*loadGeneratingConnections.LGCs)[0] nextSampleStartTime := time.Now().Add(rampupInterval) @@ -393,19 +397,19 @@ func LoadGenerator( // bytes transferred within the last second. var instantaneousTotalThroughput float64 = 0 allInvalid := true - for i := range lgcs { - if !lgcs[i].IsValid() { + for i := range *loadGeneratingConnections.LGCs { + if !(*loadGeneratingConnections.LGCs)[i].IsValid() { if debug.IsDebug(debugging.Level) { fmt.Printf( "%v: Load-generating connection with id %d is invalid ... skipping.\n", debugging, - lgcs[i].ClientId(), + (*loadGeneratingConnections.LGCs)[i].ClientId(), ) } continue } allInvalid = false - currentTransferred, currentInterval := lgcs[i].TransferredInInterval() + currentTransferred, currentInterval := (*loadGeneratingConnections.LGCs)[i].TransferredInInterval() // normalize to a second-long interval! instantaneousConnectionThroughput := float64( currentTransferred, @@ -442,13 +446,15 @@ func LoadGenerator( } // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now! + loadGeneratingConnections.Lock.Lock() addFlows( networkActivityCtx, constants.AdditiveNumberOfLoadGeneratingConnections, - &lgcs, + loadGeneratingConnections.LGCs, lgcGenerator, debugging.Level, ) + loadGeneratingConnections.Lock.Unlock() flowsCreated += constants.AdditiveNumberOfLoadGeneratingConnections } |
