diff options
Diffstat (limited to 'rpm/rpm.go')
| -rw-r--r-- | rpm/rpm.go | 596 |
1 files changed, 82 insertions, 514 deletions
@@ -20,7 +20,6 @@ import ( "fmt" "io" "net/http" - "net/http/httptrace" "os" "sync" "time" @@ -29,8 +28,7 @@ import ( "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" - "github.com/network-quality/goresponsiveness/stats" - "github.com/network-quality/goresponsiveness/traceable" + "github.com/network-quality/goresponsiveness/probe" "github.com/network-quality/goresponsiveness/utilities" ) @@ -59,22 +57,6 @@ func addFlows( return toAdd } -type ProbeConfiguration struct { - ConnectToAddr string - URL string - Host string - InsecureSkipVerify bool -} - -type ProbeDataPoint struct { - Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"` - RoundTripCount uint64 `Description:"The number of round trips measured by this data point."` - Duration time.Duration `Description:"The duration for this measurement." Formatter:"Seconds"` - TCPRtt time.Duration `Description:"The underlying connection's RTT at probe time." Formatter:"Seconds"` - TCPCwnd uint32 `Description:"The underlying connection's congestion window at probe time."` - Type ProbeType `Description:"The type of the probe." Formatter:"Value"` -} - type GranularThroughputDataPoint struct { Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"` Throughput float64 `Description:"Instantaneous throughput (B/s)."` @@ -94,182 +76,26 @@ type ThroughputDataPoint struct { type SelfDataCollectionResult struct { RateBps float64 LGCs []lgc.LoadGeneratingConnection - ProbeDataPoints []ProbeDataPoint + ProbeDataPoints []probe.ProbeDataPoint LoggingContinuation func() } -type ProbeType int64 - -const ( - SelfUp ProbeType = iota - SelfDown - Foreign -) - -type ProbeRoundTripCountType uint16 - -const ( - DefaultDownRoundTripCount ProbeRoundTripCountType = 1 - SelfUpRoundTripCount ProbeRoundTripCountType = 1 - SelfDownRoundTripCount ProbeRoundTripCountType = 1 - ForeignRoundTripCount ProbeRoundTripCountType = 3 -) - -func (pt ProbeType) Value() string { - if pt == SelfUp { - return "SelfUp" - } else if pt == SelfDown { - return "SelfDown" - } - return "Foreign" -} - -func Probe( - managingCtx context.Context, - waitGroup *sync.WaitGroup, - client *http.Client, - probeUrl string, - probeHost string, // optional: for use with a test_endpoint - probeType ProbeType, - result *chan ProbeDataPoint, - captureExtendedStats bool, - debugging *debug.DebugWithPrefix, -) error { - - if waitGroup != nil { - waitGroup.Add(1) - defer waitGroup.Done() - } - - if client == nil { - return fmt.Errorf("cannot start a probe with a nil client") - } - - probeId := utilities.GenerateUniqueId() - probeTracer := NewProbeTracer(client, probeType, probeId, debugging) - time_before_probe := time.Now() - probe_req, err := http.NewRequestWithContext( - httptrace.WithClientTrace(managingCtx, probeTracer.trace), - "GET", - probeUrl, - nil, - ) - if err != nil { - return err - } - - // Used to disable compression - probe_req.Header.Set("Accept-Encoding", "identity") - probe_req.Header.Set("User-Agent", utilities.UserAgent()) - - probe_resp, err := client.Do(probe_req) - if err != nil { - return err - } - - // Header.Get returns "" when not set - if probe_resp.Header.Get("Content-Encoding") != "" { - return fmt.Errorf("Content-Encoding header was set (compression not allowed)") - } - - // TODO: Make this interruptable somehow by using _ctx_. - _, err = io.ReadAll(probe_resp.Body) - if err != nil { - return err - } - time_after_probe := time.Now() - - // Depending on whether we think that Close() requires another RTT (via TCP), we - // may need to move this before/after capturing the after time. - probe_resp.Body.Close() - - sanity := time_after_probe.Sub(time_before_probe) - - // When the probe is run on a load-generating connection (a self probe) there should - // only be a single round trip that is measured. We will take the accumulation of all these - // values just to be sure, though. Because of how this traced connection was launched, most - // of the values will be 0 (or very small where the time that go takes for delivering callbacks - // and doing context switches pokes through). When it is !isSelfProbe then the values will - // be significant and we want to add them regardless! - totalDelay := probeTracer.GetTLSAndHttpHeaderDelta() + probeTracer.GetHttpDownloadDelta( - time_after_probe, - ) + probeTracer.GetTCPDelta() - - // We must have reused the connection if we are a self probe! - if (probeType == SelfUp || probeType == SelfDown) && !probeTracer.stats.ConnectionReused { - panic(!probeTracer.stats.ConnectionReused) - } - - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "(%s) (%s Probe %v) sanity vs total: %v vs %v\n", - debugging.Prefix, - probeType.Value(), - probeId, - sanity, - totalDelay, - ) - } - roundTripCount := DefaultDownRoundTripCount - if probeType == Foreign { - roundTripCount = ForeignRoundTripCount - } - // Careful!!! It's possible that this channel has been closed because the Prober that - // started it has been stopped. Writing to a closed channel will cause a panic. It might not - // matter because a panic just stops the go thread containing the paniced code and we are in - // a go thread that executes only this function. - defer func() { - isThreadPanicing := recover() - if isThreadPanicing != nil && debug.IsDebug(debugging.Level) { - fmt.Printf( - "(%s) (%s Probe %v) Probe attempted to write to the result channel after its invoker ended (official reason: %v).\n", - debugging.Prefix, - probeType.Value(), - probeId, - isThreadPanicing, - ) - } - }() - tcpRtt := time.Duration(0 * time.Second) - tcpCwnd := uint32(0) - // TODO: Only get the extended stats for a connection if the user has requested them overall. - if captureExtendedStats && extendedstats.ExtendedStatsAvailable() { - tcpInfo, err := extendedstats.GetTCPInfo(probeTracer.stats.ConnInfo.Conn) - if err == nil { - tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond - tcpCwnd = tcpInfo.Snd_cwnd - } else { - fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err) - } - } - dataPoint := ProbeDataPoint{ - Time: time_before_probe, - RoundTripCount: uint64(roundTripCount), - Duration: totalDelay, - TCPRtt: tcpRtt, - TCPCwnd: tcpCwnd, - Type: probeType, - } - *result <- dataPoint - return nil -} - func CombinedProber( proberCtx context.Context, networkActivityCtx context.Context, - foreignProbeConfigurationGenerator func() ProbeConfiguration, - selfProbeConfigurationGenerator func() ProbeConfiguration, + foreignProbeConfigurationGenerator func() probe.ProbeConfiguration, + selfProbeConfigurationGenerator func() probe.ProbeConfiguration, selfDownProbeConnection lgc.LoadGeneratingConnection, selfUpProbeConnection lgc.LoadGeneratingConnection, probeInterval time.Duration, keyLogger io.Writer, captureExtendedStats bool, debugging *debug.DebugWithPrefix, -) (dataPoints chan ProbeDataPoint) { +) (dataPoints chan probe.ProbeDataPoint) { // Make a channel to send back all the generated data points // when we are probing. - dataPoints = make(chan ProbeDataPoint) + dataPoints = make(chan probe.ProbeDataPoint) go func() { wg := sync.WaitGroup{} @@ -319,39 +145,39 @@ func CombinedProber( // Start Foreign Connection Prober probeCount++ - go Probe( + go probe.Probe( networkActivityCtx, &wg, foreignProbeClient, foreignProbeConfiguration.URL, foreignProbeConfiguration.Host, - Foreign, + probe.Foreign, &dataPoints, captureExtendedStats, debugging, ) // Start Self Download Connection Prober - go Probe( + go probe.Probe( networkActivityCtx, &wg, selfDownProbeConnection.Client(), selfProbeConfiguration.URL, selfProbeConfiguration.Host, - SelfDown, + probe.SelfDown, &dataPoints, captureExtendedStats, debugging, ) // Start Self Upload Connection Prober - go Probe( + go probe.Probe( proberCtx, &wg, selfUpProbeConnection.Client(), selfProbeConfiguration.URL, selfProbeConfiguration.Host, - SelfUp, + probe.SelfUp, &dataPoints, captureExtendedStats, debugging, @@ -380,7 +206,7 @@ func LoadGenerator( loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load. rampupInterval time.Duration, lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection. - loadGeneratingConnections *lgc.LoadGeneratingConnectionCollection, + loadGeneratingConnectionsCollection *lgc.LoadGeneratingConnectionCollection, captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections? debugging *debug.DebugWithPrefix, // How can we forget debugging? ) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes. @@ -400,7 +226,7 @@ func LoadGenerator( flowsCreated += addFlows( networkActivityCtx, constants.StartingNumberOfLoadGeneratingConnections, - loadGeneratingConnections, + loadGeneratingConnectionsCollection, lgcGenerator, debugging.Level, ) @@ -408,9 +234,9 @@ func LoadGenerator( // We have at least a single load-generating channel. This channel will be the one that // the self probes use. Let's send it back to the caller so that they can pass it on if they need to. go func() { - loadGeneratingConnections.Lock.Lock() - zerothConnection, err := loadGeneratingConnections.Get(0) - loadGeneratingConnections.Lock.Unlock() + loadGeneratingConnectionsCollection.Lock.Lock() + zerothConnection, err := loadGeneratingConnectionsCollection.Get(0) + loadGeneratingConnectionsCollection.Lock.Unlock() if err != nil { panic("Could not get the zeroth connection!\n") } @@ -452,56 +278,76 @@ func LoadGenerator( granularThroughputDatapoints := make([]GranularThroughputDataPoint, 0) now = time.Now() // Used to align granular throughput data allInvalid := true - for i := range *loadGeneratingConnections.LGCs { - if !(*loadGeneratingConnections.LGCs)[i].IsValid() { - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "%v: Load-generating connection with id %d is invalid ... skipping.\n", + for i := range *loadGeneratingConnectionsCollection.LGCs { + loadGeneratingConnectionsCollection.Lock.Lock() + connectionState := (*loadGeneratingConnectionsCollection.LGCs)[i].Status() + loadGeneratingConnectionsCollection.Lock.Unlock() + switch connectionState { + default: + { + error := fmt.Sprintf( + "%v: Load-generating connection with id %d is in an unrecognizable state.\n", debugging, - (*loadGeneratingConnections.LGCs)[i].ClientId(), + (*loadGeneratingConnectionsCollection.LGCs)[i].ClientId()) + fmt.Fprintf(os.Stderr, "%s", error) + panic(error) + } + case lgc.LGC_STATUS_ERROR, + lgc.LGC_STATUS_DONE: + { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Load-generating connection with id %d is invalid or complete ... skipping.\n", + debugging, + (*loadGeneratingConnectionsCollection.LGCs)[i].ClientId(), + ) + } + // TODO: Do we add null connection to throughput? and how do we define it? Throughput -1 or 0? + granularThroughputDatapoints = append( + granularThroughputDatapoints, + GranularThroughputDataPoint{now, 0, uint32(i), 0, 0, ""}, ) + continue } - // TODO: Do we add null connection to throughput? and how do we define it? Throughput -1 or 0? - granularThroughputDatapoints = append( - granularThroughputDatapoints, - GranularThroughputDataPoint{now, 0, uint32(i), 0, 0, ""}, - ) - continue - } - allInvalid = false - currentTransferred, currentInterval := (*loadGeneratingConnections.LGCs)[i].TransferredInInterval() - // normalize to a second-long interval! - instantaneousConnectionThroughput := float64( - currentTransferred, - ) / float64( - currentInterval.Seconds(), - ) - instantaneousTotalThroughput += instantaneousConnectionThroughput + case lgc.LGC_STATUS_RUNNING: + { + allInvalid = false + currentTransferred, currentInterval := (*loadGeneratingConnectionsCollection.LGCs)[i].TransferredInInterval() + // normalize to a second-long interval! + instantaneousConnectionThroughput := float64( + currentTransferred, + ) / float64( + currentInterval.Seconds(), + ) + instantaneousTotalThroughput += instantaneousConnectionThroughput - tcpRtt := time.Duration(0 * time.Second) - tcpCwnd := uint32(0) - if captureExtendedStats && extendedstats.ExtendedStatsAvailable() { - if stats := (*loadGeneratingConnections.LGCs)[i].Stats(); stats != nil { - tcpInfo, err := extendedstats.GetTCPInfo(stats.ConnInfo.Conn) - if err == nil { - tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond - tcpCwnd = tcpInfo.Snd_cwnd - } else { - fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err) + tcpRtt := time.Duration(0 * time.Second) + tcpCwnd := uint32(0) + if captureExtendedStats && extendedstats.ExtendedStatsAvailable() { + if stats := (*loadGeneratingConnectionsCollection.LGCs)[i].Stats(); stats != nil { + tcpInfo, err := extendedstats.GetTCPInfo(stats.ConnInfo.Conn) + if err == nil { + tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond + tcpCwnd = tcpInfo.Snd_cwnd + } else { + fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err) + } + } } + granularThroughputDatapoints = append( + granularThroughputDatapoints, + GranularThroughputDataPoint{ + now, + instantaneousConnectionThroughput, + uint32(i), + tcpRtt, + tcpCwnd, + "", + }, + ) } + } - granularThroughputDatapoints = append( - granularThroughputDatapoints, - GranularThroughputDataPoint{ - now, - instantaneousConnectionThroughput, - uint32(i), - tcpRtt, - tcpCwnd, - "", - }, - ) } // For some reason, all the lgcs are invalid. This likely means that @@ -520,7 +366,7 @@ func LoadGenerator( throughputDataPoint := ThroughputDataPoint{ time.Now(), instantaneousTotalThroughput, - len(*loadGeneratingConnections.LGCs), + len(*loadGeneratingConnectionsCollection.LGCs), granularThroughputDatapoints, } throughputCalculations <- throughputDataPoint @@ -529,7 +375,7 @@ func LoadGenerator( flowsCreated += addFlows( networkActivityCtx, constants.AdditiveNumberOfLoadGeneratingConnections, - loadGeneratingConnections, + loadGeneratingConnectionsCollection, lgcGenerator, debugging.Level, ) @@ -543,281 +389,3 @@ func LoadGenerator( }() return } - -type ProbeTracer struct { - client *http.Client - stats *stats.TraceStats - trace *httptrace.ClientTrace - debug debug.DebugLevel - probeid uint64 - probeType ProbeType -} - -func (p *ProbeTracer) String() string { - return fmt.Sprintf("(Probe %v): stats: %v\n", p.probeid, p.stats) -} - -func (p *ProbeTracer) ProbeId() uint64 { - return p.probeid -} - -func (p *ProbeTracer) GetTrace() *httptrace.ClientTrace { - return p.trace -} - -func (p *ProbeTracer) GetDnsDelta() time.Duration { - if p.stats.ConnectionReused { - return time.Duration(0) - } - delta := p.stats.DnsDoneTime.Sub(p.stats.DnsStartTime) - if debug.IsDebug(p.debug) { - fmt.Printf("(Probe %v): DNS Time: %v\n", p.probeid, delta) - } - return delta -} - -func (p *ProbeTracer) GetTCPDelta() time.Duration { - if p.stats.ConnectionReused { - return time.Duration(0) - } - delta := p.stats.ConnectDoneTime.Sub(p.stats.ConnectStartTime) - if debug.IsDebug(p.debug) { - fmt.Printf("(Probe %v): TCP Connection Time: %v\n", p.probeid, delta) - } - return delta -} - -func (p *ProbeTracer) GetTLSDelta() time.Duration { - if utilities.IsSome(p.stats.TLSDoneTime) { - panic("There should not be TLS information, but there is.") - } - delta := time.Duration(0) - if debug.IsDebug(p.debug) { - fmt.Printf("(Probe %v): TLS Time: %v\n", p.probeid, delta) - } - return delta -} - -func (p *ProbeTracer) GetTLSAndHttpHeaderDelta() time.Duration { - // Because the TLS handshake occurs *after* the TCP connection (ConnectDoneTime) - // and *before* the HTTP transaction, we know that the delta between the time - // that the first HTTP response byte is available and the time that the TCP - // connection was established includes both the time for the HTTP header RTT - // *and* the TLS handshake RTT, whether we can specifically measure the latter - // or not. Eventually when TLS handshake tracing is fixed, we can break these - // into separate buckets, but for now this workaround is reasonable. - before := p.stats.ConnectDoneTime - if p.stats.ConnectionReused { - // When we reuse a connection there will be no time logged for when the - // TCP connection was established (obviously). So, fall back to the time - // when we were notified about reusing a connection (as a close approximation!). - before = p.stats.GetConnectionDoneTime - } - delta := p.stats.HttpResponseReadyTime.Sub(before) - if debug.IsDebug(p.debug) { - fmt.Printf("(Probe %v): Http TLS and Header Time: %v\n", p.probeid, delta) - } - return delta -} - -func (p *ProbeTracer) GetHttpHeaderDelta() time.Duration { - panic( - "Unusable until TLS tracing support is enabled! Use GetTLSAndHttpHeaderDelta() instead.\n", - ) - delta := p.stats.HttpResponseReadyTime.Sub(utilities.GetSome(p.stats.TLSDoneTime)) - if debug.IsDebug(p.debug) { - fmt.Printf("(Probe %v): Http Header Time: %v\n", p.probeid, delta) - } - return delta -} - -func (p *ProbeTracer) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration { - delta := httpDoneTime.Sub(p.stats.HttpResponseReadyTime) - if debug.IsDebug(p.debug) { - fmt.Printf("(Probe %v): Http Download Time: %v\n", p.probeid, delta) - } - return delta -} - -func NewProbeTracer( - client *http.Client, - probeType ProbeType, - probeId uint64, - debugging *debug.DebugWithPrefix, -) *ProbeTracer { - probe := &ProbeTracer{ - client: client, - stats: &stats.TraceStats{}, - trace: nil, - debug: debugging.Level, - probeid: probeId, - probeType: probeType, - } - trace := traceable.GenerateHttpTimingTracer(probe, debugging.Level) - - probe.trace = trace - return probe -} - -func (probe *ProbeTracer) SetDnsStartTimeInfo( - now time.Time, - dnsStartInfo httptrace.DNSStartInfo, -) { - probe.stats.DnsStartTime = now - probe.stats.DnsStart = dnsStartInfo - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) DNS Start for Probe %v: %v\n", - probe.probeType.Value(), - probe.ProbeId(), - dnsStartInfo, - ) - } -} - -func (probe *ProbeTracer) SetDnsDoneTimeInfo( - now time.Time, - dnsDoneInfo httptrace.DNSDoneInfo, -) { - probe.stats.DnsDoneTime = now - probe.stats.DnsDone = dnsDoneInfo - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) DNS Done for Probe %v: %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.DnsDone, - ) - } -} - -func (probe *ProbeTracer) SetConnectStartTime( - now time.Time, -) { - probe.stats.ConnectStartTime = now - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) TCP Start for Probe %v at %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.ConnectStartTime, - ) - } -} - -func (probe *ProbeTracer) SetConnectDoneTimeError( - now time.Time, - err error, -) { - probe.stats.ConnectDoneTime = now - probe.stats.ConnectDoneError = err - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) TCP Done for Probe %v (with error %v) @ %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.ConnectDoneError, - probe.stats.ConnectDoneTime, - ) - } -} - -func (probe *ProbeTracer) SetGetConnTime(now time.Time) { - probe.stats.GetConnectionStartTime = now - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) Started getting connection for Probe %v @ %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.GetConnectionStartTime, - ) - } -} - -func (probe *ProbeTracer) SetGotConnTimeInfo( - now time.Time, - gotConnInfo httptrace.GotConnInfo, -) { - probe.stats.GetConnectionDoneTime = now - probe.stats.ConnInfo = gotConnInfo - probe.stats.ConnectionReused = gotConnInfo.Reused - if (probe.probeType == SelfUp || probe.probeType == SelfDown) && !gotConnInfo.Reused { - fmt.Fprintf( - os.Stderr, - "A self probe sent using a new connection!\n", - ) - } - if gotConnInfo.Reused { - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) Got a reused connection for Probe %v at %v with info %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.GetConnectionDoneTime, - probe.stats.ConnInfo, - ) - } - } -} - -func (probe *ProbeTracer) SetTLSHandshakeStartTime( - now time.Time, -) { - probe.stats.TLSStartTime = utilities.Some(now) - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) Started TLS Handshake for Probe %v @ %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.TLSStartTime, - ) - } -} - -func (probe *ProbeTracer) SetTLSHandshakeDoneTimeState( - now time.Time, - connectionState tls.ConnectionState, -) { - probe.stats.TLSDoneTime = utilities.Some(now) - probe.stats.TLSConnInfo = connectionState - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) Completed TLS handshake for Probe %v at %v with info %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.TLSDoneTime, - probe.stats.TLSConnInfo, - ) - } -} - -func (probe *ProbeTracer) SetHttpWroteRequestTimeInfo( - now time.Time, - info httptrace.WroteRequestInfo, -) { - probe.stats.HttpWroteRequestTime = now - probe.stats.HttpInfo = info - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) Http finished writing request for Probe %v at %v with info %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.HttpWroteRequestTime, - probe.stats.HttpInfo, - ) - } -} - -func (probe *ProbeTracer) SetHttpResponseReadyTime( - now time.Time, -) { - probe.stats.HttpResponseReadyTime = now - if debug.IsDebug(probe.debug) { - fmt.Printf( - "(%s Probe) Http response is ready for Probe %v at %v\n", - probe.probeType.Value(), - probe.ProbeId(), - probe.stats.HttpResponseReadyTime, - ) - } -} |
