summaryrefslogtreecommitdiff
path: root/rpm
diff options
context:
space:
mode:
Diffstat (limited to 'rpm')
-rw-r--r--rpm/rpm.go49
1 files changed, 35 insertions, 14 deletions
diff --git a/rpm/rpm.go b/rpm/rpm.go
index c900642..07bc787 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -69,6 +69,7 @@ type GranularThroughputDataPoint struct {
type ThroughputDataPoint struct {
Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"`
Throughput float64 `Description:"Instantaneous throughput (B/s)."`
+ ActiveConnections int `Description:"Number of active parallel connections."`
Connections int `Description:"Number of parallel connections."`
GranularThroughputDataPoints []GranularThroughputDataPoint `Description:"[OMIT]"`
}
@@ -92,7 +93,6 @@ func CombinedProber(
captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
) (dataPoints chan probe.ProbeDataPoint) {
-
// Make a channel to send back all the generated data points
// when we are probing.
dataPoints = make(chan probe.ProbeDataPoint)
@@ -137,9 +137,11 @@ func CombinedProber(
transport.TLSClientConfig.KeyLogWriter = keyLogger
}
- transport.TLSClientConfig.InsecureSkipVerify = foreignProbeConfiguration.InsecureSkipVerify
+ transport.TLSClientConfig.InsecureSkipVerify =
+ foreignProbeConfiguration.InsecureSkipVerify
- utilities.OverrideHostTransport(transport, foreignProbeConfiguration.ConnectToAddr)
+ utilities.OverrideHostTransport(transport,
+ foreignProbeConfiguration.ConnectToAddr)
foreignProbeClient := &http.Client{Transport: transport}
@@ -149,6 +151,7 @@ func CombinedProber(
networkActivityCtx,
&wg,
foreignProbeClient,
+ nil,
foreignProbeConfiguration.URL,
foreignProbeConfiguration.Host,
probe.Foreign,
@@ -167,6 +170,7 @@ func CombinedProber(
networkActivityCtx,
&wg,
selfDownProbeConnection.Client(),
+ selfDownProbeConnection,
selfProbeConfiguration.URL,
selfProbeConfiguration.Host,
probe.SelfDown,
@@ -175,7 +179,8 @@ func CombinedProber(
debugging,
)
} else {
- panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n", debugging.Prefix, selfDownProbeConnection.Status()))
+ panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n",
+ debugging.Prefix, selfDownProbeConnection.Status()))
}
// Start Self Upload Connection Prober
@@ -188,6 +193,7 @@ func CombinedProber(
proberCtx,
&wg,
selfUpProbeConnection.Client(),
+ nil,
selfProbeConfiguration.URL,
selfProbeConfiguration.Host,
probe.SelfUp,
@@ -196,7 +202,8 @@ func CombinedProber(
debugging,
)
} else {
- panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n", debugging.Prefix, selfUpProbeConnection.Status()))
+ panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n",
+ debugging.Prefix, selfUpProbeConnection.Status()))
}
}
if debug.IsDebug(debugging.Level) {
@@ -228,7 +235,6 @@ func LoadGenerator(
) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes.
throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate.
) {
-
throughputCalculations = make(chan ThroughputDataPoint)
// The channel that we are going to use to send back the connection to use for probing may not immediately
// be read by the caller. We don't want to wait around until they are ready before we start doing our work.
@@ -236,7 +242,6 @@ func LoadGenerator(
probeConnectionCommunicationChannel = make(chan lgc.LoadGeneratingConnection, 1)
go func() {
-
flowsCreated := uint64(0)
flowsCreated += addFlows(
@@ -248,7 +253,7 @@ func LoadGenerator(
)
// We have at least a single load-generating channel. This channel will be the one that
- // the self probes use. Let's send it back to the caller so that they can pass it on if they need to.
+ // the self probes use.
go func() {
loadGeneratingConnectionsCollection.Lock.Lock()
zerothConnection, err := loadGeneratingConnectionsCollection.Get(0)
@@ -256,10 +261,14 @@ func LoadGenerator(
if err != nil {
panic("Could not get the zeroth connection!\n")
}
+ // We are going to wait until it is started.
if !(*zerothConnection).WaitUntilStarted(loadGeneratorCtx) {
fmt.Fprintf(os.Stderr, "Could not wait until the zeroth load-generating connection was started!\n")
return
}
+ // Now that it is started, we will send it back to the caller so that
+ // they can pass it on to the CombinedProber which will use it for the
+ // self probes.
probeConnectionCommunicationChannel <- *zerothConnection
}()
@@ -290,7 +299,8 @@ func LoadGenerator(
// Compute "instantaneous aggregate" goodput which is the number of
// bytes transferred within the last second.
- var instantaneousTotalThroughput float64 = 0
+ var instantaneousThroughputTotal float64 = 0
+ var instantaneousThroughputDataPoints uint = 0
granularThroughputDatapoints := make([]GranularThroughputDataPoint, 0)
now = time.Now() // Used to align granular throughput data
allInvalid := true
@@ -323,19 +333,30 @@ func LoadGenerator(
granularThroughputDatapoints,
GranularThroughputDataPoint{now, 0, uint32(i), 0, 0, ""},
)
- continue
+ }
+ case lgc.LGC_STATUS_NOT_STARTED:
+ {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Load-generating connection with id %d has not finished starting; "+
+ "it will not contribute throughput during this interval.\n",
+ debugging,
+ (*loadGeneratingConnectionsCollection.LGCs)[i].ClientId())
+ }
}
case lgc.LGC_STATUS_RUNNING:
{
allInvalid = false
- currentTransferred, currentInterval := (*loadGeneratingConnectionsCollection.LGCs)[i].TransferredInInterval()
+ currentTransferred, currentInterval :=
+ (*loadGeneratingConnectionsCollection.LGCs)[i].TransferredInInterval()
// normalize to a second-long interval!
instantaneousConnectionThroughput := float64(
currentTransferred,
) / float64(
currentInterval.Seconds(),
)
- instantaneousTotalThroughput += instantaneousConnectionThroughput
+ instantaneousThroughputTotal += instantaneousConnectionThroughput
+ instantaneousThroughputDataPoints++
tcpRtt := time.Duration(0 * time.Second)
tcpCwnd := uint32(0)
@@ -362,7 +383,6 @@ func LoadGenerator(
},
)
}
-
}
}
@@ -381,7 +401,8 @@ func LoadGenerator(
// We have generated a throughput calculation -- let's send it back to the coordinator
throughputDataPoint := ThroughputDataPoint{
time.Now(),
- instantaneousTotalThroughput,
+ instantaneousThroughputTotal,
+ int(instantaneousThroughputDataPoints),
len(*loadGeneratingConnectionsCollection.LGCs),
granularThroughputDatapoints,
}