summaryrefslogtreecommitdiff
path: root/rpm/rpm.go
diff options
context:
space:
mode:
Diffstat (limited to 'rpm/rpm.go')
-rw-r--r--rpm/rpm.go324
1 files changed, 203 insertions, 121 deletions
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 07bc787..23ed9f4 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -37,19 +37,22 @@ func addFlows(
toAdd uint64,
lgcc *lgc.LoadGeneratingConnectionCollection,
lgcGenerator func() lgc.LoadGeneratingConnection,
- debug debug.DebugLevel,
+ debugging debug.DebugLevel,
) uint64 {
lgcc.Lock.Lock()
defer lgcc.Lock.Unlock()
for i := uint64(0); i < toAdd; i++ {
// First, generate the connection.
- newGenerator := lgcGenerator()
- lgcc.Append(newGenerator)
+ newConnection := lgcGenerator()
+ lgcc.Append(newConnection)
+ if debug.IsDebug(debugging) {
+ fmt.Printf("Added a new %s load-generating connection.\n", newConnection.Direction())
+ }
// Second, try to start the connection.
- if !newGenerator.Start(ctx, debug) {
+ if !newConnection.Start(ctx, debugging) {
// If there was an error, we'll make sure that the caller knows it.
fmt.Printf(
- "Error starting lgc with id %d!\n", newGenerator.ClientId(),
+ "Error starting lgc with id %d!\n", newConnection.ClientId(),
)
return i
}
@@ -81,49 +84,60 @@ type SelfDataCollectionResult struct {
LoggingContinuation func()
}
-func CombinedProber(
+func ResponsivenessProber(
proberCtx context.Context,
networkActivityCtx context.Context,
foreignProbeConfigurationGenerator func() probe.ProbeConfiguration,
selfProbeConfigurationGenerator func() probe.ProbeConfiguration,
- selfDownProbeConnection lgc.LoadGeneratingConnection,
- selfUpProbeConnection lgc.LoadGeneratingConnection,
+ selfProbeConnectionCollection *lgc.LoadGeneratingConnectionCollection,
+ probeDirection lgc.LgcDirection,
probeInterval time.Duration,
keyLogger io.Writer,
captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
-) (dataPoints chan probe.ProbeDataPoint) {
+) (dataPoints chan utilities.Pair[*probe.ProbeDataPoint]) {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) Starting to collect responsiveness information at an interval of %v!\n",
+ debugging.Prefix,
+ probeInterval,
+ )
+ }
+
// Make a channel to send back all the generated data points
// when we are probing.
- dataPoints = make(chan probe.ProbeDataPoint)
+ dataPoints = make(chan utilities.Pair[*probe.ProbeDataPoint])
go func() {
wg := sync.WaitGroup{}
- probeCount := 0
+ probeCount := uint(0)
+
+ dataPointsLock := sync.Mutex{}
// As long as our context says that we can continue to probe!
for proberCtx.Err() == nil {
-
time.Sleep(probeInterval)
- foreignProbeConfiguration := foreignProbeConfigurationGenerator()
- selfProbeConfiguration := selfProbeConfigurationGenerator()
-
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "(%s) About to send round %d of probes!\n",
- debugging.Prefix,
- probeCount+1,
- )
+ // We may have slept for a very long time. So, let's check to see if we are
+ // still active, just for fun!
+ if proberCtx.Err() != nil {
+ break
}
- transport := &http.Transport{}
- transport.TLSClientConfig = &tls.Config{}
- transport.Proxy = http.ProxyFromEnvironment
- if !utilities.IsInterfaceNil(keyLogger) {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ probeCount++
+ probeCount := probeCount
+
+ foreignProbeConfiguration := foreignProbeConfigurationGenerator()
+ selfProbeConfiguration := selfProbeConfigurationGenerator()
+
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "Using an SSL Key Logger for this foreign probe.\n",
+ "(%s) About to send round %d of probes!\n",
+ debugging.Prefix,
+ probeCount,
)
}
@@ -134,112 +148,160 @@ func CombinedProber(
// depend on whether the url contains
// https:// or http://:
// https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74
- transport.TLSClientConfig.KeyLogWriter = keyLogger
- }
+ transport := &http.Transport{}
+ transport.TLSClientConfig = &tls.Config{}
+ transport.Proxy = http.ProxyFromEnvironment
- transport.TLSClientConfig.InsecureSkipVerify =
- foreignProbeConfiguration.InsecureSkipVerify
+ if !utilities.IsInterfaceNil(keyLogger) {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "Using an SSL Key Logger for a foreign probe.\n",
+ )
+ }
- utilities.OverrideHostTransport(transport,
- foreignProbeConfiguration.ConnectToAddr)
+ transport.TLSClientConfig.KeyLogWriter = keyLogger
+ }
- foreignProbeClient := &http.Client{Transport: transport}
+ transport.TLSClientConfig.InsecureSkipVerify =
+ foreignProbeConfiguration.InsecureSkipVerify
- // Start Foreign Connection Prober
- probeCount++
- go probe.Probe(
- networkActivityCtx,
- &wg,
- foreignProbeClient,
- nil,
- foreignProbeConfiguration.URL,
- foreignProbeConfiguration.Host,
- probe.Foreign,
- &dataPoints,
- captureExtendedStats,
- debugging,
- )
+ utilities.OverrideHostTransport(transport,
+ foreignProbeConfiguration.ConnectToAddr)
- // Start Self Download Connection Prober
+ foreignProbeClient := &http.Client{Transport: transport}
- // TODO: Make the following sanity check more than just a check.
- // We only want to start a SelfDown probe on a connection that is
- // in the RUNNING state.
- if selfDownProbeConnection.Status() == lgc.LGC_STATUS_RUNNING {
- go probe.Probe(
+ // Start Foreign Connection Prober
+ foreignProbeDataPoint, err := probe.Probe(
networkActivityCtx,
- &wg,
- selfDownProbeConnection.Client(),
- selfDownProbeConnection,
- selfProbeConfiguration.URL,
- selfProbeConfiguration.Host,
- probe.SelfDown,
- &dataPoints,
+ foreignProbeClient,
+ foreignProbeConfiguration.URL,
+ foreignProbeConfiguration.Host,
+ probeDirection,
+ probe.Foreign,
+ probeCount,
captureExtendedStats,
debugging,
)
- } else {
- panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n",
- debugging.Prefix, selfDownProbeConnection.Status()))
- }
+ if err != nil {
+ return
+ }
+
+ var selfProbeConnection *lgc.LoadGeneratingConnection = nil
+ func() {
+ selfProbeConnectionCollection.Lock.Lock()
+ defer selfProbeConnectionCollection.Lock.Unlock()
+ selfProbeConnection, err = selfProbeConnectionCollection.GetRandom()
+ if err != nil {
+ if debug.IsWarn(debugging.Level) {
+ fmt.Printf(
+ "(%s) Failed to get a random %s load-generating connection on which to send a probe: %v.\n",
+ debugging.Prefix,
+ utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"),
+ err,
+ )
+ }
+ return
+ }
+ }()
+ if selfProbeConnection == nil {
+ return
+ }
- // Start Self Upload Connection Prober
+ // TODO: Make the following sanity check more than just a check.
+ // We only want to start a SelfUp probe on a connection that is
+ // in the RUNNING state.
+ if (*selfProbeConnection).Status() != lgc.LGC_STATUS_RUNNING {
+ if debug.IsWarn(debugging.Level) {
+ fmt.Printf(
+ "(%s) The selected random %s load-generating connection on which to send a probe was not running.\n",
+ debugging.Prefix,
+ utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"),
+ )
+ }
+ return
+ }
- // TODO: Make the following sanity check more than just a check.
- // We only want to start a SelfDown probe on a connection that is
- // in the RUNNING state.
- if selfUpProbeConnection.Status() == lgc.LGC_STATUS_RUNNING {
- go probe.Probe(
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) Selected %s load-generating connection with ID %d to send a self probe with Id %d.\n",
+ debugging.Prefix,
+ utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"),
+ (*selfProbeConnection).ClientId(),
+ probeCount,
+ )
+ }
+ selfProbeDataPoint, err := probe.Probe(
proberCtx,
- &wg,
- selfUpProbeConnection.Client(),
- nil,
+ (*selfProbeConnection).Client(),
selfProbeConfiguration.URL,
selfProbeConfiguration.Host,
- probe.SelfUp,
- &dataPoints,
+ probeDirection,
+ utilities.Conditional(probeDirection == lgc.LGC_DOWN, probe.SelfDown, probe.SelfUp),
+ probeCount,
captureExtendedStats,
debugging,
)
- } else {
- panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n",
- debugging.Prefix, selfUpProbeConnection.Status()))
- }
+ if err != nil {
+ fmt.Printf(
+ "(%s) There was an error sending a self probe with Id %d: %v\n",
+ debugging.Prefix,
+ probeCount,
+ err,
+ )
+ return
+ }
+
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) About to report results for round %d of probes!\n",
+ debugging.Prefix,
+ probeCount,
+ )
+ }
+
+ dataPointsLock.Lock()
+ // Now we have our four data points (three in the foreign probe data point and one in the self probe data point)
+ if dataPoints != nil {
+ dataPoints <- utilities.Pair[*probe.ProbeDataPoint]{
+ First: foreignProbeDataPoint, Second: selfProbeDataPoint,
+ }
+ }
+ dataPointsLock.Unlock()
+ }()
}
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) Combined probe driver is going to start waiting for its probes to finish.\n",
+ "(%s) Probe driver is going to start waiting for its probes to finish.\n",
debugging.Prefix,
)
}
utilities.OrTimeout(func() { wg.Wait() }, 2*time.Second)
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) Combined probe driver is done waiting for its probes to finish.\n",
+ "(%s) Probe driver is done waiting for its probes to finish.\n",
debugging.Prefix,
)
}
+ dataPointsLock.Lock()
close(dataPoints)
+ dataPoints = nil
+ dataPointsLock.Unlock()
}()
return
}
func LoadGenerator(
+ throughputCtx context.Context, // Stop our activity when we no longer need any throughput
networkActivityCtx context.Context, // Create all network connections in this context.
- loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load.
+ generateLoadCtx context.Context, // Stop adding additional throughput when we are stable.
rampupInterval time.Duration,
lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection.
loadGeneratingConnectionsCollection *lgc.LoadGeneratingConnectionCollection,
+ mnp int,
captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections?
debugging *debug.DebugWithPrefix, // How can we forget debugging?
-) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes.
- throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate.
-) {
- throughputCalculations = make(chan ThroughputDataPoint)
- // The channel that we are going to use to send back the connection to use for probing may not immediately
- // be read by the caller. We don't want to wait around until they are ready before we start doing our work.
- // So, we'll make it buffered.
- probeConnectionCommunicationChannel = make(chan lgc.LoadGeneratingConnection, 1)
+) (stabilizerCommunicationChannel chan ThroughputDataPoint) { // Send back all the instantaneous throughputs that we generate.
+ stabilizerCommunicationChannel = make(chan ThroughputDataPoint)
go func() {
flowsCreated := uint64(0)
@@ -252,32 +314,12 @@ func LoadGenerator(
debugging.Level,
)
- // We have at least a single load-generating channel. This channel will be the one that
- // the self probes use.
- go func() {
- loadGeneratingConnectionsCollection.Lock.Lock()
- zerothConnection, err := loadGeneratingConnectionsCollection.Get(0)
- loadGeneratingConnectionsCollection.Lock.Unlock()
- if err != nil {
- panic("Could not get the zeroth connection!\n")
- }
- // We are going to wait until it is started.
- if !(*zerothConnection).WaitUntilStarted(loadGeneratorCtx) {
- fmt.Fprintf(os.Stderr, "Could not wait until the zeroth load-generating connection was started!\n")
- return
- }
- // Now that it is started, we will send it back to the caller so that
- // they can pass it on to the CombinedProber which will use it for the
- // self probes.
- probeConnectionCommunicationChannel <- *zerothConnection
- }()
-
nextSampleStartTime := time.Now().Add(rampupInterval)
for currentInterval := uint64(0); true; currentInterval++ {
- // If the loadGeneratorCtx is canceled, then that means our work here is done ...
- if loadGeneratorCtx.Err() != nil {
+ // If the throughputCtx is canceled, then that means our work here is done ...
+ if throughputCtx.Err() != nil {
break
}
@@ -297,6 +339,12 @@ func LoadGenerator(
}
nextSampleStartTime = time.Now().Add(time.Second)
+ // Waiting is the hardest part -- that was a long time asleep
+ // and we may have been cancelled during that time!
+ if throughputCtx.Err() != nil {
+ break
+ }
+
// Compute "instantaneous aggregate" goodput which is the number of
// bytes transferred within the last second.
var instantaneousThroughputTotal float64 = 0
@@ -406,16 +454,50 @@ func LoadGenerator(
len(*loadGeneratingConnectionsCollection.LGCs),
granularThroughputDatapoints,
}
- throughputCalculations <- throughputDataPoint
+ stabilizerCommunicationChannel <- throughputDataPoint
- // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now!
- flowsCreated += addFlows(
- networkActivityCtx,
- constants.AdditiveNumberOfLoadGeneratingConnections,
- loadGeneratingConnectionsCollection,
- lgcGenerator,
- debugging.Level,
- )
+ if generateLoadCtx.Err() != nil {
+ // No need to add additional data points because the controller told us
+ // that we were stable. But, we want to continue taking measurements!
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Throughput is stable; not adding any additional load-generating connections.\n",
+ debugging,
+ )
+ }
+ continue
+ }
+
+ loadGeneratingConnectionsCollection.Lock.Lock()
+ currentParallelConnectionCount, err :=
+ loadGeneratingConnectionsCollection.Len()
+ loadGeneratingConnectionsCollection.Lock.Unlock()
+
+ if err != nil {
+ if debug.IsWarn(debugging.Level) {
+ fmt.Printf(
+ "%v: Failed to get a count of the number of parallel load-generating connections: %v.\n",
+ debugging,
+ err,
+ )
+ }
+ }
+ if currentParallelConnectionCount < mnp {
+ // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now!
+ flowsCreated += addFlows(
+ networkActivityCtx,
+ constants.AdditiveNumberOfLoadGeneratingConnections,
+ loadGeneratingConnectionsCollection,
+ lgcGenerator,
+ debugging.Level,
+ )
+ } else if debug.IsWarn(debugging.Level) {
+ fmt.Printf(
+ "%v: Maximum number of parallel transport-layer connections reached (%d). Not adding another.\n",
+ debugging,
+ mnp,
+ )
+ }
}
if debug.IsDebug(debugging.Level) {