summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--datalogger/logger.go83
-rw-r--r--networkQuality.go130
-rw-r--r--rpm/rpm.go226
-rw-r--r--utilities/utilities.go55
-rw-r--r--utilities/utilities_test.go23
5 files changed, 342 insertions, 175 deletions
diff --git a/datalogger/logger.go b/datalogger/logger.go
new file mode 100644
index 0000000..26717fb
--- /dev/null
+++ b/datalogger/logger.go
@@ -0,0 +1,83 @@
+package datalogger
+
+import (
+ "fmt"
+ "io"
+ "os"
+ "reflect"
+ "sync"
+)
+
+type DataLogger[T any] interface {
+ LogRecord(record T)
+ Export() bool
+ Close() bool
+}
+
+type CSVDataLogger[T any] struct {
+ mut *sync.Mutex
+ recordCount int
+ data []T
+ isOpen bool
+ destination io.WriteCloser
+}
+
+func CreateCSVDataLogger[T any](filename string) (DataLogger[T], error) {
+ fmt.Printf("Creating a CSV data logger: %v!\n", filename)
+ data := make([]T, 0)
+ destination, err := os.Create(filename)
+ if err != nil {
+ return &CSVDataLogger[T]{&sync.Mutex{}, 0, data, true, destination}, err
+ }
+
+ result := CSVDataLogger[T]{&sync.Mutex{}, 0, data, true, destination}
+ return &result, nil
+}
+
+func (logger *CSVDataLogger[T]) LogRecord(record T) {
+ logger.mut.Lock()
+ defer logger.mut.Unlock()
+ logger.recordCount += 1
+ logger.data = append(logger.data, record)
+}
+
+func (logger *CSVDataLogger[T]) Export() bool {
+ logger.mut.Lock()
+ defer logger.mut.Unlock()
+ if !logger.isOpen {
+ return false
+ }
+
+ t := new(T)
+ visibleFields := reflect.VisibleFields(reflect.TypeOf(t).Elem())
+ for _, v := range visibleFields {
+ description, success := v.Tag.Lookup("Description")
+ columnName := fmt.Sprintf("%s", v.Name)
+ if success {
+ columnName = fmt.Sprintf("%s", description)
+ }
+ logger.destination.Write([]byte(fmt.Sprintf("%s, ", columnName)))
+ }
+ logger.destination.Write([]byte("\n"))
+
+ for _, d := range logger.data {
+ for _, v := range visibleFields {
+ data := reflect.ValueOf(d)
+ toWrite := data.FieldByIndex(v.Index)
+ logger.destination.Write([]byte(fmt.Sprintf("%v, ", toWrite)))
+ }
+ logger.destination.Write([]byte("\n"))
+ }
+ return true
+}
+
+func (logger *CSVDataLogger[T]) Close() bool {
+ logger.mut.Lock()
+ defer logger.mut.Unlock()
+ if !logger.isOpen {
+ return false
+ }
+ logger.destination.Close()
+ logger.isOpen = false
+ return true
+}
diff --git a/networkQuality.go b/networkQuality.go
index ca51e51..e875e97 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -25,6 +25,7 @@ import (
"github.com/network-quality/goresponsiveness/ccw"
"github.com/network-quality/goresponsiveness/config"
"github.com/network-quality/goresponsiveness/constants"
+ "github.com/network-quality/goresponsiveness/datalogger"
"github.com/network-quality/goresponsiveness/debug"
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
@@ -80,12 +81,12 @@ var (
"",
"Enable client runtime profiling and specify storage location. Disabled by default.",
)
-
calculateExtendedStats = flag.Bool(
"extended-stats",
false,
"Enable the collection and display of extended statistics -- may not be available on certain platforms.",
)
+ dataLoggerBaseFileName = flag.String("logger-filename", "", "Store information about the results of each probe in files with this basename. Time and probe type will be appended (before the first .) to create two separate log files. Disabled by default.")
)
func main() {
@@ -98,7 +99,7 @@ func main() {
lgDataCollectionCtx, cancelLGDataCollectionCtx := context.WithCancel(
context.Background(),
)
- newConnectionProberCtx, newConnectionProberCtxCancel := context.WithCancel(
+ foreignProbertCtx, foreignProberCtxCancel := context.WithCancel(
context.Background(),
)
config := &config.Config{}
@@ -183,6 +184,26 @@ func main() {
}
}
+ var selfDataLogger datalogger.DataLogger[rpm.DataPoint] = nil
+ var foreignDataLogger datalogger.DataLogger[rpm.DataPoint] = nil
+ // User wants to log data from each probe!
+ if *dataLoggerBaseFileName != "" {
+ var err error = nil
+ unique := time.Now().UTC().Format("01-02-2006-15-04-05")
+ dataLoggerSelfFilename := utilities.FilenameAppend(*dataLoggerBaseFileName, "-self-"+unique)
+ dataLoggerForeignFilename := utilities.FilenameAppend(*dataLoggerBaseFileName, "-foreign-"+unique)
+ selfDataLogger, err = datalogger.CreateCSVDataLogger[rpm.DataPoint](dataLoggerSelfFilename)
+ if err != nil {
+ fmt.Printf("Warning: Could not create the file for storing self probe results (%s). Disabling functionality.\n", dataLoggerSelfFilename)
+ selfDataLogger = nil
+ }
+ foreignDataLogger, err = datalogger.CreateCSVDataLogger[rpm.DataPoint](dataLoggerForeignFilename)
+ if err != nil {
+ fmt.Printf("Warning: Could not create the file for storing foreign probe results (%s). Disabling functionality.\n", dataLoggerForeignFilename)
+ foreignDataLogger = nil
+ }
+ }
+
/*
* Create (and then, ironically, name) two anonymous functions that, when invoked,
* will create load-generating connections for upload/download/
@@ -200,17 +221,17 @@ func main() {
}
}
- generate_lg_probe_configuration := func() rpm.ProbeConfiguration {
- return rpm.ProbeConfiguration{URL: config.Urls.SmallUrl, Interval: 100 * time.Millisecond}
+ generateSelfProbeConfiguration := func() rpm.ProbeConfiguration {
+ return rpm.ProbeConfiguration{URL: config.Urls.SmallUrl, DataLogger: selfDataLogger, Interval: 100 * time.Millisecond}
}
- generate_nc_probe_configuration := func() rpm.ProbeConfiguration {
- return rpm.ProbeConfiguration{URL: config.Urls.SmallUrl, Interval: 100 * time.Millisecond}
+ generateForeignProbeConfiguration := func() rpm.ProbeConfiguration {
+ return rpm.ProbeConfiguration{URL: config.Urls.SmallUrl, DataLogger: foreignDataLogger, Interval: 100 * time.Millisecond}
}
var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download")
var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload")
- var newConnectionDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "new connection probe")
+ var foreignDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "foreign probe")
// TODO: Separate contexts for load generation and data collection. If we do that, if either of the two
// data collection go routines stops well before the other, they will continue to send probes and we can
@@ -220,35 +241,35 @@ func main() {
lgDataCollectionCtx,
operatingCtx,
generate_lgd,
- generate_lg_probe_configuration,
+ generateSelfProbeConfiguration,
downloadDebugging,
)
uploadDataCollectionChannel := rpm.LGCollectData(
lgDataCollectionCtx,
operatingCtx,
generate_lgu,
- generate_lg_probe_configuration,
+ generateSelfProbeConfiguration,
uploadDebugging,
)
- newConnectionProbeDataPoints := rpm.Prober(
- newConnectionProberCtx,
- generate_nc_probe_configuration,
+ foreignProbeDataPointsChannel := rpm.ForeignProber(
+ foreignProbertCtx,
+ generateForeignProbeConfiguration,
sslKeyFileConcurrentWriter,
- newConnectionDebugging,
+ foreignDebugging,
)
dataCollectionTimeout := false
uploadDataCollectionComplete := false
- downloadDataCollectionComple := false
- downloadDataCollectionResult := rpm.LGDataCollectionResult{}
- uploadDataCollectionResult := rpm.LGDataCollectionResult{}
+ downloadDataCollectionComplete := false
+ downloadDataCollectionResult := rpm.SelfDataCollectionResult{}
+ uploadDataCollectionResult := rpm.SelfDataCollectionResult{}
- for !(uploadDataCollectionComplete && downloadDataCollectionComple) {
+ for !(uploadDataCollectionComplete && downloadDataCollectionComplete) {
select {
case downloadDataCollectionResult = <-downloadDataCollectionChannel:
{
- downloadDataCollectionComple = true
+ downloadDataCollectionComplete = true
if *debugCliFlag {
fmt.Printf(
"################# download load-generating data collection is %s complete (%fMBps, %d flows)!\n",
@@ -318,7 +339,7 @@ func main() {
}
// Shutdown the new-connection prober!
- newConnectionProberCtxCancel()
+ foreignProberCtxCancel()
// In the new version we are no longer going to wait to send probes until after
// saturation. When we get here we are now only going to compute the results
@@ -355,57 +376,50 @@ func main() {
len(uploadDataCollectionResult.LGCs),
)
- totalNewConnectionRoundTripTime := float64(0)
- totalNewConnectionRoundTrips := uint64(0)
- for ncDp := range newConnectionProbeDataPoints {
- totalNewConnectionRoundTripTime += ncDp.Duration.Seconds()
- totalNewConnectionRoundTrips += uint64(ncDp.RoundTripCount)
- }
- averageNewConnectionRoundTripTime := totalNewConnectionRoundTripTime / float64(
- totalNewConnectionRoundTrips,
- )
- newConnectionRpm := (1.0 / averageNewConnectionRoundTripTime) * 60.0
- if *debugCliFlag {
- fmt.Printf(
- "Total New-Connection Round Trips: %d, Total New-Connection Round Trip Time: %f, Average New-Connection Round Trip Time (in seconds): %f\n",
- totalNewConnectionRoundTrips,
- totalNewConnectionRoundTripTime,
- averageNewConnectionRoundTripTime,
- )
- fmt.Printf("(New-Connection) RPM: %f\n", newConnectionRpm)
- }
+ foreignProbeDataPoints := utilities.ChannelToSlice(foreignProbeDataPointsChannel)
+ totalForeignRoundTrips := len(foreignProbeDataPoints)
+ foreignProbeRoundTripTimes := utilities.Fmap(foreignProbeDataPoints, func(dp rpm.DataPoint) float64 { return dp.Duration.Seconds() })
+ foreignProbeRoundTripTimeP90 := utilities.CalculatePercentile(foreignProbeRoundTripTimes, 90)
+
+ downloadRoundTripTimes := utilities.Fmap(downloadDataCollectionResult.DataPoints, func(dcr rpm.DataPoint) float64 { return dcr.Duration.Seconds() })
+ uploadRoundTripTimes := utilities.Fmap(uploadDataCollectionResult.DataPoints, func(dcr rpm.DataPoint) float64 { return dcr.Duration.Seconds() })
+ selfProbeRoundTripTimes := append(downloadRoundTripTimes, uploadRoundTripTimes...)
+ totalSelfRoundTrips := len(selfProbeRoundTripTimes)
+ selfProbeRoundTripTimeP90 := utilities.CalculatePercentile(selfProbeRoundTripTimes, 90)
+
+ rpm := 60.0 / (float64(selfProbeRoundTripTimeP90+foreignProbeRoundTripTimeP90) / 2.0)
- totalLoadGeneratingRoundTripTime := float64(0)
- totalLoadGeneratingRoundTrips := uint64(0)
- for _, dp := range downloadDataCollectionResult.DataPoints {
- totalLoadGeneratingRoundTripTime += dp.Duration.Seconds()
- totalLoadGeneratingRoundTrips += uint64(dp.RoundTripCount)
- }
- for _, dp := range uploadDataCollectionResult.DataPoints {
- totalLoadGeneratingRoundTripTime += dp.Duration.Seconds()
- totalLoadGeneratingRoundTrips += uint64(dp.RoundTripCount)
- }
- averageLoadGeneratingRoundTripTime := totalLoadGeneratingRoundTripTime / float64(
- totalLoadGeneratingRoundTrips,
- )
- loadGeneratingRPM := (1.0 / averageLoadGeneratingRoundTripTime) * 60.0
if *debugCliFlag {
fmt.Printf(
- "Total Load-Generating Round Trips: %d, Total New-Connection Round Trip Time: %f, Average New-Connection Round Trip Time (in seconds): %f\n",
- totalLoadGeneratingRoundTrips,
- totalLoadGeneratingRoundTripTime,
- averageLoadGeneratingRoundTripTime,
+ "Total Load-Generating Round Trips: %d, Total New-Connection Round Trips: %d, P90 LG RTT: %f, P90 NC RTT: %f\n",
+ totalSelfRoundTrips,
+ totalForeignRoundTrips,
+ selfProbeRoundTripTimeP90,
+ foreignProbeRoundTripTimeP90,
)
- fmt.Printf("(Load-Generating) RPM: %f\n", loadGeneratingRPM)
}
- rpm := (newConnectionRpm + loadGeneratingRPM) / 2.0
fmt.Printf("RPM: %5.0f\n", rpm)
if *calculateExtendedStats {
fmt.Println(extendedStats.Repr())
}
+ if !utilities.IsInterfaceNil(selfDataLogger) {
+ selfDataLogger.Export()
+ if *debugCliFlag {
+ fmt.Printf("Closing the self data logger.\n")
+ }
+ selfDataLogger.Close()
+ }
+
+ if !utilities.IsInterfaceNil(foreignDataLogger) {
+ foreignDataLogger.Export()
+ if *debugCliFlag {
+ fmt.Printf("Closing the foreign data logger.\n")
+ }
+ foreignDataLogger.Close()
+ }
cancelOperatingCtx()
if *debugCliFlag {
fmt.Printf("In debugging mode, we will cool down.\n")
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 6aa68e8..29c735f 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -8,9 +8,11 @@ import (
"net/http"
"net/http/httptrace"
"os"
+ "sync"
"time"
"github.com/network-quality/goresponsiveness/constants"
+ "github.com/network-quality/goresponsiveness/datalogger"
"github.com/network-quality/goresponsiveness/debug"
"github.com/network-quality/goresponsiveness/lgc"
"github.com/network-quality/goresponsiveness/ma"
@@ -40,33 +42,51 @@ func addFlows(
}
type ProbeConfiguration struct {
- URL string
- Interval time.Duration
+ URL string
+ DataLogger datalogger.DataLogger[DataPoint]
+ Interval time.Duration
}
type DataPoint struct {
- RoundTripCount uint64
- Duration time.Duration
+ Time time.Time `Description:"Time of the generation of the data point."`
+ RoundTripCount uint64 `Description:"The number of round trips measured by this data point."`
+ Duration time.Duration `Description:"The duration for this measurement."`
}
-type LGDataCollectionResult struct {
+type SelfDataCollectionResult struct {
RateBps float64
LGCs []lgc.LoadGeneratingConnection
DataPoints []DataPoint
}
+type ProbeType int64
+
+const (
+ Self ProbeType = iota
+ Foreign
+)
+
+func (pt ProbeType) Value() string {
+ if pt == Self {
+ return "Self"
+ }
+ return "Foreign"
+}
+
func Probe(
parentProbeCtx context.Context,
+ waitGroup *sync.WaitGroup,
+ logger datalogger.DataLogger[DataPoint],
client *http.Client,
probeUrl string,
- isLGProbe bool,
+ probeType ProbeType,
result *chan DataPoint,
debugging *debug.DebugWithPrefix,
) error {
- probeTypeLabel := "New-Connection"
- if isLGProbe {
- probeTypeLabel = "Load-Generating"
+ if waitGroup != nil {
+ waitGroup.Add(1)
+ defer waitGroup.Done()
}
if client == nil {
@@ -74,7 +94,7 @@ func Probe(
}
probeId := utilities.GenerateUniqueId()
- probeTracer := NewProbeTracer(client, isLGProbe, probeId, debugging)
+ probeTracer := NewProbeTracer(client, probeType, probeId, debugging)
time_before_probe := time.Now()
probe_req, err := http.NewRequestWithContext(
httptrace.WithClientTrace(parentProbeCtx, probeTracer.trace),
@@ -104,18 +124,18 @@ func Probe(
sanity := time_after_probe.Sub(time_before_probe)
- // When the probe is run on a load-generating connection there should
+ // When the probe is run on a load-generating connection (a self probe) there should
// only be a single round trip that is measured. We will take the accumulation of all these
// values just to be sure, though. Because of how this traced connection was launched, most
// of the values will be 0 (or very small where the time that go takes for delivering callbacks
- // and doing context switches pokes through). When it is !isLGProbe then the values will
+ // and doing context switches pokes through). When it is !isSelfProbe then the values will
// be significant and we want to add them regardless!
totalDelay := probeTracer.GetTLSAndHttpHeaderDelta() + probeTracer.GetHttpDownloadDelta(
time_after_probe,
) + probeTracer.GetTCPDelta()
- // We must have reused the connection if we are a load-generating probe!
- if isLGProbe && !probeTracer.stats.ConnectionReused {
+ // We must have reused the connection if we are a self probe!
+ if probeType == Self && !probeTracer.stats.ConnectionReused {
panic(!probeTracer.stats.ConnectionReused)
}
@@ -123,14 +143,14 @@ func Probe(
fmt.Printf(
"(%s) (%s Probe %v) sanity vs total: %v vs %v\n",
debugging.Prefix,
- probeTypeLabel,
+ probeType.Value(),
probeId,
sanity,
totalDelay,
)
}
roundTripCount := uint64(1)
- if !isLGProbe {
+ if probeType == Foreign {
roundTripCount = 3
}
// TODO: Careful!!! It's possible that this channel has been closed because the Prober that
@@ -138,40 +158,44 @@ func Probe(
// matter because a panic just stops the go thread containing the paniced code and we are in
// a go thread that executes only this function.
defer func() {
- _ = recover()
- if debug.IsDebug(debugging.Level) {
+ isThreadPanicing := recover()
+ if isThreadPanicing != nil && debug.IsDebug(debugging.Level) {
fmt.Printf(
"(%s) (%s Probe %v) Probe attempted to write to the result channel after its invoker ended.\n",
debugging.Prefix,
- probeTypeLabel,
+ probeType.Value(),
probeId,
)
}
}()
- *result <- DataPoint{RoundTripCount: roundTripCount, Duration: totalDelay}
-
+ dataPoint := DataPoint{Time: time.Now(), RoundTripCount: roundTripCount, Duration: totalDelay}
+ if !utilities.IsInterfaceNil(logger) {
+ logger.LogRecord(dataPoint)
+ }
+ *result <- dataPoint
return nil
}
-func Prober(
+func ForeignProber(
proberCtx context.Context,
- ncProbeConfigurationGenerator func() ProbeConfiguration,
+ foreignProbeConfigurationGenerator func() ProbeConfiguration,
keyLogger io.Writer,
debugging *debug.DebugWithPrefix,
) (points chan DataPoint) {
points = make(chan DataPoint)
- ncProbeConfiguration := ncProbeConfigurationGenerator()
+ foreignProbeConfiguration := foreignProbeConfigurationGenerator()
go func() {
+ wg := sync.WaitGroup{}
probeCount := 0
for proberCtx.Err() == nil {
- time.Sleep(ncProbeConfiguration.Interval)
+ time.Sleep(foreignProbeConfiguration.Interval)
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) About to start new-connection probe number %d!\n",
+ "(%s) About to start foreign probe number %d!\n",
debugging.Prefix,
probeCount,
)
@@ -182,7 +206,7 @@ func Prober(
if !utilities.IsInterfaceNil(keyLogger) {
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "Using an SSL Key Logger for this new-connection probe.\n",
+ "Using an SSL Key Logger for this foreign probe.\n",
)
}
@@ -200,13 +224,19 @@ func Prober(
client := &http.Client{Transport: &transport}
probeCount++
- go Probe(proberCtx, client, ncProbeConfiguration.URL, false, &points, debugging)
+ go Probe(proberCtx, &wg, foreignProbeConfiguration.DataLogger, client, foreignProbeConfiguration.URL, Foreign, &points, debugging)
}
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) New-connection probing driver is stopping after sending %d probes.\n",
+ "(%s) Foreign probe driver is going to start waiting for its probes to finish.\n",
+ debugging.Prefix,
+ )
+ }
+ utilities.OrTimeout(func() { wg.Wait() }, 2*time.Second)
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) Foreign probe driver is done waiting for its probes to finish.\n",
debugging.Prefix,
- probeCount,
)
}
close(points)
@@ -214,24 +244,24 @@ func Prober(
return
}
-func LGProber(
+func SelfProber(
proberCtx context.Context,
defaultConnection lgc.LoadGeneratingConnection,
altConnections *[]lgc.LoadGeneratingConnection,
- lgProbeConfiguration ProbeConfiguration,
+ selfProbeConfiguration ProbeConfiguration,
debugging *debug.DebugWithPrefix,
) (points chan DataPoint) {
points = make(chan DataPoint)
- debugging = debug.NewDebugWithPrefix(debugging.Level, debugging.Prefix+" load-generating probe")
+ debugging = debug.NewDebugWithPrefix(debugging.Level, debugging.Prefix+" self probe")
go func() {
probeCount := 0
for proberCtx.Err() == nil {
- time.Sleep(lgProbeConfiguration.Interval)
+ time.Sleep(selfProbeConfiguration.Interval)
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) About to start load-generating probe number %d!\n",
+ "(%s) About to start self probe number %d!\n",
debugging.Prefix,
probeCount,
)
@@ -243,16 +273,18 @@ func LGProber(
// yet.
go Probe(
proberCtx,
+ nil,
+ selfProbeConfiguration.DataLogger,
defaultConnection.Client(),
- lgProbeConfiguration.URL,
- true,
+ selfProbeConfiguration.URL,
+ Self,
&points,
debugging,
)
}
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) Load-generating probing driver is stopping after sending %d probes.\n",
+ "(%s) self probing driver is stopping after sending %d probes.\n",
debugging.Prefix,
probeCount,
)
@@ -266,10 +298,10 @@ func LGCollectData(
lgDataCollectionCtx context.Context,
operatingCtx context.Context,
lgcGenerator func() lgc.LoadGeneratingConnection,
- lgProbeConfigurationGenerator func() ProbeConfiguration,
+ selfProbeConfigurationGenerator func() ProbeConfiguration,
debugging *debug.DebugWithPrefix,
-) (resulted chan LGDataCollectionResult) {
- resulted = make(chan LGDataCollectionResult)
+) (resulted chan SelfDataCollectionResult) {
+ resulted = make(chan SelfDataCollectionResult)
go func() {
lgcs := make([]lgc.LoadGeneratingConnection, 0)
@@ -282,11 +314,11 @@ func LGCollectData(
debugging.Level,
)
- lgProbeCtx, lgProbeCtxCancel := context.WithCancel(lgDataCollectionCtx)
- probeDataPointsChannel := LGProber(lgProbeCtx,
+ selfProbeCtx, selfProbeCtxCancel := context.WithCancel(lgDataCollectionCtx)
+ probeDataPointsChannel := SelfProber(selfProbeCtx,
lgcs[0],
&lgcs,
- lgProbeConfigurationGenerator(),
+ selfProbeConfigurationGenerator(),
debugging,
)
@@ -314,7 +346,7 @@ func LGCollectData(
// When the program stops operating, then stop. When our invoker tells
// us to stop, then stop.
if operatingCtx.Err() != nil || lgDataCollectionCtx.Err() != nil {
- lgProbeCtxCancel()
+ selfProbeCtxCancel()
return
}
@@ -462,30 +494,30 @@ func LGCollectData(
}
}
- lgProbeCtxCancel()
- probeDataPoints := make([]DataPoint, 0)
+ selfProbeCtxCancel()
+ selfProbeDataPoints := make([]DataPoint, 0)
for dataPoint := range probeDataPointsChannel {
- probeDataPoints = append(probeDataPoints, dataPoint)
+ selfProbeDataPoints = append(selfProbeDataPoints, dataPoint)
}
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) Collected %d load-generating probe data points\n",
+ "(%s) Collected %d self data points\n",
debugging.Prefix,
- len(probeDataPoints),
+ len(selfProbeDataPoints),
)
}
- resulted <- LGDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, DataPoints: probeDataPoints}
+ resulted <- SelfDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, DataPoints: selfProbeDataPoints}
}()
return
}
type ProbeTracer struct {
- client *http.Client
- stats *stats.TraceStats
- trace *httptrace.ClientTrace
- debug debug.DebugLevel
- probeid uint64
- isLG bool
+ client *http.Client
+ stats *stats.TraceStats
+ trace *httptrace.ClientTrace
+ debug debug.DebugLevel
+ probeid uint64
+ probeType ProbeType
}
func (p *ProbeTracer) String() string {
@@ -576,17 +608,17 @@ func (p *ProbeTracer) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration
func NewProbeTracer(
client *http.Client,
- isLG bool,
+ probeType ProbeType,
probeId uint64,
debugging *debug.DebugWithPrefix,
) *ProbeTracer {
probe := &ProbeTracer{
- client: client,
- stats: &stats.TraceStats{},
- trace: nil,
- debug: debugging.Level,
- probeid: probeId,
- isLG: isLG,
+ client: client,
+ stats: &stats.TraceStats{},
+ trace: nil,
+ debug: debugging.Level,
+ probeid: probeId,
+ probeType: probeType,
}
trace := traceable.GenerateHttpTimingTracer(probe, debugging.Level)
@@ -601,13 +633,9 @@ func (probe *ProbeTracer) SetDnsStartTimeInfo(
probe.stats.DnsStartTime = now
probe.stats.DnsStart = dnsStartInfo
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) DNS Start for Probe %v: %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
dnsStartInfo,
)
@@ -621,13 +649,9 @@ func (probe *ProbeTracer) SetDnsDoneTimeInfo(
probe.stats.DnsDoneTime = now
probe.stats.DnsDone = dnsDoneInfo
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) DNS Done for Probe %v: %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.DnsDone,
)
@@ -639,13 +663,9 @@ func (probe *ProbeTracer) SetConnectStartTime(
) {
probe.stats.ConnectStartTime = now
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) TCP Start for Probe %v at %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.ConnectStartTime,
)
@@ -659,13 +679,9 @@ func (probe *ProbeTracer) SetConnectDoneTimeError(
probe.stats.ConnectDoneTime = now
probe.stats.ConnectDoneError = err
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) TCP Done for Probe %v (with error %v) @ %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.ConnectDoneError,
probe.stats.ConnectDoneTime,
@@ -676,13 +692,9 @@ func (probe *ProbeTracer) SetConnectDoneTimeError(
func (probe *ProbeTracer) SetGetConnTime(now time.Time) {
probe.stats.GetConnectionStartTime = now
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) Started getting connection for Probe %v @ %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.GetConnectionStartTime,
)
@@ -696,22 +708,18 @@ func (probe *ProbeTracer) SetGotConnTimeInfo(
probe.stats.GetConnectionDoneTime = now
probe.stats.ConnInfo = gotConnInfo
probe.stats.ConnectionReused = gotConnInfo.Reused
- if probe.isLG && !gotConnInfo.Reused {
+ if probe.probeType == Self && !gotConnInfo.Reused {
fmt.Fprintf(
os.Stderr,
- "A probe sent on an load-generating connection used a new connection!\n",
+ "A self probe sent used a new connection!\n",
)
} else if debug.IsDebug(probe.debug) {
- fmt.Printf("Properly reused a connection when probing on a load-generating connection!\n")
+ fmt.Printf("Properly reused a connection when doing a self probe!\n")
}
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) Got a reused connection for Probe %v at %v with info %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.GetConnectionDoneTime,
probe.stats.ConnInfo,
@@ -724,13 +732,9 @@ func (probe *ProbeTracer) SetTLSHandshakeStartTime(
) {
probe.stats.TLSStartTime = utilities.Some(now)
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) Started TLS Handshake for Probe %v @ %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.TLSStartTime,
)
@@ -744,13 +748,9 @@ func (probe *ProbeTracer) SetTLSHandshakeDoneTimeState(
probe.stats.TLSDoneTime = utilities.Some(now)
probe.stats.TLSConnInfo = connectionState
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) Completed TLS handshake for Probe %v at %v with info %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.TLSDoneTime,
probe.stats.TLSConnInfo,
@@ -765,13 +765,9 @@ func (probe *ProbeTracer) SetHttpWroteRequestTimeInfo(
probe.stats.HttpWroteRequestTime = now
probe.stats.HttpInfo = info
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) Http finished writing request for Probe %v at %v with info %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.HttpWroteRequestTime,
probe.stats.HttpInfo,
@@ -784,13 +780,9 @@ func (probe *ProbeTracer) SetHttpResponseReadyTime(
) {
probe.stats.HttpResponseReadyTime = now
if debug.IsDebug(probe.debug) {
- probeTypeLabel := "New-Connection"
- if probe.isLG {
- probeTypeLabel = "Load-Generating"
- }
fmt.Printf(
"(%s Probe) Http response is ready for Probe %v at %v\n",
- probeTypeLabel,
+ probe.probeType.Value(),
probe.ProbeId(),
probe.stats.HttpResponseReadyTime,
)
diff --git a/utilities/utilities.go b/utilities/utilities.go
index 7b96bef..d4303ec 100644
--- a/utilities/utilities.go
+++ b/utilities/utilities.go
@@ -20,6 +20,8 @@ import (
"math/rand"
"os"
"reflect"
+ "sort"
+ "strings"
"sync/atomic"
"time"
)
@@ -129,3 +131,56 @@ func Max(x, y uint64) uint64 {
}
return y
}
+
+func ChannelToSlice[S any](channel <-chan S) (slice []S) {
+ slice = make([]S, 0)
+ for element := range channel {
+ slice = append(slice, element)
+ }
+ return
+}
+
+func Fmap[S any, F any](elements []S, mapper func(S) F) []F {
+ result := make([]F, 0)
+ for _, s := range elements {
+ result = append(result, mapper(s))
+ }
+ return result
+}
+
+func CalculatePercentile[S float32 | int32 | float64 | int64](elements []S, percentile int) S {
+ sort.Slice(elements, func(a, b int) bool { return elements[a] < elements[b] })
+ elementsCount := len(elements)
+ percentileIdx := elementsCount * (percentile / 100)
+ return elements[percentileIdx]
+}
+
+func OrTimeout(f func(), timeout time.Duration) {
+ completeChannel := func() chan interface{} {
+ completed := make(chan interface{})
+ go func() {
+ // This idea taken from https://stackoverflow.com/a/32843750.
+ // Closing the channel will let the read of it proceed immediately.
+ // Making that operation a defer ensures that it will happen even if
+ // the function f panics during its execution.
+ defer close(completed)
+ f()
+ }()
+ return completed
+ }()
+ select {
+ case _ = <-completeChannel:
+ break
+ case _ = <-time.After(timeout):
+ break
+ }
+}
+
+func FilenameAppend(filename, appendage string) string {
+ pieces := strings.SplitN(filename, ".", 2)
+ result := pieces[0] + appendage
+ if len(pieces) > 1 {
+ result = result + "." + strings.Join(pieces[1:], ".")
+ }
+ return result
+}
diff --git a/utilities/utilities_test.go b/utilities/utilities_test.go
index 72a11fb..703243c 100644
--- a/utilities/utilities_test.go
+++ b/utilities/utilities_test.go
@@ -37,3 +37,26 @@ func TestReadAfterCloseOnBufferedChannel(t *testing.T) {
t.Fatalf("Did not read all sent items from a buffered channel after channel.")
}
}
+
+func TestOrTimeoutStopsInfiniteLoop(t *testing.T) {
+ const TimeoutTime = 2 * time.Second
+ infinity := func() {
+ for {
+ }
+ }
+ timeBefore := time.Now()
+ OrTimeout(infinity, TimeoutTime)
+ timeAfter := time.Now()
+ if timeAfter.Sub(timeBefore) < TimeoutTime {
+ t.Fatalf("OrTimeout failed to keep the infinite loop running for at least %v.", TimeoutTime)
+ }
+}
+
+func TestFilenameAppend(t *testing.T) {
+ const basename = "testing.csv"
+ const expected = "testing-appended.csv"
+ result := FilenameAppend(basename, "-appended")
+ if expected != result {
+ t.Fatalf("%s != %s for FilenameAppend.", expected, result)
+ }
+}