diff options
| author | Will Hawkins <[email protected]> | 2023-04-23 16:33:08 -0400 |
|---|---|---|
| committer | Will Hawkins <[email protected]> | 2023-04-23 16:33:08 -0400 |
| commit | 9e9963f7767719e4c26e2345169d7768ec5151ac (patch) | |
| tree | 1d0d4ae80ecde3f2f56844debaa7053396caa0d3 | |
| parent | e8358eb1133e00fc8eeb4fe54c220b8ad6a91564 (diff) | |
[Bugfix] Missed adding probe package during refactor
| -rw-r--r-- | probe/probe.go | 201 | ||||
| -rw-r--r-- | probe/tracer.go | 311 |
2 files changed, 512 insertions, 0 deletions
diff --git a/probe/probe.go b/probe/probe.go new file mode 100644 index 0000000..5777bb3 --- /dev/null +++ b/probe/probe.go @@ -0,0 +1,201 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package probe + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptrace" + "sync" + "time" + + "github.com/network-quality/goresponsiveness/debug" + "github.com/network-quality/goresponsiveness/extendedstats" + "github.com/network-quality/goresponsiveness/utilities" +) + +type ProbeType int64 + +type ProbeConfiguration struct { + ConnectToAddr string + URL string + Host string + InsecureSkipVerify bool +} + +type ProbeDataPoint struct { + Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"` + RoundTripCount uint64 `Description:"The number of round trips measured by this data point."` + Duration time.Duration `Description:"The duration for this measurement." Formatter:"Seconds"` + TCPRtt time.Duration `Description:"The underlying connection's RTT at probe time." Formatter:"Seconds"` + TCPCwnd uint32 `Description:"The underlying connection's congestion window at probe time."` + Type ProbeType `Description:"The type of the probe." Formatter:"Value"` +} + +const ( + SelfUp ProbeType = iota + SelfDown + Foreign +) + +type ProbeRoundTripCountType uint16 + +const ( + DefaultDownRoundTripCount ProbeRoundTripCountType = 1 + SelfUpRoundTripCount ProbeRoundTripCountType = 1 + SelfDownRoundTripCount ProbeRoundTripCountType = 1 + ForeignRoundTripCount ProbeRoundTripCountType = 3 +) + +func (pt ProbeType) Value() string { + if pt == SelfUp { + return "SelfUp" + } else if pt == SelfDown { + return "SelfDown" + } + return "Foreign" +} + +func Probe( + managingCtx context.Context, + waitGroup *sync.WaitGroup, + client *http.Client, + probeUrl string, + probeHost string, // optional: for use with a test_endpoint + probeType ProbeType, + result *chan ProbeDataPoint, + captureExtendedStats bool, + debugging *debug.DebugWithPrefix, +) error { + + if waitGroup != nil { + waitGroup.Add(1) + defer waitGroup.Done() + } + + if client == nil { + return fmt.Errorf("cannot start a probe with a nil client") + } + + probeId := utilities.GenerateUniqueId() + probeTracer := NewProbeTracer(client, probeType, probeId, debugging) + time_before_probe := time.Now() + probe_req, err := http.NewRequestWithContext( + httptrace.WithClientTrace(managingCtx, probeTracer.trace), + "GET", + probeUrl, + nil, + ) + if err != nil { + return err + } + + // Used to disable compression + probe_req.Header.Set("Accept-Encoding", "identity") + probe_req.Header.Set("User-Agent", utilities.UserAgent()) + + probe_resp, err := client.Do(probe_req) + if err != nil { + return err + } + + // Header.Get returns "" when not set + if probe_resp.Header.Get("Content-Encoding") != "" { + return fmt.Errorf("Content-Encoding header was set (compression not allowed)") + } + + // TODO: Make this interruptable somehow by using _ctx_. + _, err = io.ReadAll(probe_resp.Body) + if err != nil { + return err + } + time_after_probe := time.Now() + + // Depending on whether we think that Close() requires another RTT (via TCP), we + // may need to move this before/after capturing the after time. + probe_resp.Body.Close() + + sanity := time_after_probe.Sub(time_before_probe) + + // When the probe is run on a load-generating connection (a self probe) there should + // only be a single round trip that is measured. We will take the accumulation of all these + // values just to be sure, though. Because of how this traced connection was launched, most + // of the values will be 0 (or very small where the time that go takes for delivering callbacks + // and doing context switches pokes through). When it is !isSelfProbe then the values will + // be significant and we want to add them regardless! + totalDelay := probeTracer.GetTLSAndHttpHeaderDelta() + probeTracer.GetHttpDownloadDelta( + time_after_probe, + ) + probeTracer.GetTCPDelta() + + // We must have reused the connection if we are a self probe! + if (probeType == SelfUp || probeType == SelfDown) && !probeTracer.stats.ConnectionReused { + panic(!probeTracer.stats.ConnectionReused) + } + + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) (%s Probe %v) sanity vs total: %v vs %v\n", + debugging.Prefix, + probeType.Value(), + probeId, + sanity, + totalDelay, + ) + } + roundTripCount := DefaultDownRoundTripCount + if probeType == Foreign { + roundTripCount = ForeignRoundTripCount + } + // Careful!!! It's possible that this channel has been closed because the Prober that + // started it has been stopped. Writing to a closed channel will cause a panic. It might not + // matter because a panic just stops the go thread containing the paniced code and we are in + // a go thread that executes only this function. + defer func() { + isThreadPanicing := recover() + if isThreadPanicing != nil && debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) (%s Probe %v) Probe attempted to write to the result channel after its invoker ended (official reason: %v).\n", + debugging.Prefix, + probeType.Value(), + probeId, + isThreadPanicing, + ) + } + }() + tcpRtt := time.Duration(0 * time.Second) + tcpCwnd := uint32(0) + // TODO: Only get the extended stats for a connection if the user has requested them overall. + if captureExtendedStats && extendedstats.ExtendedStatsAvailable() { + tcpInfo, err := extendedstats.GetTCPInfo(probeTracer.stats.ConnInfo.Conn) + if err == nil { + tcpRtt = time.Duration(tcpInfo.Rtt) * time.Microsecond + tcpCwnd = tcpInfo.Snd_cwnd + } else { + fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err) + } + } + dataPoint := ProbeDataPoint{ + Time: time_before_probe, + RoundTripCount: uint64(roundTripCount), + Duration: totalDelay, + TCPRtt: tcpRtt, + TCPCwnd: tcpCwnd, + Type: probeType, + } + *result <- dataPoint + return nil +} diff --git a/probe/tracer.go b/probe/tracer.go new file mode 100644 index 0000000..bea1334 --- /dev/null +++ b/probe/tracer.go @@ -0,0 +1,311 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package probe + +import ( + "crypto/tls" + "fmt" + "net/http" + "net/http/httptrace" + "os" + "time" + + "github.com/network-quality/goresponsiveness/debug" + "github.com/network-quality/goresponsiveness/stats" + "github.com/network-quality/goresponsiveness/traceable" + "github.com/network-quality/goresponsiveness/utilities" +) + +type ProbeTracer struct { + client *http.Client + stats *stats.TraceStats + trace *httptrace.ClientTrace + debug debug.DebugLevel + probeid uint64 + probeType ProbeType +} + +func (p *ProbeTracer) String() string { + return fmt.Sprintf("(Probe %v): stats: %v\n", p.probeid, p.stats) +} + +func (p *ProbeTracer) ProbeId() uint64 { + return p.probeid +} + +func (p *ProbeTracer) GetTrace() *httptrace.ClientTrace { + return p.trace +} + +func (p *ProbeTracer) GetDnsDelta() time.Duration { + if p.stats.ConnectionReused { + return time.Duration(0) + } + delta := p.stats.DnsDoneTime.Sub(p.stats.DnsStartTime) + if debug.IsDebug(p.debug) { + fmt.Printf("(Probe %v): DNS Time: %v\n", p.probeid, delta) + } + return delta +} + +func (p *ProbeTracer) GetTCPDelta() time.Duration { + if p.stats.ConnectionReused { + return time.Duration(0) + } + delta := p.stats.ConnectDoneTime.Sub(p.stats.ConnectStartTime) + if debug.IsDebug(p.debug) { + fmt.Printf("(Probe %v): TCP Connection Time: %v\n", p.probeid, delta) + } + return delta +} + +func (p *ProbeTracer) GetTLSDelta() time.Duration { + if utilities.IsSome(p.stats.TLSDoneTime) { + panic("There should not be TLS information, but there is.") + } + delta := time.Duration(0) + if debug.IsDebug(p.debug) { + fmt.Printf("(Probe %v): TLS Time: %v\n", p.probeid, delta) + } + return delta +} + +func (p *ProbeTracer) GetTLSAndHttpHeaderDelta() time.Duration { + // Because the TLS handshake occurs *after* the TCP connection (ConnectDoneTime) + // and *before* the HTTP transaction, we know that the delta between the time + // that the first HTTP response byte is available and the time that the TCP + // connection was established includes both the time for the HTTP header RTT + // *and* the TLS handshake RTT, whether we can specifically measure the latter + // or not. Eventually when TLS handshake tracing is fixed, we can break these + // into separate buckets, but for now this workaround is reasonable. + before := p.stats.ConnectDoneTime + if p.stats.ConnectionReused { + // When we reuse a connection there will be no time logged for when the + // TCP connection was established (obviously). So, fall back to the time + // when we were notified about reusing a connection (as a close approximation!). + before = p.stats.GetConnectionDoneTime + } + delta := p.stats.HttpResponseReadyTime.Sub(before) + if debug.IsDebug(p.debug) { + fmt.Printf("(Probe %v): Http TLS and Header Time: %v\n", p.probeid, delta) + } + return delta +} + +func (p *ProbeTracer) GetHttpHeaderDelta() time.Duration { + panic( + "Unusable until TLS tracing support is enabled! Use GetTLSAndHttpHeaderDelta() instead.\n", + ) + delta := p.stats.HttpResponseReadyTime.Sub(utilities.GetSome(p.stats.TLSDoneTime)) + if debug.IsDebug(p.debug) { + fmt.Printf("(Probe %v): Http Header Time: %v\n", p.probeid, delta) + } + return delta +} + +func (p *ProbeTracer) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration { + delta := httpDoneTime.Sub(p.stats.HttpResponseReadyTime) + if debug.IsDebug(p.debug) { + fmt.Printf("(Probe %v): Http Download Time: %v\n", p.probeid, delta) + } + return delta +} + +func (probe *ProbeTracer) SetDnsStartTimeInfo( + now time.Time, + dnsStartInfo httptrace.DNSStartInfo, +) { + probe.stats.DnsStartTime = now + probe.stats.DnsStart = dnsStartInfo + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(%s Probe) DNS Start for Probe %v: %v\n", + probe.probeType.Value(), + probe.ProbeId(), + dnsStartInfo, + ) + } +} + +func (probe *ProbeTracer) SetDnsDoneTimeInfo( + now time.Time, + dnsDoneInfo httptrace.DNSDoneInfo, +) { + probe.stats.DnsDoneTime = now + probe.stats.DnsDone = dnsDoneInfo + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(%s Probe) DNS Done for Probe %v: %v\n", + probe.probeType.Value(), + probe.ProbeId(), + probe.stats.DnsDone, + ) + } +} + +func (probe *ProbeTracer) SetConnectStartTime( + now time.Time, +) { + probe.stats.ConnectStartTime = now + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(%s Probe) TCP Start for Probe %v at %v\n", + probe.probeType.Value(), + probe.ProbeId(), + probe.stats.ConnectStartTime, + ) + } +} + +func (probe *ProbeTracer) SetConnectDoneTimeError( + now time.Time, + err error, +) { + probe.stats.ConnectDoneTime = now + probe.stats.ConnectDoneError = err + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(%s Probe) TCP Done for Probe %v (with error %v) @ %v\n", + probe.probeType.Value(), + probe.ProbeId(), + probe.stats.ConnectDoneError, + probe.stats.ConnectDoneTime, + ) + } +} + +func (probe *ProbeTracer) SetGetConnTime(now time.Time) { + probe.stats.GetConnectionStartTime = now + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(%s Probe) Started getting connection for Probe %v @ %v\n", + probe.probeType.Value(), + probe.ProbeId(), + probe.stats.GetConnectionStartTime, + ) + } +} + +func (probe *ProbeTracer) SetGotConnTimeInfo( + now time.Time, + gotConnInfo httptrace.GotConnInfo, +) { + probe.stats.GetConnectionDoneTime = now + probe.stats.ConnInfo = gotConnInfo + probe.stats.ConnectionReused = gotConnInfo.Reused + if (probe.probeType == SelfUp || probe.probeType == SelfDown) && !gotConnInfo.Reused { + fmt.Fprintf( + os.Stderr, + "(%s Probe) Probe %v at %v with info %v did not properly reuse a connection.\n", + probe.probeType.Value(), + probe.ProbeId(), + probe.stats.GetConnectionDoneTime, + probe.stats.ConnInfo, + ) + } + if gotConnInfo.Reused { + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(%s Probe) Got a reused connection for Probe %v at %v with info %v.\n", + probe.probeType.Value(), + probe.ProbeId(), + probe.stats.GetConnectionDoneTime, + probe.stats.ConnInfo, + ) + } + } +} + +func (probe *ProbeTracer) SetTLSHandshakeStartTime( + now time.Time, +) { + probe.stats.TLSStartTime = utilities.Some(now) + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(%s Probe) Started TLS Handshake for Probe %v @ %v\n", + probe.probeType.Value(), + probe.ProbeId(), + probe.stats.TLSStartTime, + ) + } +} + +func (probe *ProbeTracer) SetTLSHandshakeDoneTimeState( + now time.Time, + connectionState tls.ConnectionState, +) { + probe.stats.TLSDoneTime = utilities.Some(now) + probe.stats.TLSConnInfo = connectionState + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(%s Probe) Completed TLS handshake for Probe %v at %v with info %v\n", + probe.probeType.Value(), + probe.ProbeId(), + probe.stats.TLSDoneTime, + probe.stats.TLSConnInfo, + ) + } +} + +func (probe *ProbeTracer) SetHttpWroteRequestTimeInfo( + now time.Time, + info httptrace.WroteRequestInfo, +) { + probe.stats.HttpWroteRequestTime = now + probe.stats.HttpInfo = info + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(%s Probe) Http finished writing request for Probe %v at %v with info %v\n", + probe.probeType.Value(), + probe.ProbeId(), + probe.stats.HttpWroteRequestTime, + probe.stats.HttpInfo, + ) + } +} + +func (probe *ProbeTracer) SetHttpResponseReadyTime( + now time.Time, +) { + probe.stats.HttpResponseReadyTime = now + if debug.IsDebug(probe.debug) { + fmt.Printf( + "(%s Probe) Http response is ready for Probe %v at %v\n", + probe.probeType.Value(), + probe.ProbeId(), + probe.stats.HttpResponseReadyTime, + ) + } +} + +func NewProbeTracer( + client *http.Client, + probeType ProbeType, + probeId uint64, + debugging *debug.DebugWithPrefix, +) *ProbeTracer { + probe := &ProbeTracer{ + client: client, + stats: &stats.TraceStats{}, + trace: nil, + debug: debugging.Level, + probeid: probeId, + probeType: probeType, + } + trace := traceable.GenerateHttpTimingTracer(probe, debugging.Level) + + probe.trace = trace + return probe +} |
