diff options
Diffstat (limited to 'rpm')
| -rw-r--r-- | rpm/rpm.go | 49 |
1 files changed, 35 insertions, 14 deletions
@@ -69,6 +69,7 @@ type GranularThroughputDataPoint struct { type ThroughputDataPoint struct { Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"` Throughput float64 `Description:"Instantaneous throughput (B/s)."` + ActiveConnections int `Description:"Number of active parallel connections."` Connections int `Description:"Number of parallel connections."` GranularThroughputDataPoints []GranularThroughputDataPoint `Description:"[OMIT]"` } @@ -92,7 +93,6 @@ func CombinedProber( captureExtendedStats bool, debugging *debug.DebugWithPrefix, ) (dataPoints chan probe.ProbeDataPoint) { - // Make a channel to send back all the generated data points // when we are probing. dataPoints = make(chan probe.ProbeDataPoint) @@ -137,9 +137,11 @@ func CombinedProber( transport.TLSClientConfig.KeyLogWriter = keyLogger } - transport.TLSClientConfig.InsecureSkipVerify = foreignProbeConfiguration.InsecureSkipVerify + transport.TLSClientConfig.InsecureSkipVerify = + foreignProbeConfiguration.InsecureSkipVerify - utilities.OverrideHostTransport(transport, foreignProbeConfiguration.ConnectToAddr) + utilities.OverrideHostTransport(transport, + foreignProbeConfiguration.ConnectToAddr) foreignProbeClient := &http.Client{Transport: transport} @@ -149,6 +151,7 @@ func CombinedProber( networkActivityCtx, &wg, foreignProbeClient, + nil, foreignProbeConfiguration.URL, foreignProbeConfiguration.Host, probe.Foreign, @@ -167,6 +170,7 @@ func CombinedProber( networkActivityCtx, &wg, selfDownProbeConnection.Client(), + selfDownProbeConnection, selfProbeConfiguration.URL, selfProbeConfiguration.Host, probe.SelfDown, @@ -175,7 +179,8 @@ func CombinedProber( debugging, ) } else { - panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n", debugging.Prefix, selfDownProbeConnection.Status())) + panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n", + debugging.Prefix, selfDownProbeConnection.Status())) } // Start Self Upload Connection Prober @@ -188,6 +193,7 @@ func CombinedProber( proberCtx, &wg, selfUpProbeConnection.Client(), + nil, selfProbeConfiguration.URL, selfProbeConfiguration.Host, probe.SelfUp, @@ -196,7 +202,8 @@ func CombinedProber( debugging, ) } else { - panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n", debugging.Prefix, selfUpProbeConnection.Status())) + panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n", + debugging.Prefix, selfUpProbeConnection.Status())) } } if debug.IsDebug(debugging.Level) { @@ -228,7 +235,6 @@ func LoadGenerator( ) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes. throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate. ) { - throughputCalculations = make(chan ThroughputDataPoint) // The channel that we are going to use to send back the connection to use for probing may not immediately // be read by the caller. We don't want to wait around until they are ready before we start doing our work. @@ -236,7 +242,6 @@ func LoadGenerator( probeConnectionCommunicationChannel = make(chan lgc.LoadGeneratingConnection, 1) go func() { - flowsCreated := uint64(0) flowsCreated += addFlows( @@ -248,7 +253,7 @@ func LoadGenerator( ) // We have at least a single load-generating channel. This channel will be the one that - // the self probes use. Let's send it back to the caller so that they can pass it on if they need to. + // the self probes use. go func() { loadGeneratingConnectionsCollection.Lock.Lock() zerothConnection, err := loadGeneratingConnectionsCollection.Get(0) @@ -256,10 +261,14 @@ func LoadGenerator( if err != nil { panic("Could not get the zeroth connection!\n") } + // We are going to wait until it is started. if !(*zerothConnection).WaitUntilStarted(loadGeneratorCtx) { fmt.Fprintf(os.Stderr, "Could not wait until the zeroth load-generating connection was started!\n") return } + // Now that it is started, we will send it back to the caller so that + // they can pass it on to the CombinedProber which will use it for the + // self probes. probeConnectionCommunicationChannel <- *zerothConnection }() @@ -290,7 +299,8 @@ func LoadGenerator( // Compute "instantaneous aggregate" goodput which is the number of // bytes transferred within the last second. - var instantaneousTotalThroughput float64 = 0 + var instantaneousThroughputTotal float64 = 0 + var instantaneousThroughputDataPoints uint = 0 granularThroughputDatapoints := make([]GranularThroughputDataPoint, 0) now = time.Now() // Used to align granular throughput data allInvalid := true @@ -323,19 +333,30 @@ func LoadGenerator( granularThroughputDatapoints, GranularThroughputDataPoint{now, 0, uint32(i), 0, 0, ""}, ) - continue + } + case lgc.LGC_STATUS_NOT_STARTED: + { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Load-generating connection with id %d has not finished starting; "+ + "it will not contribute throughput during this interval.\n", + debugging, + (*loadGeneratingConnectionsCollection.LGCs)[i].ClientId()) + } } case lgc.LGC_STATUS_RUNNING: { allInvalid = false - currentTransferred, currentInterval := (*loadGeneratingConnectionsCollection.LGCs)[i].TransferredInInterval() + currentTransferred, currentInterval := + (*loadGeneratingConnectionsCollection.LGCs)[i].TransferredInInterval() // normalize to a second-long interval! instantaneousConnectionThroughput := float64( currentTransferred, ) / float64( currentInterval.Seconds(), ) - instantaneousTotalThroughput += instantaneousConnectionThroughput + instantaneousThroughputTotal += instantaneousConnectionThroughput + instantaneousThroughputDataPoints++ tcpRtt := time.Duration(0 * time.Second) tcpCwnd := uint32(0) @@ -362,7 +383,6 @@ func LoadGenerator( }, ) } - } } @@ -381,7 +401,8 @@ func LoadGenerator( // We have generated a throughput calculation -- let's send it back to the coordinator throughputDataPoint := ThroughputDataPoint{ time.Now(), - instantaneousTotalThroughput, + instantaneousThroughputTotal, + int(instantaneousThroughputDataPoints), len(*loadGeneratingConnectionsCollection.LGCs), granularThroughputDatapoints, } |
