diff options
Diffstat (limited to 'rpm')
| -rw-r--r-- | rpm/calculations.go | 94 | ||||
| -rw-r--r-- | rpm/parameters.go | 85 | ||||
| -rw-r--r-- | rpm/parameters_test.go | 93 | ||||
| -rw-r--r-- | rpm/rpm.go | 324 |
4 files changed, 475 insertions, 121 deletions
diff --git a/rpm/calculations.go b/rpm/calculations.go new file mode 100644 index 0000000..5387aa7 --- /dev/null +++ b/rpm/calculations.go @@ -0,0 +1,94 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package rpm + +import ( + "fmt" + + "github.com/network-quality/goresponsiveness/ms" +) + +type Rpm struct { + SelfRttsTotal int + ForeignRttsTotal int + SelfRttsTrimmed int + ForeignRttsTrimmed int + SelfProbeRttPN float64 + ForeignProbeRttPN float64 + SelfProbeRttMean float64 + ForeignProbeRttMean float64 + PNRpm float64 + MeanRpm float64 +} + +func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.MathematicalSeries[float64], trimming uint, percentile int) Rpm { + // First, let's do a double-sided trim of the top/bottom 10% of our measurements. + selfRttsTotalCount := selfRtts.Len() + foreignRttsTotalCount := foreignRtts.Len() + + selfRttsTrimmed := selfRtts.DoubleSidedTrim(trimming) + foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(trimming) + + selfRttsTrimmedCount := selfRttsTrimmed.Len() + foreignRttsTrimmedCount := foreignRttsTrimmed.Len() + + // Then, let's take the mean of those ... + selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage() + foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage() + + // Second, let's do the P90 calculations. + selfProbeRoundTripTimePN := selfRtts.Percentile(percentile) + foreignProbeRoundTripTimePN := foreignRtts.Percentile(percentile) + + // Note: The specification indicates that we want to calculate the foreign probes as such: + // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign + // where tcp_foreign, tls_foreign, http_foreign are the P90 RTTs for the connection + // of the tcp, tls and http connections, respectively. However, we cannot break out + // the individual RTTs so we assume that they are roughly equal. + + // This is 60 because we measure in seconds not ms + pnRpm := 60.0 / (float64(selfProbeRoundTripTimePN+foreignProbeRoundTripTimePN) / 2.0) + meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0) + + return Rpm{ + SelfRttsTotal: selfRttsTotalCount, ForeignRttsTotal: foreignRttsTotalCount, + SelfRttsTrimmed: selfRttsTrimmedCount, ForeignRttsTrimmed: foreignRttsTrimmedCount, + SelfProbeRttPN: selfProbeRoundTripTimePN, ForeignProbeRttPN: foreignProbeRoundTripTimePN, + SelfProbeRttMean: selfProbeRoundTripTimeMean, ForeignProbeRttMean: foreignProbeRoundTripTimeMean, + PNRpm: pnRpm, MeanRpm: meanRpm, + } +} + +func (rpm *Rpm) ToString() string { + return fmt.Sprintf( + `Total Self Probes: %d +Total Foreign Probes: %d +Trimmed Self Probes Count: %d +Trimmed Foreign Probes Count: %d +P90 Self RTT: %f +P90 Foreign RTT: %f +Trimmed Mean Self RTT: %f +Trimmed Mean Foreign RTT: %f +`, + rpm.SelfRttsTotal, + rpm.ForeignRttsTotal, + rpm.SelfRttsTrimmed, + rpm.ForeignRttsTrimmed, + rpm.SelfProbeRttPN, + rpm.ForeignProbeRttPN, + rpm.SelfProbeRttMean, + rpm.ForeignProbeRttMean, + ) +} diff --git a/rpm/parameters.go b/rpm/parameters.go new file mode 100644 index 0000000..aff8639 --- /dev/null +++ b/rpm/parameters.go @@ -0,0 +1,85 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package rpm + +import ( + "fmt" + "time" + + "github.com/network-quality/goresponsiveness/utilities" +) + +type SpecParameters struct { + TestTimeout time.Duration // Total test time. + MovingAvgDist int + EvalInterval time.Duration // How often to reevaluate network conditions. + TrimmedMeanPct uint + StdDevTolerance float64 + MaxParallelConns int + ProbeInterval time.Duration + ProbeCapacityPct float64 +} + +func SpecParametersFromArguments(timeout int, mad int, id int, tmp uint, sdt float64, mnp int, mps int, ptc float64) (*SpecParameters, error) { + if timeout <= 0 { + return nil, fmt.Errorf("cannot specify a 0 or negative timeout for the test") + } + if mad <= 0 { + return nil, fmt.Errorf("cannot specify a 0 or negative moving-average distance for the test") + } + if id <= 0 { + return nil, fmt.Errorf("cannot specify a 0 or negative reevaluation interval for the test") + } + if tmp < 0 { + return nil, fmt.Errorf("cannot specify a negative trimming percentage for the test") + } + if sdt < 0 { + return nil, fmt.Errorf("cannot specify a negative standard-deviation tolerance for the test") + } + if mnp <= 0 { + return nil, fmt.Errorf("cannot specify a 0 or negative maximum number of parallel connections for the test") + } + if mps <= 0 { + return nil, fmt.Errorf("cannot specify a 0 or negative probing interval for the test") + } + if ptc <= 0 { + return nil, fmt.Errorf("cannot specify a 0 or negative probe capacity for the test") + } + testTimeout := time.Second * time.Duration(timeout) + evalInterval := time.Second * time.Duration(id) + probeInterval := utilities.PerSecondToInterval(int64(mps)) + + params := SpecParameters{ + TestTimeout: testTimeout, MovingAvgDist: mad, + EvalInterval: evalInterval, TrimmedMeanPct: tmp, StdDevTolerance: sdt, + MaxParallelConns: mnp, ProbeInterval: probeInterval, ProbeCapacityPct: ptc, + } + return ¶ms, nil +} + +func (parameters *SpecParameters) ToString() string { + return fmt.Sprintf( + `Timeout: %v, +Moving-Average Distance: %v, +Interval Duration: %v, +Trimmed-Mean Percentage: %v, +Standard-Deviation Tolerance: %v, +Maximum number of parallel connections: %v, +Probe Interval: %v (derived from given maximum-probes-per-second parameter), +Maximum Percentage Of Throughput For Probes: %v`, + parameters.TestTimeout, parameters.MovingAvgDist, parameters.EvalInterval, parameters.TrimmedMeanPct, + parameters.StdDevTolerance, parameters.MaxParallelConns, parameters.ProbeInterval, parameters.ProbeCapacityPct, + ) +} diff --git a/rpm/parameters_test.go b/rpm/parameters_test.go new file mode 100644 index 0000000..4a955c5 --- /dev/null +++ b/rpm/parameters_test.go @@ -0,0 +1,93 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package rpm + +import ( + "strings" + "testing" +) + +func TestSpecParametersFromArgumentsBadTimeout(t *testing.T) { + _, err := SpecParametersFromArguments(0, 0, 0, 0, 0, 0, 0, 0) + if err == nil || !strings.Contains(err.Error(), "timeout") { + t.Fatalf("0 timeout improperly allowed.") + } + _, err = SpecParametersFromArguments(-1, 0, 0, 0, 0, 0, 0, 0) + if err == nil || !strings.Contains(err.Error(), "timeout") { + t.Fatalf("negative timeout improperly allowed.") + } +} + +func TestSpecParametersFromArgumentsBadMad(t *testing.T) { + _, err := SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0) + if err == nil || !strings.Contains(err.Error(), "moving-average") { + t.Fatalf("0 mad improperly allowed.") + } + _, err = SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0) + if err == nil || !strings.Contains(err.Error(), "moving-average") { + t.Fatalf("negative mad improperly allowed.") + } +} + +func TestSpecParametersFromArgumentsBadId(t *testing.T) { + _, err := SpecParametersFromArguments(1, 1, 0, 0, 0, 0, 0, 0) + if err == nil || !strings.Contains(err.Error(), "reevaluation") { + t.Fatalf("0 id improperly allowed.") + } + _, err = SpecParametersFromArguments(1, 1, -1, 0, 0, 0, 0, 0) + if err == nil || !strings.Contains(err.Error(), "reevaluation") { + t.Fatalf("negative id improperly allowed.") + } +} + +func TestSpecParametersFromArgumentsBadSdt(t *testing.T) { + _, err := SpecParametersFromArguments(1, 1, 1, 1, -1, 0, 0, 0) + if err == nil || !strings.Contains(err.Error(), "deviation") { + t.Fatalf("0 sdt improperly allowed.") + } +} + +func TestSpecParametersFromArgumentsBadMnp(t *testing.T) { + _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 0, 0, 0) + if err == nil || !strings.Contains(err.Error(), "parallel") { + t.Fatalf("0 mnp improperly allowed.") + } + _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, -1, 0, 0) + if err == nil || !strings.Contains(err.Error(), "parallel") { + t.Fatalf("negative mnp improperly allowed.") + } +} + +func TestSpecParametersFromArgumentsBadMps(t *testing.T) { + _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 0, 0) + if err == nil || !strings.Contains(err.Error(), "probing interval") { + t.Fatalf("0 mps improperly allowed.") + } + _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, -1, 0) + if err == nil || !strings.Contains(err.Error(), "probing interval") { + t.Fatalf("negative mps improperly allowed.") + } +} + +func TestSpecParametersFromArgumentsBadPtc(t *testing.T) { + _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 0) + if err == nil || !strings.Contains(err.Error(), "capacity") { + t.Fatalf("0 ptc improperly allowed.") + } + _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, -1) + if err == nil || !strings.Contains(err.Error(), "capacity") { + t.Fatalf("negative ptc improperly allowed.") + } +} @@ -37,19 +37,22 @@ func addFlows( toAdd uint64, lgcc *lgc.LoadGeneratingConnectionCollection, lgcGenerator func() lgc.LoadGeneratingConnection, - debug debug.DebugLevel, + debugging debug.DebugLevel, ) uint64 { lgcc.Lock.Lock() defer lgcc.Lock.Unlock() for i := uint64(0); i < toAdd; i++ { // First, generate the connection. - newGenerator := lgcGenerator() - lgcc.Append(newGenerator) + newConnection := lgcGenerator() + lgcc.Append(newConnection) + if debug.IsDebug(debugging) { + fmt.Printf("Added a new %s load-generating connection.\n", newConnection.Direction()) + } // Second, try to start the connection. - if !newGenerator.Start(ctx, debug) { + if !newConnection.Start(ctx, debugging) { // If there was an error, we'll make sure that the caller knows it. fmt.Printf( - "Error starting lgc with id %d!\n", newGenerator.ClientId(), + "Error starting lgc with id %d!\n", newConnection.ClientId(), ) return i } @@ -81,49 +84,60 @@ type SelfDataCollectionResult struct { LoggingContinuation func() } -func CombinedProber( +func ResponsivenessProber( proberCtx context.Context, networkActivityCtx context.Context, foreignProbeConfigurationGenerator func() probe.ProbeConfiguration, selfProbeConfigurationGenerator func() probe.ProbeConfiguration, - selfDownProbeConnection lgc.LoadGeneratingConnection, - selfUpProbeConnection lgc.LoadGeneratingConnection, + selfProbeConnectionCollection *lgc.LoadGeneratingConnectionCollection, + probeDirection lgc.LgcDirection, probeInterval time.Duration, keyLogger io.Writer, captureExtendedStats bool, debugging *debug.DebugWithPrefix, -) (dataPoints chan probe.ProbeDataPoint) { +) (dataPoints chan utilities.Pair[*probe.ProbeDataPoint]) { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) Starting to collect responsiveness information at an interval of %v!\n", + debugging.Prefix, + probeInterval, + ) + } + // Make a channel to send back all the generated data points // when we are probing. - dataPoints = make(chan probe.ProbeDataPoint) + dataPoints = make(chan utilities.Pair[*probe.ProbeDataPoint]) go func() { wg := sync.WaitGroup{} - probeCount := 0 + probeCount := uint(0) + + dataPointsLock := sync.Mutex{} // As long as our context says that we can continue to probe! for proberCtx.Err() == nil { - time.Sleep(probeInterval) - foreignProbeConfiguration := foreignProbeConfigurationGenerator() - selfProbeConfiguration := selfProbeConfigurationGenerator() - - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "(%s) About to send round %d of probes!\n", - debugging.Prefix, - probeCount+1, - ) + // We may have slept for a very long time. So, let's check to see if we are + // still active, just for fun! + if proberCtx.Err() != nil { + break } - transport := &http.Transport{} - transport.TLSClientConfig = &tls.Config{} - transport.Proxy = http.ProxyFromEnvironment - if !utilities.IsInterfaceNil(keyLogger) { + wg.Add(1) + go func() { + defer wg.Done() + probeCount++ + probeCount := probeCount + + foreignProbeConfiguration := foreignProbeConfigurationGenerator() + selfProbeConfiguration := selfProbeConfigurationGenerator() + if debug.IsDebug(debugging.Level) { fmt.Printf( - "Using an SSL Key Logger for this foreign probe.\n", + "(%s) About to send round %d of probes!\n", + debugging.Prefix, + probeCount, ) } @@ -134,112 +148,160 @@ func CombinedProber( // depend on whether the url contains // https:// or http://: // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74 - transport.TLSClientConfig.KeyLogWriter = keyLogger - } + transport := &http.Transport{} + transport.TLSClientConfig = &tls.Config{} + transport.Proxy = http.ProxyFromEnvironment - transport.TLSClientConfig.InsecureSkipVerify = - foreignProbeConfiguration.InsecureSkipVerify + if !utilities.IsInterfaceNil(keyLogger) { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "Using an SSL Key Logger for a foreign probe.\n", + ) + } - utilities.OverrideHostTransport(transport, - foreignProbeConfiguration.ConnectToAddr) + transport.TLSClientConfig.KeyLogWriter = keyLogger + } - foreignProbeClient := &http.Client{Transport: transport} + transport.TLSClientConfig.InsecureSkipVerify = + foreignProbeConfiguration.InsecureSkipVerify - // Start Foreign Connection Prober - probeCount++ - go probe.Probe( - networkActivityCtx, - &wg, - foreignProbeClient, - nil, - foreignProbeConfiguration.URL, - foreignProbeConfiguration.Host, - probe.Foreign, - &dataPoints, - captureExtendedStats, - debugging, - ) + utilities.OverrideHostTransport(transport, + foreignProbeConfiguration.ConnectToAddr) - // Start Self Download Connection Prober + foreignProbeClient := &http.Client{Transport: transport} - // TODO: Make the following sanity check more than just a check. - // We only want to start a SelfDown probe on a connection that is - // in the RUNNING state. - if selfDownProbeConnection.Status() == lgc.LGC_STATUS_RUNNING { - go probe.Probe( + // Start Foreign Connection Prober + foreignProbeDataPoint, err := probe.Probe( networkActivityCtx, - &wg, - selfDownProbeConnection.Client(), - selfDownProbeConnection, - selfProbeConfiguration.URL, - selfProbeConfiguration.Host, - probe.SelfDown, - &dataPoints, + foreignProbeClient, + foreignProbeConfiguration.URL, + foreignProbeConfiguration.Host, + probeDirection, + probe.Foreign, + probeCount, captureExtendedStats, debugging, ) - } else { - panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n", - debugging.Prefix, selfDownProbeConnection.Status())) - } + if err != nil { + return + } + + var selfProbeConnection *lgc.LoadGeneratingConnection = nil + func() { + selfProbeConnectionCollection.Lock.Lock() + defer selfProbeConnectionCollection.Lock.Unlock() + selfProbeConnection, err = selfProbeConnectionCollection.GetRandom() + if err != nil { + if debug.IsWarn(debugging.Level) { + fmt.Printf( + "(%s) Failed to get a random %s load-generating connection on which to send a probe: %v.\n", + debugging.Prefix, + utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"), + err, + ) + } + return + } + }() + if selfProbeConnection == nil { + return + } - // Start Self Upload Connection Prober + // TODO: Make the following sanity check more than just a check. + // We only want to start a SelfUp probe on a connection that is + // in the RUNNING state. + if (*selfProbeConnection).Status() != lgc.LGC_STATUS_RUNNING { + if debug.IsWarn(debugging.Level) { + fmt.Printf( + "(%s) The selected random %s load-generating connection on which to send a probe was not running.\n", + debugging.Prefix, + utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"), + ) + } + return + } - // TODO: Make the following sanity check more than just a check. - // We only want to start a SelfDown probe on a connection that is - // in the RUNNING state. - if selfUpProbeConnection.Status() == lgc.LGC_STATUS_RUNNING { - go probe.Probe( + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) Selected %s load-generating connection with ID %d to send a self probe with Id %d.\n", + debugging.Prefix, + utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"), + (*selfProbeConnection).ClientId(), + probeCount, + ) + } + selfProbeDataPoint, err := probe.Probe( proberCtx, - &wg, - selfUpProbeConnection.Client(), - nil, + (*selfProbeConnection).Client(), selfProbeConfiguration.URL, selfProbeConfiguration.Host, - probe.SelfUp, - &dataPoints, + probeDirection, + utilities.Conditional(probeDirection == lgc.LGC_DOWN, probe.SelfDown, probe.SelfUp), + probeCount, captureExtendedStats, debugging, ) - } else { - panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n", - debugging.Prefix, selfUpProbeConnection.Status())) - } + if err != nil { + fmt.Printf( + "(%s) There was an error sending a self probe with Id %d: %v\n", + debugging.Prefix, + probeCount, + err, + ) + return + } + + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) About to report results for round %d of probes!\n", + debugging.Prefix, + probeCount, + ) + } + + dataPointsLock.Lock() + // Now we have our four data points (three in the foreign probe data point and one in the self probe data point) + if dataPoints != nil { + dataPoints <- utilities.Pair[*probe.ProbeDataPoint]{ + First: foreignProbeDataPoint, Second: selfProbeDataPoint, + } + } + dataPointsLock.Unlock() + }() } if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) Combined probe driver is going to start waiting for its probes to finish.\n", + "(%s) Probe driver is going to start waiting for its probes to finish.\n", debugging.Prefix, ) } utilities.OrTimeout(func() { wg.Wait() }, 2*time.Second) if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) Combined probe driver is done waiting for its probes to finish.\n", + "(%s) Probe driver is done waiting for its probes to finish.\n", debugging.Prefix, ) } + dataPointsLock.Lock() close(dataPoints) + dataPoints = nil + dataPointsLock.Unlock() }() return } func LoadGenerator( + throughputCtx context.Context, // Stop our activity when we no longer need any throughput networkActivityCtx context.Context, // Create all network connections in this context. - loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load. + generateLoadCtx context.Context, // Stop adding additional throughput when we are stable. rampupInterval time.Duration, lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection. loadGeneratingConnectionsCollection *lgc.LoadGeneratingConnectionCollection, + mnp int, captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections? debugging *debug.DebugWithPrefix, // How can we forget debugging? -) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes. - throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate. -) { - throughputCalculations = make(chan ThroughputDataPoint) - // The channel that we are going to use to send back the connection to use for probing may not immediately - // be read by the caller. We don't want to wait around until they are ready before we start doing our work. - // So, we'll make it buffered. - probeConnectionCommunicationChannel = make(chan lgc.LoadGeneratingConnection, 1) +) (stabilizerCommunicationChannel chan ThroughputDataPoint) { // Send back all the instantaneous throughputs that we generate. + stabilizerCommunicationChannel = make(chan ThroughputDataPoint) go func() { flowsCreated := uint64(0) @@ -252,32 +314,12 @@ func LoadGenerator( debugging.Level, ) - // We have at least a single load-generating channel. This channel will be the one that - // the self probes use. - go func() { - loadGeneratingConnectionsCollection.Lock.Lock() - zerothConnection, err := loadGeneratingConnectionsCollection.Get(0) - loadGeneratingConnectionsCollection.Lock.Unlock() - if err != nil { - panic("Could not get the zeroth connection!\n") - } - // We are going to wait until it is started. - if !(*zerothConnection).WaitUntilStarted(loadGeneratorCtx) { - fmt.Fprintf(os.Stderr, "Could not wait until the zeroth load-generating connection was started!\n") - return - } - // Now that it is started, we will send it back to the caller so that - // they can pass it on to the CombinedProber which will use it for the - // self probes. - probeConnectionCommunicationChannel <- *zerothConnection - }() - nextSampleStartTime := time.Now().Add(rampupInterval) for currentInterval := uint64(0); true; currentInterval++ { - // If the loadGeneratorCtx is canceled, then that means our work here is done ... - if loadGeneratorCtx.Err() != nil { + // If the throughputCtx is canceled, then that means our work here is done ... + if throughputCtx.Err() != nil { break } @@ -297,6 +339,12 @@ func LoadGenerator( } nextSampleStartTime = time.Now().Add(time.Second) + // Waiting is the hardest part -- that was a long time asleep + // and we may have been cancelled during that time! + if throughputCtx.Err() != nil { + break + } + // Compute "instantaneous aggregate" goodput which is the number of // bytes transferred within the last second. var instantaneousThroughputTotal float64 = 0 @@ -406,16 +454,50 @@ func LoadGenerator( len(*loadGeneratingConnectionsCollection.LGCs), granularThroughputDatapoints, } - throughputCalculations <- throughputDataPoint + stabilizerCommunicationChannel <- throughputDataPoint - // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now! - flowsCreated += addFlows( - networkActivityCtx, - constants.AdditiveNumberOfLoadGeneratingConnections, - loadGeneratingConnectionsCollection, - lgcGenerator, - debugging.Level, - ) + if generateLoadCtx.Err() != nil { + // No need to add additional data points because the controller told us + // that we were stable. But, we want to continue taking measurements! + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Throughput is stable; not adding any additional load-generating connections.\n", + debugging, + ) + } + continue + } + + loadGeneratingConnectionsCollection.Lock.Lock() + currentParallelConnectionCount, err := + loadGeneratingConnectionsCollection.Len() + loadGeneratingConnectionsCollection.Lock.Unlock() + + if err != nil { + if debug.IsWarn(debugging.Level) { + fmt.Printf( + "%v: Failed to get a count of the number of parallel load-generating connections: %v.\n", + debugging, + err, + ) + } + } + if currentParallelConnectionCount < mnp { + // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now! + flowsCreated += addFlows( + networkActivityCtx, + constants.AdditiveNumberOfLoadGeneratingConnections, + loadGeneratingConnectionsCollection, + lgcGenerator, + debugging.Level, + ) + } else if debug.IsWarn(debugging.Level) { + fmt.Printf( + "%v: Maximum number of parallel transport-layer connections reached (%d). Not adding another.\n", + debugging, + mnp, + ) + } } if debug.IsDebug(debugging.Level) { |
