author    Will Hawkins <[email protected]>  2022-11-07 01:54:24 -0500
committer Will Hawkins <[email protected]>  2022-12-11 16:53:59 -0500
commit    094eb99990f1cf734efb8104e0089dbc6920547e (patch)
tree      8b8bb26481690f852115ebc236be169669f1b4f0 /rpm/rpm.go
parent    096b9d30559f86e07117ff5459c720900a408a11 (diff)
[Feature] Rev 3 of Stability (Basic implementation)
Diffstat (limited to 'rpm/rpm.go')
-rw-r--r--  rpm/rpm.go  120
1 file changed, 51 insertions(+), 69 deletions(-)
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 3c2fa7f..740db2b 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -25,7 +25,6 @@ import (
"time"
"github.com/network-quality/goresponsiveness/constants"
- "github.com/network-quality/goresponsiveness/datalogger"
"github.com/network-quality/goresponsiveness/debug"
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
@@ -38,16 +37,18 @@ import (
func addFlows(
ctx context.Context,
toAdd uint64,
- lgcs *[]lgc.LoadGeneratingConnection,
+ lgcc *lgc.LoadGeneratingConnectionCollection,
lgcGenerator func() lgc.LoadGeneratingConnection,
debug debug.DebugLevel,
) {
+ lgcc.Lock.Lock()
+ defer lgcc.Lock.Unlock()
for i := uint64(0); i < toAdd; i++ {
- *lgcs = append(*lgcs, lgcGenerator())
- if !(*lgcs)[len(*lgcs)-1].Start(ctx, debug) {
+ *lgcc.LGCs = append(*lgcc.LGCs, lgcGenerator())
+ if !(*lgcc.LGCs)[len(*lgcc.LGCs)-1].Start(ctx, debug) {
fmt.Printf(
"Error starting lgc with id %d!\n",
- (*lgcs)[len(*lgcs)-1].ClientId(),
+ (*lgcc.LGCs)[len(*lgcc.LGCs)-1].ClientId(),
)
return
}
@@ -55,8 +56,7 @@ func addFlows(
}
type ProbeConfiguration struct {
- URL string
- DataLogger datalogger.DataLogger[ProbeDataPoint]
+ URL string
}
type ProbeDataPoint struct {
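Taken together, the two hunks above move locking out of the callers and into addFlows itself: the function now receives the whole collection and brackets its append/Start loop with lgcc.Lock.Lock()/Unlock(). Judging from the lgcc.Lock and *lgcc.LGCs accesses, the collection type plausibly has a shape like the following sketch (the real definition lives in the lgc package, outside this diff):

    // Hypothetical shape of lgc.LoadGeneratingConnectionCollection, inferred
    // only from the accesses visible in addFlows above.
    type LoadGeneratingConnectionCollection struct {
        Lock sync.Mutex                  // guards all access to LGCs
        LGCs *[]LoadGeneratingConnection // the shared set of active flows
    }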
@@ -65,6 +65,7 @@ type ProbeDataPoint struct {
Duration time.Duration `Description:"The duration for this measurement." Formatter:"Seconds"`
TCPRtt time.Duration `Description:"The underlying connection's RTT at probe time." Formatter:"Seconds"`
TCPCwnd uint32 `Description:"The underlying connection's congestion window at probe time."`
+ Type ProbeType `Description:"The type of the probe." Formatter:"Value"`
}
type ThroughputDataPoint struct {
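The new Type field records which kind of probe produced the data point, replacing the per-probe data loggers that the rest of this diff removes. Later hunks split the old Self value into SelfUp and SelfDown alongside Foreign; assuming a conventional iota enum (the actual declarations sit outside this diff), ProbeType might read:

    // Sketch only; the real ProbeType declarations are not part of this diff.
    type ProbeType int

    const (
        SelfUp   ProbeType = iota // probe sent over the upload load-generating connection
        SelfDown                  // probe sent over the download load-generating connection
        Foreign                   // probe sent over a fresh, separate connection
    )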
@@ -97,7 +98,6 @@ func (pt ProbeType) Value() string {
func Probe(
parentProbeCtx context.Context,
waitGroup *sync.WaitGroup,
- logger datalogger.DataLogger[ProbeDataPoint],
client *http.Client,
probeUrl string,
probeType ProbeType,
@@ -111,7 +111,7 @@ func Probe(
}
if client == nil {
- return fmt.Errorf("Cannot start a probe with a nil client")
+ return fmt.Errorf("cannot start a probe with a nil client")
}
probeId := utilities.GenerateUniqueId()
@@ -182,7 +182,7 @@ func Probe(
if probeType == Foreign {
roundTripCount = 3
}
- // TODO: Careful!!! It's possible that this channel has been closed because the Prober that
+ // Careful!!! It's possible that this channel has been closed because the Prober that
// started it has been stopped. Writing to a closed channel will cause a panic. It might not
// matter because a panic just stops the goroutine containing the panicked code and we are in
// a goroutine that executes only this function.
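As the comment notes, a send on a closed channel panics, and here the panic would only take down the goroutine running Probe. The durable guarantee has to come from ordering, i.e. only closing the channel after all senders have finished (CombinedProber's WaitGroup serves that role); a defensive sender can additionally select on its context, as in this illustrative snippet (not what the code below does verbatim):

    // Illustration: bail out via the context instead of sending
    // unconditionally, shrinking the window for a send-after-close.
    select {
    case *result <- dataPoint:
    case <-parentProbeCtx.Done():
        return parentProbeCtx.Err()
    }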
@@ -200,6 +200,7 @@ func Probe(
}()
tcpRtt := time.Duration(0 * time.Second)
tcpCwnd := uint32(0)
+ // TODO: Only get the extended stats for a connection if the user has requested them overall.
if extendedstats.ExtendedStatsAvailable() {
tcpInfo, err := extendedstats.GetTCPInfo(probeTracer.stats.ConnInfo.Conn)
if err == nil {
@@ -215,9 +216,7 @@ func Probe(
Duration: totalDelay,
TCPRtt: tcpRtt,
TCPCwnd: tcpCwnd,
- }
- if !utilities.IsInterfaceNil(logger) {
- logger.LogRecord(dataPoint)
+ Type: probeType,
}
*result <- dataPoint
return nil
@@ -231,13 +230,17 @@ func CombinedProber(
probeInterval time.Duration,
keyLogger io.Writer,
debugging *debug.DebugWithPrefix,
-) (points chan ProbeDataPoint) {
- points = make(chan ProbeDataPoint)
+) (dataPoints chan ProbeDataPoint) {
+
+ // Make a channel to send back all the generated data points
+ // when we are probing.
+ dataPoints = make(chan ProbeDataPoint)
go func() {
wg := sync.WaitGroup{}
probeCount := 0
+ // As long as our context says that we can continue to probe!
for proberCtx.Err() == nil {
time.Sleep(probeInterval)
@@ -247,9 +250,9 @@ func CombinedProber(
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) About to send of the %d round of probes!\n",
+ "(%s) About to send round %d of probes!\n",
debugging.Prefix,
- probeCount,
+ probeCount+1,
)
}
transport := http2.Transport{}
@@ -273,28 +276,37 @@ func CombinedProber(
}
transport.TLSClientConfig.InsecureSkipVerify = true
- client := &http.Client{Transport: &transport}
+ foreignProbeClient := &http.Client{Transport: &transport}
probeCount++
go Probe(
proberCtx,
&wg,
- foreignProbeConfiguration.DataLogger,
- client,
+ foreignProbeClient,
foreignProbeConfiguration.URL,
Foreign,
- &points,
+ &dataPoints,
debugging,
)
go Probe(
proberCtx,
&wg,
- selfProbeConfiguration.DataLogger,
selfProbeConnection.Client(),
selfProbeConfiguration.URL,
- Self,
- &points,
+ SelfDown,
+ &dataPoints,
+ debugging,
+ )
+
+ // Start Upload Connection Prober
+ go Probe(
+ proberCtx,
+ &wg,
+ selfUpProbeConnection.Client(),
+ selfProbeConfiguration.URL,
+ SelfUp,
+ &dataPoints,
debugging,
)
}
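Because every ProbeDataPoint now carries its Type, the three goroutines above can share the single dataPoints channel and leave demultiplexing to the consumer, roughly like this hypothetical reader:

    // Hypothetical consumer of the merged probe stream.
    for point := range dataPoints {
        switch point.Type {
        case Foreign:
            // RTTs measured on fresh connections
        case SelfUp, SelfDown:
            // RTTs measured on the load-generating connections
        }
    }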
@@ -311,22 +323,20 @@ func CombinedProber(
debugging.Prefix,
)
}
- close(points)
+ close(dataPoints)
}()
return
}
func LoadGenerator(
networkActivityCtx context.Context, // Create all network connections in this context.
- saturationCtx context.Context, // Continue logging, but stop adding flows when this context is canceled!
loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load.
rampupInterval time.Duration,
lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection.
loadGeneratingConnections *lgc.LoadGeneratingConnectionCollection,
debugging *debug.DebugWithPrefix, // How can we forget debugging?
-) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to use for self probes.
+) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes.
throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate.
- lgcs []lgc.LoadGeneratingConnection, // The caller will want to look at this if they are interested in doing extended stats.
) {
throughputCalculations = make(chan ThroughputDataPoint)
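With saturationCtx and the returned lgcs slice both gone, callers now share state through the LoadGeneratingConnectionCollection they pass in. A call under the new signature would look roughly like this (variable names are illustrative):

    // Sketch of the new call shape; names are illustrative.
    selfProbeConns, throughputs := LoadGenerator(
        networkActivityCtx, // lifetime of all network connections
        loadGeneratorCtx,   // canceled when load generation should stop
        time.Second,        // rampup interval
        lgcGenerator,       // func() lgc.LoadGeneratingConnection
        &connections,       // shared *lgc.LoadGeneratingConnectionCollection
        debugging,
    )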
@@ -334,25 +344,18 @@ func LoadGenerator(
// be read by the caller. We don't want to wait around until they are ready before we start doing our work.
// So, we'll make it buffered.
probeConnectionCommunicationChannel = make(chan lgc.LoadGeneratingConnection, 1)
- lgcs = make([]lgc.LoadGeneratingConnection, 0)
go func() {
- isSaturated := false
-
- lgcs := make([]lgc.LoadGeneratingConnection, 0)
-
flowsCreated := uint64(0)
- loadGeneratingConnections.Lock.Lock()
addFlows(
networkActivityCtx,
constants.StartingNumberOfLoadGeneratingConnections,
- loadGeneratingConnections.LGCs,
+ loadGeneratingConnections,
lgcGenerator,
debugging.Level,
)
- loadGeneratingConnections.Lock.Unlock()
flowsCreated += constants.StartingNumberOfLoadGeneratingConnections
// We have at least a single load-generating channel. This channel will be the one that
@@ -363,20 +366,11 @@ func LoadGenerator(
for currentInterval := uint64(0); true; currentInterval++ {
- // If the operationalCtx is canceled, then that means our work here is done ...
+ // If the loadGeneratorCtx is canceled, then that means our work here is done ...
if loadGeneratorCtx.Err() != nil {
break
}
- if saturationCtx.Err() != nil {
- isSaturated = true
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "%v: Received the saturated signal; continuing only to log from now on.\n",
- debugging,
- )
- }
- }
now := time.Now()
// At each 1-second interval
if nextSampleStartTime.Sub(now) > 0 {
@@ -435,26 +429,14 @@ func LoadGenerator(
throughputDataPoint := ThroughputDataPoint{time.Now(), instantaneousTotalThroughput, len(*loadGeneratingConnections.LGCs)}
throughputCalculations <- throughputDataPoint
- // Log that, if we are configured for logging.
- if !utilities.IsInterfaceNil(throughputDataLogger) {
- throughputDataLogger.LogRecord(throughputDataPoint)
- }
-
- // We don't actually want to create a new connection if we are saturated!
- if isSaturated {
- continue
- }
-
// Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now!
- loadGeneratingConnections.Lock.Lock()
addFlows(
networkActivityCtx,
constants.AdditiveNumberOfLoadGeneratingConnections,
- loadGeneratingConnections.LGCs,
+ loadGeneratingConnections,
lgcGenerator,
debugging.Level,
)
- loadGeneratingConnections.Lock.Unlock()
flowsCreated += constants.AdditiveNumberOfLoadGeneratingConnections
}
@@ -670,17 +652,17 @@ func (probe *ProbeTracer) SetGotConnTimeInfo(
os.Stderr,
"A self probe sent used a new connection!\n",
)
- } else if debug.IsDebug(probe.debug) {
- fmt.Printf("Properly reused a connection when doing a self probe!\n")
}
- if debug.IsDebug(probe.debug) {
- fmt.Printf(
- "(%s Probe) Got a reused connection for Probe %v at %v with info %v\n",
- probe.probeType.Value(),
- probe.ProbeId(),
- probe.stats.GetConnectionDoneTime,
- probe.stats.ConnInfo,
- )
+ if gotConnInfo.Reused {
+ if debug.IsDebug(probe.debug) {
+ fmt.Printf(
+ "(%s Probe) Got a reused connection for Probe %v at %v with info %v\n",
+ probe.probeType.Value(),
+ probe.ProbeId(),
+ probe.stats.GetConnectionDoneTime,
+ probe.stats.ConnInfo,
+ )
+ }
}
}
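gotConnInfo here is presumably net/http/httptrace's GotConnInfo, whose standard-library Reused field the restructured branch tests. Hooking the tracer into a request would look roughly like the following sketch (the project's actual trace wiring, and SetGotConnTimeInfo's exact signature, are outside this diff):

    // Sketch: drive SetGotConnTimeInfo from httptrace's GotConn callback.
    trace := &httptrace.ClientTrace{
        GotConn: func(info httptrace.GotConnInfo) {
            probe.SetGotConnTimeInfo(time.Now(), info)
        },
    }
    req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))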