diff options
Diffstat (limited to 'rpm/rpm.go')
| -rw-r--r-- | rpm/rpm.go | 80 |
1 files changed, 57 insertions, 23 deletions
@@ -40,19 +40,23 @@ func addFlows( lgcc *lgc.LoadGeneratingConnectionCollection, lgcGenerator func() lgc.LoadGeneratingConnection, debug debug.DebugLevel, -) { +) uint64 { lgcc.Lock.Lock() defer lgcc.Lock.Unlock() for i := uint64(0); i < toAdd; i++ { + // First, generate the connection. *lgcc.LGCs = append(*lgcc.LGCs, lgcGenerator()) + // Second, try to start the connection. if !(*lgcc.LGCs)[len(*lgcc.LGCs)-1].Start(ctx, debug) { + // If there was an error, we'll make sure that the caller knows it. fmt.Printf( "Error starting lgc with id %d!\n", (*lgcc.LGCs)[len(*lgcc.LGCs)-1].ClientId(), ) - return + return i } } + return toAdd } type ProbeConfiguration struct { @@ -70,10 +74,12 @@ type ProbeDataPoint struct { } type GranularThroughputDataPoint struct { - Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"` - Throughput float64 `Description:"Instantaneous throughput (B/s)."` - ConnID uint32 `Description:"Position of connection (ID)."` - Direction string `Description:"Direction of Throughput."` + Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"` + Throughput float64 `Description:"Instantaneous throughput (B/s)."` + ConnID uint32 `Description:"Position of connection (ID)."` + TCPRtt time.Duration `Description:"The underlying connection's RTT at probe time." Formatter:"Seconds"` + TCPCwnd uint32 `Description:"The underlying connection's congestion window at probe time."` + Direction string `Description:"Direction of Throughput."` } type ThroughputDataPoint struct { @@ -98,6 +104,15 @@ const ( Foreign ) +type ProbeRoundTripCountType uint16 + +const ( + DefaultDownRoundTripCount ProbeRoundTripCountType = 1 + SelfUpRoundTripCount ProbeRoundTripCountType = 1 + SelfDownRoundTripCount ProbeRoundTripCountType = 1 + ForeignRoundTripCount ProbeRoundTripCountType = 3 +) + func (pt ProbeType) Value() string { if pt == SelfUp { return "SelfUp" @@ -108,13 +123,14 @@ func (pt ProbeType) Value() string { } func Probe( - parentProbeCtx context.Context, + managingCtx context.Context, waitGroup *sync.WaitGroup, client *http.Client, probeUrl string, probeHost string, // optional: for use with a test_endpoint probeType ProbeType, result *chan ProbeDataPoint, + captureExtendedStats bool, debugging *debug.DebugWithPrefix, ) error { @@ -131,7 +147,7 @@ func Probe( probeTracer := NewProbeTracer(client, probeType, probeId, debugging) time_before_probe := time.Now() probe_req, err := http.NewRequestWithContext( - httptrace.WithClientTrace(parentProbeCtx, probeTracer.trace), + httptrace.WithClientTrace(managingCtx, probeTracer.trace), "GET", probeUrl, nil, @@ -195,9 +211,9 @@ func Probe( totalDelay, ) } - roundTripCount := uint64(1) + roundTripCount := DefaultDownRoundTripCount if probeType == Foreign { - roundTripCount = 3 + roundTripCount = ForeignRoundTripCount } // Careful!!! It's possible that this channel has been closed because the Prober that // started it has been stopped. Writing to a closed channel will cause a panic. It might not @@ -218,7 +234,7 @@ func Probe( tcpRtt := time.Duration(0 * time.Second) tcpCwnd := uint32(0) // TODO: Only get the extended stats for a connection if the user has requested them overall. - if extendedstats.ExtendedStatsAvailable() { + if captureExtendedStats && extendedstats.ExtendedStatsAvailable() { tcpInfo, err := extendedstats.GetTCPInfo(probeTracer.stats.ConnInfo.Conn) if err == nil { tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond @@ -229,7 +245,7 @@ func Probe( } dataPoint := ProbeDataPoint{ Time: time_before_probe, - RoundTripCount: roundTripCount, + RoundTripCount: uint64(roundTripCount), Duration: totalDelay, TCPRtt: tcpRtt, TCPCwnd: tcpCwnd, @@ -241,12 +257,14 @@ func Probe( func CombinedProber( proberCtx context.Context, + networkActivityCtx context.Context, foreignProbeConfigurationGenerator func() ProbeConfiguration, selfProbeConfigurationGenerator func() ProbeConfiguration, selfDownProbeConnection lgc.LoadGeneratingConnection, selfUpProbeConnection lgc.LoadGeneratingConnection, probeInterval time.Duration, keyLogger io.Writer, + captureExtendedStats bool, debugging *debug.DebugWithPrefix, ) (dataPoints chan ProbeDataPoint) { @@ -296,31 +314,34 @@ func CombinedProber( foreignProbeClient := &http.Client{Transport: &transport} + // Start Foreign Connection Prober probeCount++ go Probe( - proberCtx, + networkActivityCtx, &wg, foreignProbeClient, foreignProbeConfiguration.URL, foreignProbeConfiguration.Host, Foreign, &dataPoints, + captureExtendedStats, debugging, ) - // Start Download Connection Prober + // Start Self Download Connection Prober go Probe( - proberCtx, + networkActivityCtx, &wg, selfDownProbeConnection.Client(), selfProbeConfiguration.URL, selfProbeConfiguration.Host, SelfDown, &dataPoints, + captureExtendedStats, debugging, ) - // Start Upload Connection Prober + // Start Self Upload Connection Prober go Probe( proberCtx, &wg, @@ -329,6 +350,7 @@ func CombinedProber( selfProbeConfiguration.Host, SelfUp, &dataPoints, + captureExtendedStats, debugging, ) } @@ -356,6 +378,7 @@ func LoadGenerator( rampupInterval time.Duration, lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection. loadGeneratingConnections *lgc.LoadGeneratingConnectionCollection, + captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections? debugging *debug.DebugWithPrefix, // How can we forget debugging? ) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes. throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate. @@ -371,14 +394,13 @@ func LoadGenerator( flowsCreated := uint64(0) - addFlows( + flowsCreated += addFlows( networkActivityCtx, constants.StartingNumberOfLoadGeneratingConnections, loadGeneratingConnections, lgcGenerator, debugging.Level, ) - flowsCreated += constants.StartingNumberOfLoadGeneratingConnections // We have at least a single load-generating channel. This channel will be the one that // the self probes use. Let's send it back to the caller so that they can pass it on if they need to. @@ -425,7 +447,7 @@ func LoadGenerator( ) } // TODO: Do we add null connection to throughput? and how do we define it? Throughput -1 or 0? - granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, 0, uint32(i), ""}) + granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, 0, uint32(i), 0, 0, ""}) continue } allInvalid = false @@ -437,7 +459,21 @@ func LoadGenerator( currentInterval.Seconds(), ) instantaneousTotalThroughput += instantaneousConnectionThroughput - granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, instantaneousConnectionThroughput, uint32(i), ""}) + + tcpRtt := time.Duration(0 * time.Second) + tcpCwnd := uint32(0) + if captureExtendedStats && extendedstats.ExtendedStatsAvailable() { + if stats := (*loadGeneratingConnections.LGCs)[i].Stats(); stats != nil { + tcpInfo, err := extendedstats.GetTCPInfo(stats.ConnInfo.Conn) + if err == nil { + tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond + tcpCwnd = tcpInfo.Snd_cwnd + } else { + fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err) + } + } + } + granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, instantaneousConnectionThroughput, uint32(i), tcpRtt, tcpCwnd, ""}) } // For some reason, all the lgcs are invalid. This likely means that @@ -457,15 +493,13 @@ func LoadGenerator( throughputCalculations <- throughputDataPoint // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now! - addFlows( + flowsCreated += addFlows( networkActivityCtx, constants.AdditiveNumberOfLoadGeneratingConnections, loadGeneratingConnections, lgcGenerator, debugging.Level, ) - - flowsCreated += constants.AdditiveNumberOfLoadGeneratingConnections } if debug.IsDebug(debugging.Level) { |
