diff options
Diffstat (limited to 'rpm/rpm.go')
| -rw-r--r-- | rpm/rpm.go | 226 |
1 files changed, 109 insertions, 117 deletions
@@ -8,9 +8,11 @@ import ( "net/http" "net/http/httptrace" "os" + "sync" "time" "github.com/network-quality/goresponsiveness/constants" + "github.com/network-quality/goresponsiveness/datalogger" "github.com/network-quality/goresponsiveness/debug" "github.com/network-quality/goresponsiveness/lgc" "github.com/network-quality/goresponsiveness/ma" @@ -40,33 +42,51 @@ func addFlows( } type ProbeConfiguration struct { - URL string - Interval time.Duration + URL string + DataLogger datalogger.DataLogger[DataPoint] + Interval time.Duration } type DataPoint struct { - RoundTripCount uint64 - Duration time.Duration + Time time.Time `Description:"Time of the generation of the data point."` + RoundTripCount uint64 `Description:"The number of round trips measured by this data point."` + Duration time.Duration `Description:"The duration for this measurement."` } -type LGDataCollectionResult struct { +type SelfDataCollectionResult struct { RateBps float64 LGCs []lgc.LoadGeneratingConnection DataPoints []DataPoint } +type ProbeType int64 + +const ( + Self ProbeType = iota + Foreign +) + +func (pt ProbeType) Value() string { + if pt == Self { + return "Self" + } + return "Foreign" +} + func Probe( parentProbeCtx context.Context, + waitGroup *sync.WaitGroup, + logger datalogger.DataLogger[DataPoint], client *http.Client, probeUrl string, - isLGProbe bool, + probeType ProbeType, result *chan DataPoint, debugging *debug.DebugWithPrefix, ) error { - probeTypeLabel := "New-Connection" - if isLGProbe { - probeTypeLabel = "Load-Generating" + if waitGroup != nil { + waitGroup.Add(1) + defer waitGroup.Done() } if client == nil { @@ -74,7 +94,7 @@ func Probe( } probeId := utilities.GenerateUniqueId() - probeTracer := NewProbeTracer(client, isLGProbe, probeId, debugging) + probeTracer := NewProbeTracer(client, probeType, probeId, debugging) time_before_probe := time.Now() probe_req, err := http.NewRequestWithContext( httptrace.WithClientTrace(parentProbeCtx, probeTracer.trace), @@ -104,18 +124,18 @@ func Probe( sanity := time_after_probe.Sub(time_before_probe) - // When the probe is run on a load-generating connection there should + // When the probe is run on a load-generating connection (a self probe) there should // only be a single round trip that is measured. We will take the accumulation of all these // values just to be sure, though. Because of how this traced connection was launched, most // of the values will be 0 (or very small where the time that go takes for delivering callbacks - // and doing context switches pokes through). When it is !isLGProbe then the values will + // and doing context switches pokes through). When it is !isSelfProbe then the values will // be significant and we want to add them regardless! totalDelay := probeTracer.GetTLSAndHttpHeaderDelta() + probeTracer.GetHttpDownloadDelta( time_after_probe, ) + probeTracer.GetTCPDelta() - // We must have reused the connection if we are a load-generating probe! - if isLGProbe && !probeTracer.stats.ConnectionReused { + // We must have reused the connection if we are a self probe! + if probeType == Self && !probeTracer.stats.ConnectionReused { panic(!probeTracer.stats.ConnectionReused) } @@ -123,14 +143,14 @@ func Probe( fmt.Printf( "(%s) (%s Probe %v) sanity vs total: %v vs %v\n", debugging.Prefix, - probeTypeLabel, + probeType.Value(), probeId, sanity, totalDelay, ) } roundTripCount := uint64(1) - if !isLGProbe { + if probeType == Foreign { roundTripCount = 3 } // TODO: Careful!!! It's possible that this channel has been closed because the Prober that @@ -138,40 +158,44 @@ func Probe( // matter because a panic just stops the go thread containing the paniced code and we are in // a go thread that executes only this function. defer func() { - _ = recover() - if debug.IsDebug(debugging.Level) { + isThreadPanicing := recover() + if isThreadPanicing != nil && debug.IsDebug(debugging.Level) { fmt.Printf( "(%s) (%s Probe %v) Probe attempted to write to the result channel after its invoker ended.\n", debugging.Prefix, - probeTypeLabel, + probeType.Value(), probeId, ) } }() - *result <- DataPoint{RoundTripCount: roundTripCount, Duration: totalDelay} - + dataPoint := DataPoint{Time: time.Now(), RoundTripCount: roundTripCount, Duration: totalDelay} + if !utilities.IsInterfaceNil(logger) { + logger.LogRecord(dataPoint) + } + *result <- dataPoint return nil } -func Prober( +func ForeignProber( proberCtx context.Context, - ncProbeConfigurationGenerator func() ProbeConfiguration, + foreignProbeConfigurationGenerator func() ProbeConfiguration, keyLogger io.Writer, debugging *debug.DebugWithPrefix, ) (points chan DataPoint) { points = make(chan DataPoint) - ncProbeConfiguration := ncProbeConfigurationGenerator() + foreignProbeConfiguration := foreignProbeConfigurationGenerator() go func() { + wg := sync.WaitGroup{} probeCount := 0 for proberCtx.Err() == nil { - time.Sleep(ncProbeConfiguration.Interval) + time.Sleep(foreignProbeConfiguration.Interval) if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) About to start new-connection probe number %d!\n", + "(%s) About to start foreign probe number %d!\n", debugging.Prefix, probeCount, ) @@ -182,7 +206,7 @@ func Prober( if !utilities.IsInterfaceNil(keyLogger) { if debug.IsDebug(debugging.Level) { fmt.Printf( - "Using an SSL Key Logger for this new-connection probe.\n", + "Using an SSL Key Logger for this foreign probe.\n", ) } @@ -200,13 +224,19 @@ func Prober( client := &http.Client{Transport: &transport} probeCount++ - go Probe(proberCtx, client, ncProbeConfiguration.URL, false, &points, debugging) + go Probe(proberCtx, &wg, foreignProbeConfiguration.DataLogger, client, foreignProbeConfiguration.URL, Foreign, &points, debugging) } if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) New-connection probing driver is stopping after sending %d probes.\n", + "(%s) Foreign probe driver is going to start waiting for its probes to finish.\n", + debugging.Prefix, + ) + } + utilities.OrTimeout(func() { wg.Wait() }, 2*time.Second) + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) Foreign probe driver is done waiting for its probes to finish.\n", debugging.Prefix, - probeCount, ) } close(points) @@ -214,24 +244,24 @@ func Prober( return } -func LGProber( +func SelfProber( proberCtx context.Context, defaultConnection lgc.LoadGeneratingConnection, altConnections *[]lgc.LoadGeneratingConnection, - lgProbeConfiguration ProbeConfiguration, + selfProbeConfiguration ProbeConfiguration, debugging *debug.DebugWithPrefix, ) (points chan DataPoint) { points = make(chan DataPoint) - debugging = debug.NewDebugWithPrefix(debugging.Level, debugging.Prefix+" load-generating probe") + debugging = debug.NewDebugWithPrefix(debugging.Level, debugging.Prefix+" self probe") go func() { probeCount := 0 for proberCtx.Err() == nil { - time.Sleep(lgProbeConfiguration.Interval) + time.Sleep(selfProbeConfiguration.Interval) if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) About to start load-generating probe number %d!\n", + "(%s) About to start self probe number %d!\n", debugging.Prefix, probeCount, ) @@ -243,16 +273,18 @@ func LGProber( // yet. go Probe( proberCtx, + nil, + selfProbeConfiguration.DataLogger, defaultConnection.Client(), - lgProbeConfiguration.URL, - true, + selfProbeConfiguration.URL, + Self, &points, debugging, ) } if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) Load-generating probing driver is stopping after sending %d probes.\n", + "(%s) self probing driver is stopping after sending %d probes.\n", debugging.Prefix, probeCount, ) @@ -266,10 +298,10 @@ func LGCollectData( lgDataCollectionCtx context.Context, operatingCtx context.Context, lgcGenerator func() lgc.LoadGeneratingConnection, - lgProbeConfigurationGenerator func() ProbeConfiguration, + selfProbeConfigurationGenerator func() ProbeConfiguration, debugging *debug.DebugWithPrefix, -) (resulted chan LGDataCollectionResult) { - resulted = make(chan LGDataCollectionResult) +) (resulted chan SelfDataCollectionResult) { + resulted = make(chan SelfDataCollectionResult) go func() { lgcs := make([]lgc.LoadGeneratingConnection, 0) @@ -282,11 +314,11 @@ func LGCollectData( debugging.Level, ) - lgProbeCtx, lgProbeCtxCancel := context.WithCancel(lgDataCollectionCtx) - probeDataPointsChannel := LGProber(lgProbeCtx, + selfProbeCtx, selfProbeCtxCancel := context.WithCancel(lgDataCollectionCtx) + probeDataPointsChannel := SelfProber(selfProbeCtx, lgcs[0], &lgcs, - lgProbeConfigurationGenerator(), + selfProbeConfigurationGenerator(), debugging, ) @@ -314,7 +346,7 @@ func LGCollectData( // When the program stops operating, then stop. When our invoker tells // us to stop, then stop. if operatingCtx.Err() != nil || lgDataCollectionCtx.Err() != nil { - lgProbeCtxCancel() + selfProbeCtxCancel() return } @@ -462,30 +494,30 @@ func LGCollectData( } } - lgProbeCtxCancel() - probeDataPoints := make([]DataPoint, 0) + selfProbeCtxCancel() + selfProbeDataPoints := make([]DataPoint, 0) for dataPoint := range probeDataPointsChannel { - probeDataPoints = append(probeDataPoints, dataPoint) + selfProbeDataPoints = append(selfProbeDataPoints, dataPoint) } if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) Collected %d load-generating probe data points\n", + "(%s) Collected %d self data points\n", debugging.Prefix, - len(probeDataPoints), + len(selfProbeDataPoints), ) } - resulted <- LGDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, DataPoints: probeDataPoints} + resulted <- SelfDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, DataPoints: selfProbeDataPoints} }() return } type ProbeTracer struct { - client *http.Client - stats *stats.TraceStats - trace *httptrace.ClientTrace - debug debug.DebugLevel - probeid uint64 - isLG bool + client *http.Client + stats *stats.TraceStats + trace *httptrace.ClientTrace + debug debug.DebugLevel + probeid uint64 + probeType ProbeType } func (p *ProbeTracer) String() string { @@ -576,17 +608,17 @@ func (p *ProbeTracer) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration func NewProbeTracer( client *http.Client, - isLG bool, + probeType ProbeType, probeId uint64, debugging *debug.DebugWithPrefix, ) *ProbeTracer { probe := &ProbeTracer{ - client: client, - stats: &stats.TraceStats{}, - trace: nil, - debug: debugging.Level, - probeid: probeId, - isLG: isLG, + client: client, + stats: &stats.TraceStats{}, + trace: nil, + debug: debugging.Level, + probeid: probeId, + probeType: probeType, } trace := traceable.GenerateHttpTimingTracer(probe, debugging.Level) @@ -601,13 +633,9 @@ func (probe *ProbeTracer) SetDnsStartTimeInfo( probe.stats.DnsStartTime = now probe.stats.DnsStart = dnsStartInfo if debug.IsDebug(probe.debug) { - probeTypeLabel := "New-Connection" - if probe.isLG { - probeTypeLabel = "Load-Generating" - } fmt.Printf( "(%s Probe) DNS Start for Probe %v: %v\n", - probeTypeLabel, + probe.probeType.Value(), probe.ProbeId(), dnsStartInfo, ) @@ -621,13 +649,9 @@ func (probe *ProbeTracer) SetDnsDoneTimeInfo( probe.stats.DnsDoneTime = now probe.stats.DnsDone = dnsDoneInfo if debug.IsDebug(probe.debug) { - probeTypeLabel := "New-Connection" - if probe.isLG { - probeTypeLabel = "Load-Generating" - } fmt.Printf( "(%s Probe) DNS Done for Probe %v: %v\n", - probeTypeLabel, + probe.probeType.Value(), probe.ProbeId(), probe.stats.DnsDone, ) @@ -639,13 +663,9 @@ func (probe *ProbeTracer) SetConnectStartTime( ) { probe.stats.ConnectStartTime = now if debug.IsDebug(probe.debug) { - probeTypeLabel := "New-Connection" - if probe.isLG { - probeTypeLabel = "Load-Generating" - } fmt.Printf( "(%s Probe) TCP Start for Probe %v at %v\n", - probeTypeLabel, + probe.probeType.Value(), probe.ProbeId(), probe.stats.ConnectStartTime, ) @@ -659,13 +679,9 @@ func (probe *ProbeTracer) SetConnectDoneTimeError( probe.stats.ConnectDoneTime = now probe.stats.ConnectDoneError = err if debug.IsDebug(probe.debug) { - probeTypeLabel := "New-Connection" - if probe.isLG { - probeTypeLabel = "Load-Generating" - } fmt.Printf( "(%s Probe) TCP Done for Probe %v (with error %v) @ %v\n", - probeTypeLabel, + probe.probeType.Value(), probe.ProbeId(), probe.stats.ConnectDoneError, probe.stats.ConnectDoneTime, @@ -676,13 +692,9 @@ func (probe *ProbeTracer) SetConnectDoneTimeError( func (probe *ProbeTracer) SetGetConnTime(now time.Time) { probe.stats.GetConnectionStartTime = now if debug.IsDebug(probe.debug) { - probeTypeLabel := "New-Connection" - if probe.isLG { - probeTypeLabel = "Load-Generating" - } fmt.Printf( "(%s Probe) Started getting connection for Probe %v @ %v\n", - probeTypeLabel, + probe.probeType.Value(), probe.ProbeId(), probe.stats.GetConnectionStartTime, ) @@ -696,22 +708,18 @@ func (probe *ProbeTracer) SetGotConnTimeInfo( probe.stats.GetConnectionDoneTime = now probe.stats.ConnInfo = gotConnInfo probe.stats.ConnectionReused = gotConnInfo.Reused - if probe.isLG && !gotConnInfo.Reused { + if probe.probeType == Self && !gotConnInfo.Reused { fmt.Fprintf( os.Stderr, - "A probe sent on an load-generating connection used a new connection!\n", + "A self probe sent used a new connection!\n", ) } else if debug.IsDebug(probe.debug) { - fmt.Printf("Properly reused a connection when probing on a load-generating connection!\n") + fmt.Printf("Properly reused a connection when doing a self probe!\n") } if debug.IsDebug(probe.debug) { - probeTypeLabel := "New-Connection" - if probe.isLG { - probeTypeLabel = "Load-Generating" - } fmt.Printf( "(%s Probe) Got a reused connection for Probe %v at %v with info %v\n", - probeTypeLabel, + probe.probeType.Value(), probe.ProbeId(), probe.stats.GetConnectionDoneTime, probe.stats.ConnInfo, @@ -724,13 +732,9 @@ func (probe *ProbeTracer) SetTLSHandshakeStartTime( ) { probe.stats.TLSStartTime = utilities.Some(now) if debug.IsDebug(probe.debug) { - probeTypeLabel := "New-Connection" - if probe.isLG { - probeTypeLabel = "Load-Generating" - } fmt.Printf( "(%s Probe) Started TLS Handshake for Probe %v @ %v\n", - probeTypeLabel, + probe.probeType.Value(), probe.ProbeId(), probe.stats.TLSStartTime, ) @@ -744,13 +748,9 @@ func (probe *ProbeTracer) SetTLSHandshakeDoneTimeState( probe.stats.TLSDoneTime = utilities.Some(now) probe.stats.TLSConnInfo = connectionState if debug.IsDebug(probe.debug) { - probeTypeLabel := "New-Connection" - if probe.isLG { - probeTypeLabel = "Load-Generating" - } fmt.Printf( "(%s Probe) Completed TLS handshake for Probe %v at %v with info %v\n", - probeTypeLabel, + probe.probeType.Value(), probe.ProbeId(), probe.stats.TLSDoneTime, probe.stats.TLSConnInfo, @@ -765,13 +765,9 @@ func (probe *ProbeTracer) SetHttpWroteRequestTimeInfo( probe.stats.HttpWroteRequestTime = now probe.stats.HttpInfo = info if debug.IsDebug(probe.debug) { - probeTypeLabel := "New-Connection" - if probe.isLG { - probeTypeLabel = "Load-Generating" - } fmt.Printf( "(%s Probe) Http finished writing request for Probe %v at %v with info %v\n", - probeTypeLabel, + probe.probeType.Value(), probe.ProbeId(), probe.stats.HttpWroteRequestTime, probe.stats.HttpInfo, @@ -784,13 +780,9 @@ func (probe *ProbeTracer) SetHttpResponseReadyTime( ) { probe.stats.HttpResponseReadyTime = now if debug.IsDebug(probe.debug) { - probeTypeLabel := "New-Connection" - if probe.isLG { - probeTypeLabel = "Load-Generating" - } fmt.Printf( "(%s Probe) Http response is ready for Probe %v at %v\n", - probeTypeLabel, + probe.probeType.Value(), probe.ProbeId(), probe.stats.HttpResponseReadyTime, ) |
