summaryrefslogtreecommitdiff
path: root/rpm
diff options
context:
space:
mode:
Diffstat (limited to 'rpm')
-rw-r--r--rpm/calculations.go94
-rw-r--r--rpm/parameters.go85
-rw-r--r--rpm/parameters_test.go93
-rw-r--r--rpm/rpm.go324
4 files changed, 475 insertions, 121 deletions
diff --git a/rpm/calculations.go b/rpm/calculations.go
new file mode 100644
index 0000000..5387aa7
--- /dev/null
+++ b/rpm/calculations.go
@@ -0,0 +1,94 @@
+/*
+ * This file is part of Go Responsiveness.
+ *
+ * Go Responsiveness is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software Foundation,
+ * either version 2 of the License, or (at your option) any later version.
+ * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package rpm
+
+import (
+ "fmt"
+
+ "github.com/network-quality/goresponsiveness/ms"
+)
+
+type Rpm struct {
+ SelfRttsTotal int
+ ForeignRttsTotal int
+ SelfRttsTrimmed int
+ ForeignRttsTrimmed int
+ SelfProbeRttPN float64
+ ForeignProbeRttPN float64
+ SelfProbeRttMean float64
+ ForeignProbeRttMean float64
+ PNRpm float64
+ MeanRpm float64
+}
+
+func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.MathematicalSeries[float64], trimming uint, percentile int) Rpm {
+ // First, let's do a double-sided trim of the top/bottom 10% of our measurements.
+ selfRttsTotalCount := selfRtts.Len()
+ foreignRttsTotalCount := foreignRtts.Len()
+
+ selfRttsTrimmed := selfRtts.DoubleSidedTrim(trimming)
+ foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(trimming)
+
+ selfRttsTrimmedCount := selfRttsTrimmed.Len()
+ foreignRttsTrimmedCount := foreignRttsTrimmed.Len()
+
+ // Then, let's take the mean of those ...
+ selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage()
+ foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage()
+
+ // Second, let's do the P90 calculations.
+ selfProbeRoundTripTimePN := selfRtts.Percentile(percentile)
+ foreignProbeRoundTripTimePN := foreignRtts.Percentile(percentile)
+
+ // Note: The specification indicates that we want to calculate the foreign probes as such:
+ // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign
+ // where tcp_foreign, tls_foreign, http_foreign are the P90 RTTs for the connection
+ // of the tcp, tls and http connections, respectively. However, we cannot break out
+ // the individual RTTs so we assume that they are roughly equal.
+
+ // This is 60 because we measure in seconds not ms
+ pnRpm := 60.0 / (float64(selfProbeRoundTripTimePN+foreignProbeRoundTripTimePN) / 2.0)
+ meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0)
+
+ return Rpm{
+ SelfRttsTotal: selfRttsTotalCount, ForeignRttsTotal: foreignRttsTotalCount,
+ SelfRttsTrimmed: selfRttsTrimmedCount, ForeignRttsTrimmed: foreignRttsTrimmedCount,
+ SelfProbeRttPN: selfProbeRoundTripTimePN, ForeignProbeRttPN: foreignProbeRoundTripTimePN,
+ SelfProbeRttMean: selfProbeRoundTripTimeMean, ForeignProbeRttMean: foreignProbeRoundTripTimeMean,
+ PNRpm: pnRpm, MeanRpm: meanRpm,
+ }
+}
+
+func (rpm *Rpm) ToString() string {
+ return fmt.Sprintf(
+ `Total Self Probes: %d
+Total Foreign Probes: %d
+Trimmed Self Probes Count: %d
+Trimmed Foreign Probes Count: %d
+P90 Self RTT: %f
+P90 Foreign RTT: %f
+Trimmed Mean Self RTT: %f
+Trimmed Mean Foreign RTT: %f
+`,
+ rpm.SelfRttsTotal,
+ rpm.ForeignRttsTotal,
+ rpm.SelfRttsTrimmed,
+ rpm.ForeignRttsTrimmed,
+ rpm.SelfProbeRttPN,
+ rpm.ForeignProbeRttPN,
+ rpm.SelfProbeRttMean,
+ rpm.ForeignProbeRttMean,
+ )
+}
diff --git a/rpm/parameters.go b/rpm/parameters.go
new file mode 100644
index 0000000..aff8639
--- /dev/null
+++ b/rpm/parameters.go
@@ -0,0 +1,85 @@
+/*
+ * This file is part of Go Responsiveness.
+ *
+ * Go Responsiveness is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software Foundation,
+ * either version 2 of the License, or (at your option) any later version.
+ * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package rpm
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/network-quality/goresponsiveness/utilities"
+)
+
+type SpecParameters struct {
+ TestTimeout time.Duration // Total test time.
+ MovingAvgDist int
+ EvalInterval time.Duration // How often to reevaluate network conditions.
+ TrimmedMeanPct uint
+ StdDevTolerance float64
+ MaxParallelConns int
+ ProbeInterval time.Duration
+ ProbeCapacityPct float64
+}
+
+func SpecParametersFromArguments(timeout int, mad int, id int, tmp uint, sdt float64, mnp int, mps int, ptc float64) (*SpecParameters, error) {
+ if timeout <= 0 {
+ return nil, fmt.Errorf("cannot specify a 0 or negative timeout for the test")
+ }
+ if mad <= 0 {
+ return nil, fmt.Errorf("cannot specify a 0 or negative moving-average distance for the test")
+ }
+ if id <= 0 {
+ return nil, fmt.Errorf("cannot specify a 0 or negative reevaluation interval for the test")
+ }
+ if tmp < 0 {
+ return nil, fmt.Errorf("cannot specify a negative trimming percentage for the test")
+ }
+ if sdt < 0 {
+ return nil, fmt.Errorf("cannot specify a negative standard-deviation tolerance for the test")
+ }
+ if mnp <= 0 {
+ return nil, fmt.Errorf("cannot specify a 0 or negative maximum number of parallel connections for the test")
+ }
+ if mps <= 0 {
+ return nil, fmt.Errorf("cannot specify a 0 or negative probing interval for the test")
+ }
+ if ptc <= 0 {
+ return nil, fmt.Errorf("cannot specify a 0 or negative probe capacity for the test")
+ }
+ testTimeout := time.Second * time.Duration(timeout)
+ evalInterval := time.Second * time.Duration(id)
+ probeInterval := utilities.PerSecondToInterval(int64(mps))
+
+ params := SpecParameters{
+ TestTimeout: testTimeout, MovingAvgDist: mad,
+ EvalInterval: evalInterval, TrimmedMeanPct: tmp, StdDevTolerance: sdt,
+ MaxParallelConns: mnp, ProbeInterval: probeInterval, ProbeCapacityPct: ptc,
+ }
+ return &params, nil
+}
+
+func (parameters *SpecParameters) ToString() string {
+ return fmt.Sprintf(
+ `Timeout: %v,
+Moving-Average Distance: %v,
+Interval Duration: %v,
+Trimmed-Mean Percentage: %v,
+Standard-Deviation Tolerance: %v,
+Maximum number of parallel connections: %v,
+Probe Interval: %v (derived from given maximum-probes-per-second parameter),
+Maximum Percentage Of Throughput For Probes: %v`,
+ parameters.TestTimeout, parameters.MovingAvgDist, parameters.EvalInterval, parameters.TrimmedMeanPct,
+ parameters.StdDevTolerance, parameters.MaxParallelConns, parameters.ProbeInterval, parameters.ProbeCapacityPct,
+ )
+}
diff --git a/rpm/parameters_test.go b/rpm/parameters_test.go
new file mode 100644
index 0000000..4a955c5
--- /dev/null
+++ b/rpm/parameters_test.go
@@ -0,0 +1,93 @@
+/*
+ * This file is part of Go Responsiveness.
+ *
+ * Go Responsiveness is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software Foundation,
+ * either version 2 of the License, or (at your option) any later version.
+ * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package rpm
+
+import (
+ "strings"
+ "testing"
+)
+
+func TestSpecParametersFromArgumentsBadTimeout(t *testing.T) {
+ _, err := SpecParametersFromArguments(0, 0, 0, 0, 0, 0, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "timeout") {
+ t.Fatalf("0 timeout improperly allowed.")
+ }
+ _, err = SpecParametersFromArguments(-1, 0, 0, 0, 0, 0, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "timeout") {
+ t.Fatalf("negative timeout improperly allowed.")
+ }
+}
+
+func TestSpecParametersFromArgumentsBadMad(t *testing.T) {
+ _, err := SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "moving-average") {
+ t.Fatalf("0 mad improperly allowed.")
+ }
+ _, err = SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "moving-average") {
+ t.Fatalf("negative mad improperly allowed.")
+ }
+}
+
+func TestSpecParametersFromArgumentsBadId(t *testing.T) {
+ _, err := SpecParametersFromArguments(1, 1, 0, 0, 0, 0, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "reevaluation") {
+ t.Fatalf("0 id improperly allowed.")
+ }
+ _, err = SpecParametersFromArguments(1, 1, -1, 0, 0, 0, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "reevaluation") {
+ t.Fatalf("negative id improperly allowed.")
+ }
+}
+
+func TestSpecParametersFromArgumentsBadSdt(t *testing.T) {
+ _, err := SpecParametersFromArguments(1, 1, 1, 1, -1, 0, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "deviation") {
+ t.Fatalf("0 sdt improperly allowed.")
+ }
+}
+
+func TestSpecParametersFromArgumentsBadMnp(t *testing.T) {
+ _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 0, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "parallel") {
+ t.Fatalf("0 mnp improperly allowed.")
+ }
+ _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, -1, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "parallel") {
+ t.Fatalf("negative mnp improperly allowed.")
+ }
+}
+
+func TestSpecParametersFromArgumentsBadMps(t *testing.T) {
+ _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "probing interval") {
+ t.Fatalf("0 mps improperly allowed.")
+ }
+ _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, -1, 0)
+ if err == nil || !strings.Contains(err.Error(), "probing interval") {
+ t.Fatalf("negative mps improperly allowed.")
+ }
+}
+
+func TestSpecParametersFromArgumentsBadPtc(t *testing.T) {
+ _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 0)
+ if err == nil || !strings.Contains(err.Error(), "capacity") {
+ t.Fatalf("0 ptc improperly allowed.")
+ }
+ _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, -1)
+ if err == nil || !strings.Contains(err.Error(), "capacity") {
+ t.Fatalf("negative ptc improperly allowed.")
+ }
+}
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 07bc787..23ed9f4 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -37,19 +37,22 @@ func addFlows(
toAdd uint64,
lgcc *lgc.LoadGeneratingConnectionCollection,
lgcGenerator func() lgc.LoadGeneratingConnection,
- debug debug.DebugLevel,
+ debugging debug.DebugLevel,
) uint64 {
lgcc.Lock.Lock()
defer lgcc.Lock.Unlock()
for i := uint64(0); i < toAdd; i++ {
// First, generate the connection.
- newGenerator := lgcGenerator()
- lgcc.Append(newGenerator)
+ newConnection := lgcGenerator()
+ lgcc.Append(newConnection)
+ if debug.IsDebug(debugging) {
+ fmt.Printf("Added a new %s load-generating connection.\n", newConnection.Direction())
+ }
// Second, try to start the connection.
- if !newGenerator.Start(ctx, debug) {
+ if !newConnection.Start(ctx, debugging) {
// If there was an error, we'll make sure that the caller knows it.
fmt.Printf(
- "Error starting lgc with id %d!\n", newGenerator.ClientId(),
+ "Error starting lgc with id %d!\n", newConnection.ClientId(),
)
return i
}
@@ -81,49 +84,60 @@ type SelfDataCollectionResult struct {
LoggingContinuation func()
}
-func CombinedProber(
+func ResponsivenessProber(
proberCtx context.Context,
networkActivityCtx context.Context,
foreignProbeConfigurationGenerator func() probe.ProbeConfiguration,
selfProbeConfigurationGenerator func() probe.ProbeConfiguration,
- selfDownProbeConnection lgc.LoadGeneratingConnection,
- selfUpProbeConnection lgc.LoadGeneratingConnection,
+ selfProbeConnectionCollection *lgc.LoadGeneratingConnectionCollection,
+ probeDirection lgc.LgcDirection,
probeInterval time.Duration,
keyLogger io.Writer,
captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
-) (dataPoints chan probe.ProbeDataPoint) {
+) (dataPoints chan utilities.Pair[*probe.ProbeDataPoint]) {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) Starting to collect responsiveness information at an interval of %v!\n",
+ debugging.Prefix,
+ probeInterval,
+ )
+ }
+
// Make a channel to send back all the generated data points
// when we are probing.
- dataPoints = make(chan probe.ProbeDataPoint)
+ dataPoints = make(chan utilities.Pair[*probe.ProbeDataPoint])
go func() {
wg := sync.WaitGroup{}
- probeCount := 0
+ probeCount := uint(0)
+
+ dataPointsLock := sync.Mutex{}
// As long as our context says that we can continue to probe!
for proberCtx.Err() == nil {
-
time.Sleep(probeInterval)
- foreignProbeConfiguration := foreignProbeConfigurationGenerator()
- selfProbeConfiguration := selfProbeConfigurationGenerator()
-
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "(%s) About to send round %d of probes!\n",
- debugging.Prefix,
- probeCount+1,
- )
+ // We may have slept for a very long time. So, let's check to see if we are
+ // still active, just for fun!
+ if proberCtx.Err() != nil {
+ break
}
- transport := &http.Transport{}
- transport.TLSClientConfig = &tls.Config{}
- transport.Proxy = http.ProxyFromEnvironment
- if !utilities.IsInterfaceNil(keyLogger) {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ probeCount++
+ probeCount := probeCount
+
+ foreignProbeConfiguration := foreignProbeConfigurationGenerator()
+ selfProbeConfiguration := selfProbeConfigurationGenerator()
+
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "Using an SSL Key Logger for this foreign probe.\n",
+ "(%s) About to send round %d of probes!\n",
+ debugging.Prefix,
+ probeCount,
)
}
@@ -134,112 +148,160 @@ func CombinedProber(
// depend on whether the url contains
// https:// or http://:
// https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74
- transport.TLSClientConfig.KeyLogWriter = keyLogger
- }
+ transport := &http.Transport{}
+ transport.TLSClientConfig = &tls.Config{}
+ transport.Proxy = http.ProxyFromEnvironment
- transport.TLSClientConfig.InsecureSkipVerify =
- foreignProbeConfiguration.InsecureSkipVerify
+ if !utilities.IsInterfaceNil(keyLogger) {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "Using an SSL Key Logger for a foreign probe.\n",
+ )
+ }
- utilities.OverrideHostTransport(transport,
- foreignProbeConfiguration.ConnectToAddr)
+ transport.TLSClientConfig.KeyLogWriter = keyLogger
+ }
- foreignProbeClient := &http.Client{Transport: transport}
+ transport.TLSClientConfig.InsecureSkipVerify =
+ foreignProbeConfiguration.InsecureSkipVerify
- // Start Foreign Connection Prober
- probeCount++
- go probe.Probe(
- networkActivityCtx,
- &wg,
- foreignProbeClient,
- nil,
- foreignProbeConfiguration.URL,
- foreignProbeConfiguration.Host,
- probe.Foreign,
- &dataPoints,
- captureExtendedStats,
- debugging,
- )
+ utilities.OverrideHostTransport(transport,
+ foreignProbeConfiguration.ConnectToAddr)
- // Start Self Download Connection Prober
+ foreignProbeClient := &http.Client{Transport: transport}
- // TODO: Make the following sanity check more than just a check.
- // We only want to start a SelfDown probe on a connection that is
- // in the RUNNING state.
- if selfDownProbeConnection.Status() == lgc.LGC_STATUS_RUNNING {
- go probe.Probe(
+ // Start Foreign Connection Prober
+ foreignProbeDataPoint, err := probe.Probe(
networkActivityCtx,
- &wg,
- selfDownProbeConnection.Client(),
- selfDownProbeConnection,
- selfProbeConfiguration.URL,
- selfProbeConfiguration.Host,
- probe.SelfDown,
- &dataPoints,
+ foreignProbeClient,
+ foreignProbeConfiguration.URL,
+ foreignProbeConfiguration.Host,
+ probeDirection,
+ probe.Foreign,
+ probeCount,
captureExtendedStats,
debugging,
)
- } else {
- panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n",
- debugging.Prefix, selfDownProbeConnection.Status()))
- }
+ if err != nil {
+ return
+ }
+
+ var selfProbeConnection *lgc.LoadGeneratingConnection = nil
+ func() {
+ selfProbeConnectionCollection.Lock.Lock()
+ defer selfProbeConnectionCollection.Lock.Unlock()
+ selfProbeConnection, err = selfProbeConnectionCollection.GetRandom()
+ if err != nil {
+ if debug.IsWarn(debugging.Level) {
+ fmt.Printf(
+ "(%s) Failed to get a random %s load-generating connection on which to send a probe: %v.\n",
+ debugging.Prefix,
+ utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"),
+ err,
+ )
+ }
+ return
+ }
+ }()
+ if selfProbeConnection == nil {
+ return
+ }
- // Start Self Upload Connection Prober
+ // TODO: Make the following sanity check more than just a check.
+ // We only want to start a SelfUp probe on a connection that is
+ // in the RUNNING state.
+ if (*selfProbeConnection).Status() != lgc.LGC_STATUS_RUNNING {
+ if debug.IsWarn(debugging.Level) {
+ fmt.Printf(
+ "(%s) The selected random %s load-generating connection on which to send a probe was not running.\n",
+ debugging.Prefix,
+ utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"),
+ )
+ }
+ return
+ }
- // TODO: Make the following sanity check more than just a check.
- // We only want to start a SelfDown probe on a connection that is
- // in the RUNNING state.
- if selfUpProbeConnection.Status() == lgc.LGC_STATUS_RUNNING {
- go probe.Probe(
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) Selected %s load-generating connection with ID %d to send a self probe with Id %d.\n",
+ debugging.Prefix,
+ utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"),
+ (*selfProbeConnection).ClientId(),
+ probeCount,
+ )
+ }
+ selfProbeDataPoint, err := probe.Probe(
proberCtx,
- &wg,
- selfUpProbeConnection.Client(),
- nil,
+ (*selfProbeConnection).Client(),
selfProbeConfiguration.URL,
selfProbeConfiguration.Host,
- probe.SelfUp,
- &dataPoints,
+ probeDirection,
+ utilities.Conditional(probeDirection == lgc.LGC_DOWN, probe.SelfDown, probe.SelfUp),
+ probeCount,
captureExtendedStats,
debugging,
)
- } else {
- panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n",
- debugging.Prefix, selfUpProbeConnection.Status()))
- }
+ if err != nil {
+ fmt.Printf(
+ "(%s) There was an error sending a self probe with Id %d: %v\n",
+ debugging.Prefix,
+ probeCount,
+ err,
+ )
+ return
+ }
+
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) About to report results for round %d of probes!\n",
+ debugging.Prefix,
+ probeCount,
+ )
+ }
+
+ dataPointsLock.Lock()
+ // Now we have our four data points (three in the foreign probe data point and one in the self probe data point)
+ if dataPoints != nil {
+ dataPoints <- utilities.Pair[*probe.ProbeDataPoint]{
+ First: foreignProbeDataPoint, Second: selfProbeDataPoint,
+ }
+ }
+ dataPointsLock.Unlock()
+ }()
}
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) Combined probe driver is going to start waiting for its probes to finish.\n",
+ "(%s) Probe driver is going to start waiting for its probes to finish.\n",
debugging.Prefix,
)
}
utilities.OrTimeout(func() { wg.Wait() }, 2*time.Second)
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) Combined probe driver is done waiting for its probes to finish.\n",
+ "(%s) Probe driver is done waiting for its probes to finish.\n",
debugging.Prefix,
)
}
+ dataPointsLock.Lock()
close(dataPoints)
+ dataPoints = nil
+ dataPointsLock.Unlock()
}()
return
}
func LoadGenerator(
+ throughputCtx context.Context, // Stop our activity when we no longer need any throughput
networkActivityCtx context.Context, // Create all network connections in this context.
- loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load.
+ generateLoadCtx context.Context, // Stop adding additional throughput when we are stable.
rampupInterval time.Duration,
lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection.
loadGeneratingConnectionsCollection *lgc.LoadGeneratingConnectionCollection,
+ mnp int,
captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections?
debugging *debug.DebugWithPrefix, // How can we forget debugging?
-) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes.
- throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate.
-) {
- throughputCalculations = make(chan ThroughputDataPoint)
- // The channel that we are going to use to send back the connection to use for probing may not immediately
- // be read by the caller. We don't want to wait around until they are ready before we start doing our work.
- // So, we'll make it buffered.
- probeConnectionCommunicationChannel = make(chan lgc.LoadGeneratingConnection, 1)
+) (stabilizerCommunicationChannel chan ThroughputDataPoint) { // Send back all the instantaneous throughputs that we generate.
+ stabilizerCommunicationChannel = make(chan ThroughputDataPoint)
go func() {
flowsCreated := uint64(0)
@@ -252,32 +314,12 @@ func LoadGenerator(
debugging.Level,
)
- // We have at least a single load-generating channel. This channel will be the one that
- // the self probes use.
- go func() {
- loadGeneratingConnectionsCollection.Lock.Lock()
- zerothConnection, err := loadGeneratingConnectionsCollection.Get(0)
- loadGeneratingConnectionsCollection.Lock.Unlock()
- if err != nil {
- panic("Could not get the zeroth connection!\n")
- }
- // We are going to wait until it is started.
- if !(*zerothConnection).WaitUntilStarted(loadGeneratorCtx) {
- fmt.Fprintf(os.Stderr, "Could not wait until the zeroth load-generating connection was started!\n")
- return
- }
- // Now that it is started, we will send it back to the caller so that
- // they can pass it on to the CombinedProber which will use it for the
- // self probes.
- probeConnectionCommunicationChannel <- *zerothConnection
- }()
-
nextSampleStartTime := time.Now().Add(rampupInterval)
for currentInterval := uint64(0); true; currentInterval++ {
- // If the loadGeneratorCtx is canceled, then that means our work here is done ...
- if loadGeneratorCtx.Err() != nil {
+ // If the throughputCtx is canceled, then that means our work here is done ...
+ if throughputCtx.Err() != nil {
break
}
@@ -297,6 +339,12 @@ func LoadGenerator(
}
nextSampleStartTime = time.Now().Add(time.Second)
+ // Waiting is the hardest part -- that was a long time asleep
+ // and we may have been cancelled during that time!
+ if throughputCtx.Err() != nil {
+ break
+ }
+
// Compute "instantaneous aggregate" goodput which is the number of
// bytes transferred within the last second.
var instantaneousThroughputTotal float64 = 0
@@ -406,16 +454,50 @@ func LoadGenerator(
len(*loadGeneratingConnectionsCollection.LGCs),
granularThroughputDatapoints,
}
- throughputCalculations <- throughputDataPoint
+ stabilizerCommunicationChannel <- throughputDataPoint
- // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now!
- flowsCreated += addFlows(
- networkActivityCtx,
- constants.AdditiveNumberOfLoadGeneratingConnections,
- loadGeneratingConnectionsCollection,
- lgcGenerator,
- debugging.Level,
- )
+ if generateLoadCtx.Err() != nil {
+ // No need to add additional data points because the controller told us
+ // that we were stable. But, we want to continue taking measurements!
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Throughput is stable; not adding any additional load-generating connections.\n",
+ debugging,
+ )
+ }
+ continue
+ }
+
+ loadGeneratingConnectionsCollection.Lock.Lock()
+ currentParallelConnectionCount, err :=
+ loadGeneratingConnectionsCollection.Len()
+ loadGeneratingConnectionsCollection.Lock.Unlock()
+
+ if err != nil {
+ if debug.IsWarn(debugging.Level) {
+ fmt.Printf(
+ "%v: Failed to get a count of the number of parallel load-generating connections: %v.\n",
+ debugging,
+ err,
+ )
+ }
+ }
+ if currentParallelConnectionCount < mnp {
+ // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now!
+ flowsCreated += addFlows(
+ networkActivityCtx,
+ constants.AdditiveNumberOfLoadGeneratingConnections,
+ loadGeneratingConnectionsCollection,
+ lgcGenerator,
+ debugging.Level,
+ )
+ } else if debug.IsWarn(debugging.Level) {
+ fmt.Printf(
+ "%v: Maximum number of parallel transport-layer connections reached (%d). Not adding another.\n",
+ debugging,
+ mnp,
+ )
+ }
}
if debug.IsDebug(debugging.Level) {