summaryrefslogtreecommitdiff
path: root/rpm/rpm.go
diff options
context:
space:
mode:
Diffstat (limited to 'rpm/rpm.go')
-rw-r--r--rpm/rpm.go80
1 files changed, 57 insertions, 23 deletions
diff --git a/rpm/rpm.go b/rpm/rpm.go
index eb10ec2..bf1270a 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -40,19 +40,23 @@ func addFlows(
lgcc *lgc.LoadGeneratingConnectionCollection,
lgcGenerator func() lgc.LoadGeneratingConnection,
debug debug.DebugLevel,
-) {
+) uint64 {
lgcc.Lock.Lock()
defer lgcc.Lock.Unlock()
for i := uint64(0); i < toAdd; i++ {
+ // First, generate the connection.
*lgcc.LGCs = append(*lgcc.LGCs, lgcGenerator())
+ // Second, try to start the connection.
if !(*lgcc.LGCs)[len(*lgcc.LGCs)-1].Start(ctx, debug) {
+ // If there was an error, we'll make sure that the caller knows it.
fmt.Printf(
"Error starting lgc with id %d!\n",
(*lgcc.LGCs)[len(*lgcc.LGCs)-1].ClientId(),
)
- return
+ return i
}
}
+ return toAdd
}
type ProbeConfiguration struct {
@@ -70,10 +74,12 @@ type ProbeDataPoint struct {
}
type GranularThroughputDataPoint struct {
- Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"`
- Throughput float64 `Description:"Instantaneous throughput (B/s)."`
- ConnID uint32 `Description:"Position of connection (ID)."`
- Direction string `Description:"Direction of Throughput."`
+ Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"`
+ Throughput float64 `Description:"Instantaneous throughput (B/s)."`
+ ConnID uint32 `Description:"Position of connection (ID)."`
+ TCPRtt time.Duration `Description:"The underlying connection's RTT at probe time." Formatter:"Seconds"`
+ TCPCwnd uint32 `Description:"The underlying connection's congestion window at probe time."`
+ Direction string `Description:"Direction of Throughput."`
}
type ThroughputDataPoint struct {
@@ -98,6 +104,15 @@ const (
Foreign
)
+type ProbeRoundTripCountType uint16
+
+const (
+ DefaultDownRoundTripCount ProbeRoundTripCountType = 1
+ SelfUpRoundTripCount ProbeRoundTripCountType = 1
+ SelfDownRoundTripCount ProbeRoundTripCountType = 1
+ ForeignRoundTripCount ProbeRoundTripCountType = 3
+)
+
func (pt ProbeType) Value() string {
if pt == SelfUp {
return "SelfUp"
@@ -108,13 +123,14 @@ func (pt ProbeType) Value() string {
}
func Probe(
- parentProbeCtx context.Context,
+ managingCtx context.Context,
waitGroup *sync.WaitGroup,
client *http.Client,
probeUrl string,
probeHost string, // optional: for use with a test_endpoint
probeType ProbeType,
result *chan ProbeDataPoint,
+ captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
) error {
@@ -131,7 +147,7 @@ func Probe(
probeTracer := NewProbeTracer(client, probeType, probeId, debugging)
time_before_probe := time.Now()
probe_req, err := http.NewRequestWithContext(
- httptrace.WithClientTrace(parentProbeCtx, probeTracer.trace),
+ httptrace.WithClientTrace(managingCtx, probeTracer.trace),
"GET",
probeUrl,
nil,
@@ -195,9 +211,9 @@ func Probe(
totalDelay,
)
}
- roundTripCount := uint64(1)
+ roundTripCount := DefaultDownRoundTripCount
if probeType == Foreign {
- roundTripCount = 3
+ roundTripCount = ForeignRoundTripCount
}
// Careful!!! It's possible that this channel has been closed because the Prober that
// started it has been stopped. Writing to a closed channel will cause a panic. It might not
@@ -218,7 +234,7 @@ func Probe(
tcpRtt := time.Duration(0 * time.Second)
tcpCwnd := uint32(0)
// TODO: Only get the extended stats for a connection if the user has requested them overall.
- if extendedstats.ExtendedStatsAvailable() {
+ if captureExtendedStats && extendedstats.ExtendedStatsAvailable() {
tcpInfo, err := extendedstats.GetTCPInfo(probeTracer.stats.ConnInfo.Conn)
if err == nil {
tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond
@@ -229,7 +245,7 @@ func Probe(
}
dataPoint := ProbeDataPoint{
Time: time_before_probe,
- RoundTripCount: roundTripCount,
+ RoundTripCount: uint64(roundTripCount),
Duration: totalDelay,
TCPRtt: tcpRtt,
TCPCwnd: tcpCwnd,
@@ -241,12 +257,14 @@ func Probe(
func CombinedProber(
proberCtx context.Context,
+ networkActivityCtx context.Context,
foreignProbeConfigurationGenerator func() ProbeConfiguration,
selfProbeConfigurationGenerator func() ProbeConfiguration,
selfDownProbeConnection lgc.LoadGeneratingConnection,
selfUpProbeConnection lgc.LoadGeneratingConnection,
probeInterval time.Duration,
keyLogger io.Writer,
+ captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
) (dataPoints chan ProbeDataPoint) {
@@ -296,31 +314,34 @@ func CombinedProber(
foreignProbeClient := &http.Client{Transport: &transport}
+ // Start Foreign Connection Prober
probeCount++
go Probe(
- proberCtx,
+ networkActivityCtx,
&wg,
foreignProbeClient,
foreignProbeConfiguration.URL,
foreignProbeConfiguration.Host,
Foreign,
&dataPoints,
+ captureExtendedStats,
debugging,
)
- // Start Download Connection Prober
+ // Start Self Download Connection Prober
go Probe(
- proberCtx,
+ networkActivityCtx,
&wg,
selfDownProbeConnection.Client(),
selfProbeConfiguration.URL,
selfProbeConfiguration.Host,
SelfDown,
&dataPoints,
+ captureExtendedStats,
debugging,
)
- // Start Upload Connection Prober
+ // Start Self Upload Connection Prober
go Probe(
proberCtx,
&wg,
@@ -329,6 +350,7 @@ func CombinedProber(
selfProbeConfiguration.Host,
SelfUp,
&dataPoints,
+ captureExtendedStats,
debugging,
)
}
@@ -356,6 +378,7 @@ func LoadGenerator(
rampupInterval time.Duration,
lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection.
loadGeneratingConnections *lgc.LoadGeneratingConnectionCollection,
+ captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections?
debugging *debug.DebugWithPrefix, // How can we forget debugging?
) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes.
throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate.
@@ -371,14 +394,13 @@ func LoadGenerator(
flowsCreated := uint64(0)
- addFlows(
+ flowsCreated += addFlows(
networkActivityCtx,
constants.StartingNumberOfLoadGeneratingConnections,
loadGeneratingConnections,
lgcGenerator,
debugging.Level,
)
- flowsCreated += constants.StartingNumberOfLoadGeneratingConnections
// We have at least a single load-generating channel. This channel will be the one that
// the self probes use. Let's send it back to the caller so that they can pass it on if they need to.
@@ -425,7 +447,7 @@ func LoadGenerator(
)
}
// TODO: Do we add null connection to throughput? and how do we define it? Throughput -1 or 0?
- granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, 0, uint32(i), ""})
+ granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, 0, uint32(i), 0, 0, ""})
continue
}
allInvalid = false
@@ -437,7 +459,21 @@ func LoadGenerator(
currentInterval.Seconds(),
)
instantaneousTotalThroughput += instantaneousConnectionThroughput
- granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, instantaneousConnectionThroughput, uint32(i), ""})
+
+ tcpRtt := time.Duration(0 * time.Second)
+ tcpCwnd := uint32(0)
+ if captureExtendedStats && extendedstats.ExtendedStatsAvailable() {
+ if stats := (*loadGeneratingConnections.LGCs)[i].Stats(); stats != nil {
+ tcpInfo, err := extendedstats.GetTCPInfo(stats.ConnInfo.Conn)
+ if err == nil {
+ tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond
+ tcpCwnd = tcpInfo.Snd_cwnd
+ } else {
+ fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err)
+ }
+ }
+ }
+ granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, instantaneousConnectionThroughput, uint32(i), tcpRtt, tcpCwnd, ""})
}
// For some reason, all the lgcs are invalid. This likely means that
@@ -457,15 +493,13 @@ func LoadGenerator(
throughputCalculations <- throughputDataPoint
// Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now!
- addFlows(
+ flowsCreated += addFlows(
networkActivityCtx,
constants.AdditiveNumberOfLoadGeneratingConnections,
loadGeneratingConnections,
lgcGenerator,
debugging.Level,
)
-
- flowsCreated += constants.AdditiveNumberOfLoadGeneratingConnections
}
if debug.IsDebug(debugging.Level) {