diff options
Diffstat (limited to 'rpm/rpm.go')
| -rw-r--r-- | rpm/rpm.go | 324 |
1 files changed, 203 insertions, 121 deletions
@@ -37,19 +37,22 @@ func addFlows( toAdd uint64, lgcc *lgc.LoadGeneratingConnectionCollection, lgcGenerator func() lgc.LoadGeneratingConnection, - debug debug.DebugLevel, + debugging debug.DebugLevel, ) uint64 { lgcc.Lock.Lock() defer lgcc.Lock.Unlock() for i := uint64(0); i < toAdd; i++ { // First, generate the connection. - newGenerator := lgcGenerator() - lgcc.Append(newGenerator) + newConnection := lgcGenerator() + lgcc.Append(newConnection) + if debug.IsDebug(debugging) { + fmt.Printf("Added a new %s load-generating connection.\n", newConnection.Direction()) + } // Second, try to start the connection. - if !newGenerator.Start(ctx, debug) { + if !newConnection.Start(ctx, debugging) { // If there was an error, we'll make sure that the caller knows it. fmt.Printf( - "Error starting lgc with id %d!\n", newGenerator.ClientId(), + "Error starting lgc with id %d!\n", newConnection.ClientId(), ) return i } @@ -81,49 +84,60 @@ type SelfDataCollectionResult struct { LoggingContinuation func() } -func CombinedProber( +func ResponsivenessProber( proberCtx context.Context, networkActivityCtx context.Context, foreignProbeConfigurationGenerator func() probe.ProbeConfiguration, selfProbeConfigurationGenerator func() probe.ProbeConfiguration, - selfDownProbeConnection lgc.LoadGeneratingConnection, - selfUpProbeConnection lgc.LoadGeneratingConnection, + selfProbeConnectionCollection *lgc.LoadGeneratingConnectionCollection, + probeDirection lgc.LgcDirection, probeInterval time.Duration, keyLogger io.Writer, captureExtendedStats bool, debugging *debug.DebugWithPrefix, -) (dataPoints chan probe.ProbeDataPoint) { +) (dataPoints chan utilities.Pair[*probe.ProbeDataPoint]) { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) Starting to collect responsiveness information at an interval of %v!\n", + debugging.Prefix, + probeInterval, + ) + } + // Make a channel to send back all the generated data points // when we are probing. - dataPoints = make(chan probe.ProbeDataPoint) + dataPoints = make(chan utilities.Pair[*probe.ProbeDataPoint]) go func() { wg := sync.WaitGroup{} - probeCount := 0 + probeCount := uint(0) + + dataPointsLock := sync.Mutex{} // As long as our context says that we can continue to probe! for proberCtx.Err() == nil { - time.Sleep(probeInterval) - foreignProbeConfiguration := foreignProbeConfigurationGenerator() - selfProbeConfiguration := selfProbeConfigurationGenerator() - - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "(%s) About to send round %d of probes!\n", - debugging.Prefix, - probeCount+1, - ) + // We may have slept for a very long time. So, let's check to see if we are + // still active, just for fun! + if proberCtx.Err() != nil { + break } - transport := &http.Transport{} - transport.TLSClientConfig = &tls.Config{} - transport.Proxy = http.ProxyFromEnvironment - if !utilities.IsInterfaceNil(keyLogger) { + wg.Add(1) + go func() { + defer wg.Done() + probeCount++ + probeCount := probeCount + + foreignProbeConfiguration := foreignProbeConfigurationGenerator() + selfProbeConfiguration := selfProbeConfigurationGenerator() + if debug.IsDebug(debugging.Level) { fmt.Printf( - "Using an SSL Key Logger for this foreign probe.\n", + "(%s) About to send round %d of probes!\n", + debugging.Prefix, + probeCount, ) } @@ -134,112 +148,160 @@ func CombinedProber( // depend on whether the url contains // https:// or http://: // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74 - transport.TLSClientConfig.KeyLogWriter = keyLogger - } + transport := &http.Transport{} + transport.TLSClientConfig = &tls.Config{} + transport.Proxy = http.ProxyFromEnvironment - transport.TLSClientConfig.InsecureSkipVerify = - foreignProbeConfiguration.InsecureSkipVerify + if !utilities.IsInterfaceNil(keyLogger) { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "Using an SSL Key Logger for a foreign probe.\n", + ) + } - utilities.OverrideHostTransport(transport, - foreignProbeConfiguration.ConnectToAddr) + transport.TLSClientConfig.KeyLogWriter = keyLogger + } - foreignProbeClient := &http.Client{Transport: transport} + transport.TLSClientConfig.InsecureSkipVerify = + foreignProbeConfiguration.InsecureSkipVerify - // Start Foreign Connection Prober - probeCount++ - go probe.Probe( - networkActivityCtx, - &wg, - foreignProbeClient, - nil, - foreignProbeConfiguration.URL, - foreignProbeConfiguration.Host, - probe.Foreign, - &dataPoints, - captureExtendedStats, - debugging, - ) + utilities.OverrideHostTransport(transport, + foreignProbeConfiguration.ConnectToAddr) - // Start Self Download Connection Prober + foreignProbeClient := &http.Client{Transport: transport} - // TODO: Make the following sanity check more than just a check. - // We only want to start a SelfDown probe on a connection that is - // in the RUNNING state. - if selfDownProbeConnection.Status() == lgc.LGC_STATUS_RUNNING { - go probe.Probe( + // Start Foreign Connection Prober + foreignProbeDataPoint, err := probe.Probe( networkActivityCtx, - &wg, - selfDownProbeConnection.Client(), - selfDownProbeConnection, - selfProbeConfiguration.URL, - selfProbeConfiguration.Host, - probe.SelfDown, - &dataPoints, + foreignProbeClient, + foreignProbeConfiguration.URL, + foreignProbeConfiguration.Host, + probeDirection, + probe.Foreign, + probeCount, captureExtendedStats, debugging, ) - } else { - panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n", - debugging.Prefix, selfDownProbeConnection.Status())) - } + if err != nil { + return + } + + var selfProbeConnection *lgc.LoadGeneratingConnection = nil + func() { + selfProbeConnectionCollection.Lock.Lock() + defer selfProbeConnectionCollection.Lock.Unlock() + selfProbeConnection, err = selfProbeConnectionCollection.GetRandom() + if err != nil { + if debug.IsWarn(debugging.Level) { + fmt.Printf( + "(%s) Failed to get a random %s load-generating connection on which to send a probe: %v.\n", + debugging.Prefix, + utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"), + err, + ) + } + return + } + }() + if selfProbeConnection == nil { + return + } - // Start Self Upload Connection Prober + // TODO: Make the following sanity check more than just a check. + // We only want to start a SelfUp probe on a connection that is + // in the RUNNING state. + if (*selfProbeConnection).Status() != lgc.LGC_STATUS_RUNNING { + if debug.IsWarn(debugging.Level) { + fmt.Printf( + "(%s) The selected random %s load-generating connection on which to send a probe was not running.\n", + debugging.Prefix, + utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"), + ) + } + return + } - // TODO: Make the following sanity check more than just a check. - // We only want to start a SelfDown probe on a connection that is - // in the RUNNING state. - if selfUpProbeConnection.Status() == lgc.LGC_STATUS_RUNNING { - go probe.Probe( + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) Selected %s load-generating connection with ID %d to send a self probe with Id %d.\n", + debugging.Prefix, + utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"), + (*selfProbeConnection).ClientId(), + probeCount, + ) + } + selfProbeDataPoint, err := probe.Probe( proberCtx, - &wg, - selfUpProbeConnection.Client(), - nil, + (*selfProbeConnection).Client(), selfProbeConfiguration.URL, selfProbeConfiguration.Host, - probe.SelfUp, - &dataPoints, + probeDirection, + utilities.Conditional(probeDirection == lgc.LGC_DOWN, probe.SelfDown, probe.SelfUp), + probeCount, captureExtendedStats, debugging, ) - } else { - panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n", - debugging.Prefix, selfUpProbeConnection.Status())) - } + if err != nil { + fmt.Printf( + "(%s) There was an error sending a self probe with Id %d: %v\n", + debugging.Prefix, + probeCount, + err, + ) + return + } + + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) About to report results for round %d of probes!\n", + debugging.Prefix, + probeCount, + ) + } + + dataPointsLock.Lock() + // Now we have our four data points (three in the foreign probe data point and one in the self probe data point) + if dataPoints != nil { + dataPoints <- utilities.Pair[*probe.ProbeDataPoint]{ + First: foreignProbeDataPoint, Second: selfProbeDataPoint, + } + } + dataPointsLock.Unlock() + }() } if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) Combined probe driver is going to start waiting for its probes to finish.\n", + "(%s) Probe driver is going to start waiting for its probes to finish.\n", debugging.Prefix, ) } utilities.OrTimeout(func() { wg.Wait() }, 2*time.Second) if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) Combined probe driver is done waiting for its probes to finish.\n", + "(%s) Probe driver is done waiting for its probes to finish.\n", debugging.Prefix, ) } + dataPointsLock.Lock() close(dataPoints) + dataPoints = nil + dataPointsLock.Unlock() }() return } func LoadGenerator( + throughputCtx context.Context, // Stop our activity when we no longer need any throughput networkActivityCtx context.Context, // Create all network connections in this context. - loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load. + generateLoadCtx context.Context, // Stop adding additional throughput when we are stable. rampupInterval time.Duration, lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection. loadGeneratingConnectionsCollection *lgc.LoadGeneratingConnectionCollection, + mnp int, captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections? debugging *debug.DebugWithPrefix, // How can we forget debugging? -) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes. - throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate. -) { - throughputCalculations = make(chan ThroughputDataPoint) - // The channel that we are going to use to send back the connection to use for probing may not immediately - // be read by the caller. We don't want to wait around until they are ready before we start doing our work. - // So, we'll make it buffered. - probeConnectionCommunicationChannel = make(chan lgc.LoadGeneratingConnection, 1) +) (stabilizerCommunicationChannel chan ThroughputDataPoint) { // Send back all the instantaneous throughputs that we generate. + stabilizerCommunicationChannel = make(chan ThroughputDataPoint) go func() { flowsCreated := uint64(0) @@ -252,32 +314,12 @@ func LoadGenerator( debugging.Level, ) - // We have at least a single load-generating channel. This channel will be the one that - // the self probes use. - go func() { - loadGeneratingConnectionsCollection.Lock.Lock() - zerothConnection, err := loadGeneratingConnectionsCollection.Get(0) - loadGeneratingConnectionsCollection.Lock.Unlock() - if err != nil { - panic("Could not get the zeroth connection!\n") - } - // We are going to wait until it is started. - if !(*zerothConnection).WaitUntilStarted(loadGeneratorCtx) { - fmt.Fprintf(os.Stderr, "Could not wait until the zeroth load-generating connection was started!\n") - return - } - // Now that it is started, we will send it back to the caller so that - // they can pass it on to the CombinedProber which will use it for the - // self probes. - probeConnectionCommunicationChannel <- *zerothConnection - }() - nextSampleStartTime := time.Now().Add(rampupInterval) for currentInterval := uint64(0); true; currentInterval++ { - // If the loadGeneratorCtx is canceled, then that means our work here is done ... - if loadGeneratorCtx.Err() != nil { + // If the throughputCtx is canceled, then that means our work here is done ... + if throughputCtx.Err() != nil { break } @@ -297,6 +339,12 @@ func LoadGenerator( } nextSampleStartTime = time.Now().Add(time.Second) + // Waiting is the hardest part -- that was a long time asleep + // and we may have been cancelled during that time! + if throughputCtx.Err() != nil { + break + } + // Compute "instantaneous aggregate" goodput which is the number of // bytes transferred within the last second. var instantaneousThroughputTotal float64 = 0 @@ -406,16 +454,50 @@ func LoadGenerator( len(*loadGeneratingConnectionsCollection.LGCs), granularThroughputDatapoints, } - throughputCalculations <- throughputDataPoint + stabilizerCommunicationChannel <- throughputDataPoint - // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now! - flowsCreated += addFlows( - networkActivityCtx, - constants.AdditiveNumberOfLoadGeneratingConnections, - loadGeneratingConnectionsCollection, - lgcGenerator, - debugging.Level, - ) + if generateLoadCtx.Err() != nil { + // No need to add additional data points because the controller told us + // that we were stable. But, we want to continue taking measurements! + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Throughput is stable; not adding any additional load-generating connections.\n", + debugging, + ) + } + continue + } + + loadGeneratingConnectionsCollection.Lock.Lock() + currentParallelConnectionCount, err := + loadGeneratingConnectionsCollection.Len() + loadGeneratingConnectionsCollection.Lock.Unlock() + + if err != nil { + if debug.IsWarn(debugging.Level) { + fmt.Printf( + "%v: Failed to get a count of the number of parallel load-generating connections: %v.\n", + debugging, + err, + ) + } + } + if currentParallelConnectionCount < mnp { + // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now! + flowsCreated += addFlows( + networkActivityCtx, + constants.AdditiveNumberOfLoadGeneratingConnections, + loadGeneratingConnectionsCollection, + lgcGenerator, + debugging.Level, + ) + } else if debug.IsWarn(debugging.Level) { + fmt.Printf( + "%v: Maximum number of parallel transport-layer connections reached (%d). Not adding another.\n", + debugging, + mnp, + ) + } } if debug.IsDebug(debugging.Level) { |
