diff options
| author | Will Hawkins <[email protected]> | 2022-11-07 01:54:24 -0500 |
|---|---|---|
| committer | Will Hawkins <[email protected]> | 2022-12-11 16:53:59 -0500 |
| commit | 094eb99990f1cf734efb8104e0089dbc6920547e (patch) | |
| tree | 8b8bb26481690f852115ebc236be169669f1b4f0 /rpm/rpm.go | |
| parent | 096b9d30559f86e07117ff5459c720900a408a11 (diff) | |
[Feature] Rev 3 of Stability (Basic implementation)
Diffstat (limited to 'rpm/rpm.go')
| -rw-r--r-- | rpm/rpm.go | 120 |
1 files changed, 51 insertions, 69 deletions
@@ -25,7 +25,6 @@ import ( "time" "github.com/network-quality/goresponsiveness/constants" - "github.com/network-quality/goresponsiveness/datalogger" "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" @@ -38,16 +37,18 @@ import ( func addFlows( ctx context.Context, toAdd uint64, - lgcs *[]lgc.LoadGeneratingConnection, + lgcc *lgc.LoadGeneratingConnectionCollection, lgcGenerator func() lgc.LoadGeneratingConnection, debug debug.DebugLevel, ) { + lgcc.Lock.Lock() + defer lgcc.Lock.Unlock() for i := uint64(0); i < toAdd; i++ { - *lgcs = append(*lgcs, lgcGenerator()) - if !(*lgcs)[len(*lgcs)-1].Start(ctx, debug) { + *lgcc.LGCs = append(*lgcc.LGCs, lgcGenerator()) + if !(*lgcc.LGCs)[len(*lgcc.LGCs)-1].Start(ctx, debug) { fmt.Printf( "Error starting lgc with id %d!\n", - (*lgcs)[len(*lgcs)-1].ClientId(), + (*lgcc.LGCs)[len(*lgcc.LGCs)-1].ClientId(), ) return } @@ -55,8 +56,7 @@ func addFlows( } type ProbeConfiguration struct { - URL string - DataLogger datalogger.DataLogger[ProbeDataPoint] + URL string } type ProbeDataPoint struct { @@ -65,6 +65,7 @@ type ProbeDataPoint struct { Duration time.Duration `Description:"The duration for this measurement." Formatter:"Seconds"` TCPRtt time.Duration `Description:"The underlying connection's RTT at probe time." Formatter:"Seconds"` TCPCwnd uint32 `Description:"The underlying connection's congestion window at probe time."` + Type ProbeType `Description:"The type of the probe." Formatter:"Value"` } type ThroughputDataPoint struct { @@ -97,7 +98,6 @@ func (pt ProbeType) Value() string { func Probe( parentProbeCtx context.Context, waitGroup *sync.WaitGroup, - logger datalogger.DataLogger[ProbeDataPoint], client *http.Client, probeUrl string, probeType ProbeType, @@ -111,7 +111,7 @@ func Probe( } if client == nil { - return fmt.Errorf("Cannot start a probe with a nil client") + return fmt.Errorf("cannot start a probe with a nil client") } probeId := utilities.GenerateUniqueId() @@ -182,7 +182,7 @@ func Probe( if probeType == Foreign { roundTripCount = 3 } - // TODO: Careful!!! It's possible that this channel has been closed because the Prober that + // Careful!!! It's possible that this channel has been closed because the Prober that // started it has been stopped. Writing to a closed channel will cause a panic. It might not // matter because a panic just stops the go thread containing the paniced code and we are in // a go thread that executes only this function. @@ -200,6 +200,7 @@ func Probe( }() tcpRtt := time.Duration(0 * time.Second) tcpCwnd := uint32(0) + // TODO: Only get the extended stats for a connection if the user has requested them overall. if extendedstats.ExtendedStatsAvailable() { tcpInfo, err := extendedstats.GetTCPInfo(probeTracer.stats.ConnInfo.Conn) if err == nil { @@ -215,9 +216,7 @@ func Probe( Duration: totalDelay, TCPRtt: tcpRtt, TCPCwnd: tcpCwnd, - } - if !utilities.IsInterfaceNil(logger) { - logger.LogRecord(dataPoint) + Type: probeType, } *result <- dataPoint return nil @@ -231,13 +230,17 @@ func CombinedProber( probeInterval time.Duration, keyLogger io.Writer, debugging *debug.DebugWithPrefix, -) (points chan ProbeDataPoint) { - points = make(chan ProbeDataPoint) +) (dataPoints chan ProbeDataPoint) { + + // Make a channel to send back all the generated data points + // when we are probing. + dataPoints = make(chan ProbeDataPoint) go func() { wg := sync.WaitGroup{} probeCount := 0 + // As long as our context says that we can continue to probe! for proberCtx.Err() == nil { time.Sleep(probeInterval) @@ -247,9 +250,9 @@ func CombinedProber( if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) About to send of the %d round of probes!\n", + "(%s) About to send round %d of probes!\n", debugging.Prefix, - probeCount, + probeCount+1, ) } transport := http2.Transport{} @@ -273,28 +276,37 @@ func CombinedProber( } transport.TLSClientConfig.InsecureSkipVerify = true - client := &http.Client{Transport: &transport} + foreignProbeClient := &http.Client{Transport: &transport} probeCount++ go Probe( proberCtx, &wg, - foreignProbeConfiguration.DataLogger, - client, + foreignProbeClient, foreignProbeConfiguration.URL, Foreign, - &points, + &dataPoints, debugging, ) go Probe( proberCtx, &wg, - selfProbeConfiguration.DataLogger, selfProbeConnection.Client(), selfProbeConfiguration.URL, - Self, - &points, + SelfDown, + &dataPoints, + debugging, + ) + + // Start Upload Connection Prober + go Probe( + proberCtx, + &wg, + selfUpProbeConnection.Client(), + selfProbeConfiguration.URL, + SelfUp, + &dataPoints, debugging, ) } @@ -311,22 +323,20 @@ func CombinedProber( debugging.Prefix, ) } - close(points) + close(dataPoints) }() return } func LoadGenerator( networkActivityCtx context.Context, // Create all network connections in this context. - saturationCtx context.Context, // Continue logging, but stop adding flows when this context is canceled! loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load. rampupInterval time.Duration, lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection. loadGeneratingConnections *lgc.LoadGeneratingConnectionCollection, debugging *debug.DebugWithPrefix, // How can we forget debugging? -) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to use for self probes. +) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes. throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate. - lgcs []lgc.LoadGeneratingConnection, // The caller will want to look at this if they are interested in doing extended stats. ) { throughputCalculations = make(chan ThroughputDataPoint) @@ -334,25 +344,18 @@ func LoadGenerator( // be read by the caller. We don't want to wait around until they are ready before we start doing our work. // So, we'll make it buffered. probeConnectionCommunicationChannel = make(chan lgc.LoadGeneratingConnection, 1) - lgcs = make([]lgc.LoadGeneratingConnection, 0) go func() { - isSaturated := false - - lgcs := make([]lgc.LoadGeneratingConnection, 0) - flowsCreated := uint64(0) - loadGeneratingConnections.Lock.Lock() addFlows( networkActivityCtx, constants.StartingNumberOfLoadGeneratingConnections, - loadGeneratingConnections.LGCs, + loadGeneratingConnections, lgcGenerator, debugging.Level, ) - loadGeneratingConnections.Lock.Unlock() flowsCreated += constants.StartingNumberOfLoadGeneratingConnections // We have at least a single load-generating channel. This channel will be the one that @@ -363,20 +366,11 @@ func LoadGenerator( for currentInterval := uint64(0); true; currentInterval++ { - // If the operationalCtx is canceled, then that means our work here is done ... + // If the loadGeneratorCtx is canceled, then that means our work here is done ... if loadGeneratorCtx.Err() != nil { break } - if saturationCtx.Err() != nil { - isSaturated = true - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Received the saturated signal; continuing only to log from now on.\n", - debugging, - ) - } - } now := time.Now() // At each 1-second interval if nextSampleStartTime.Sub(now) > 0 { @@ -435,26 +429,14 @@ func LoadGenerator( throughputDataPoint := ThroughputDataPoint{time.Now(), instantaneousTotalThroughput, len(*loadGeneratingConnections.LGCs)} throughputCalculations <- throughputDataPoint - // Log that, if we are configured for logging. - if !utilities.IsInterfaceNil(throughputDataLogger) { - throughputDataLogger.LogRecord(throughputDataPoint) - } - - // We don't actually want to create a new connection if we are saturated! - if isSaturated { - continue - } - // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now! - loadGeneratingConnections.Lock.Lock() addFlows( networkActivityCtx, constants.AdditiveNumberOfLoadGeneratingConnections, - loadGeneratingConnections.LGCs, + loadGeneratingConnections, lgcGenerator, debugging.Level, ) - loadGeneratingConnections.Lock.Unlock() flowsCreated += constants.AdditiveNumberOfLoadGeneratingConnections } @@ -670,17 +652,17 @@ func (probe *ProbeTracer) SetGotConnTimeInfo( os.Stderr, "A self probe sent used a new connection!\n", ) - } else if debug.IsDebug(probe.debug) { - fmt.Printf("Properly reused a connection when doing a self probe!\n") } - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) Got a reused connection for Probe %v at %v with info %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.GetConnectionDoneTime, - probe.stats.ConnInfo, - ) + if gotConnInfo.Reused { + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(%s Probe) Got a reused connection for Probe %v at %v with info %v\n", + probe.probeType.Value(), + probe.ProbeId(), + probe.stats.GetConnectionDoneTime, + probe.stats.ConnInfo, + ) + } } } |
