summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--datalogger/logger.go12
-rw-r--r--networkQuality.go84
-rw-r--r--rpm/rpm.go40
3 files changed, 102 insertions, 34 deletions
diff --git a/datalogger/logger.go b/datalogger/logger.go
index 4c3b080..249a059 100644
--- a/datalogger/logger.go
+++ b/datalogger/logger.go
@@ -108,17 +108,27 @@ func (logger *CSVDataLogger[T]) Export() bool {
return false
}
+ toOmit := make([]int, 0)
visibleFields := reflect.VisibleFields(reflect.TypeOf((*T)(nil)).Elem())
- for _, v := range visibleFields {
+ for i, v := range visibleFields {
description, success := v.Tag.Lookup("Description")
columnName := fmt.Sprintf("%s", v.Name)
if success {
+ if description == "[OMIT]" {
+ toOmit = append(toOmit, i)
+ continue
+ }
columnName = fmt.Sprintf("%s", description)
}
logger.destination.Write([]byte(fmt.Sprintf("%s, ", columnName)))
}
logger.destination.Write([]byte("\n"))
+ // Remove the Omitted fields
+ for _, i := range toOmit {
+ visibleFields = append(visibleFields[:i], visibleFields[i+1:]...)
+ }
+
for _, d := range logger.data {
for _, v := range visibleFields {
data := reflect.ValueOf(d)
diff --git a/networkQuality.go b/networkQuality.go
index 9235a90..6f6f8ea 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -204,8 +204,9 @@ func main() {
var foreignProbeDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil
var downloadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil
var uploadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil
+ var granularThroughputDataLogger datalogger.DataLogger[rpm.GranularThroughputDataPoint] = nil
- // User wants to log data from each probe!
+ // User wants to log data
if *dataLoggerBaseFileName != "" {
var err error = nil
unique := time.Now().UTC().Format("01-02-2006-15-04-05")
@@ -217,11 +218,15 @@ func main() {
)
dataLoggerDownloadThroughputFilename := utilities.FilenameAppend(
*dataLoggerBaseFileName,
- "-throughput-download"+unique,
+ "-throughput-download-"+unique,
)
dataLoggerUploadThroughputFilename := utilities.FilenameAppend(
*dataLoggerBaseFileName,
- "-throughput-upload"+unique,
+ "-throughput-upload-"+unique,
+ )
+ dataLoggerGranularThroughputFilename := utilities.FilenameAppend(
+ *dataLoggerBaseFileName,
+ "-throughput-granular-"+unique,
)
selfProbeDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint](
@@ -267,6 +272,17 @@ func main() {
)
uploadThroughputDataLogger = nil
}
+
+ granularThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.GranularThroughputDataPoint](
+ dataLoggerGranularThroughputFilename,
+ )
+ if err != nil {
+ fmt.Printf(
+ "Warning: Could not create the file for storing granular throughput results (%s). Disabling functionality.\n",
+ dataLoggerGranularThroughputFilename,
+ )
+ granularThroughputDataLogger = nil
+ }
}
// If, for some reason, the data loggers are nil, make them Null Data Loggers so that we don't have conditional
// code later.
@@ -282,6 +298,9 @@ func main() {
if uploadThroughputDataLogger == nil {
uploadThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]()
}
+ if granularThroughputDataLogger == nil {
+ granularThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.GranularThroughputDataPoint]()
+ }
/*
* Create (and then, ironically, name) two anonymous functions that, when invoked,
@@ -323,7 +342,7 @@ func main() {
// data collection go routines stops well before the other, they will continue to send probes and we can
// generate additional information!
- selfProbeConnectionCommunicationChannel, downloadThroughputChannel := rpm.LoadGenerator(
+ selfDownProbeConnectionCommunicationChannel, downloadThroughputChannel := rpm.LoadGenerator(
lgNetworkActivityCtx,
downloadLoadGeneratorOperatorCtx,
time.Second,
@@ -331,7 +350,7 @@ func main() {
&downloadLoadGeneratingConnectionCollection,
downloadDebugging,
)
- _, uploadThroughputChannel := rpm.LoadGenerator(
+ selfUpProbeConnectionCommunicationChannel, uploadThroughputChannel := rpm.LoadGenerator(
lgNetworkActivityCtx,
uploadLoadGeneratorOperatorCtx,
time.Second,
@@ -340,8 +359,6 @@ func main() {
uploadDebugging,
)
- // start here.
-
selfDownProbeConnection := <-selfDownProbeConnectionCommunicationChannel
selfUpProbeConnection := <-selfUpProbeConnectionCommunicationChannel
@@ -349,7 +366,8 @@ func main() {
proberCtx,
generateForeignProbeConfiguration,
generateSelfProbeConfiguration,
- selfProbeConnection,
+ selfDownProbeConnection,
+ selfUpProbeConnection,
time.Millisecond*100,
sslKeyFileConcurrentWriter,
combinedProbeDebugging,
@@ -417,6 +435,11 @@ timeout:
"################# Download is instantaneously %s.\n", utilities.Conditional(downloadThroughputIsStable, "stable", "unstable"))
}
downloadThroughputDataLogger.LogRecord(downloadThroughputMeasurement)
+ for i := range downloadThroughputMeasurement.GranularThroughputDataPoints {
+ datapoint := downloadThroughputMeasurement.GranularThroughputDataPoints[i]
+ datapoint.Direction = "Download"
+ granularThroughputDataLogger.LogRecord(datapoint)
+ }
lastDownloadThroughputRate = downloadThroughputMeasurement.Throughput
lastDownloadThroughputOpenConnectionCount = downloadThroughputMeasurement.Connections
@@ -431,6 +454,11 @@ timeout:
"################# Upload is instantaneously %s.\n", utilities.Conditional(uploadThroughputIsStable, "stable", "unstable"))
}
uploadThroughputDataLogger.LogRecord(uploadThroughputMeasurement)
+ for i := range uploadThroughputMeasurement.GranularThroughputDataPoints {
+ datapoint := uploadThroughputMeasurement.GranularThroughputDataPoints[i]
+ datapoint.Direction = "Upload"
+ granularThroughputDataLogger.LogRecord(datapoint)
+ }
lastUploadThroughputRate = uploadThroughputMeasurement.Throughput
lastUploadThroughputOpenConnectionCount = uploadThroughputMeasurement.Connections
@@ -452,7 +480,7 @@ timeout:
foreignRtts.AddElement(probeMeasurement.Duration.Seconds() / float64(probeMeasurement.RoundTripCount))
}
- } else if probeMeasurement.Type == rpm.Self {
+ } else if probeMeasurement.Type == rpm.SelfDown || probeMeasurement.Type == rpm.SelfUp {
selfRtts.AddElement(probeMeasurement.Duration.Seconds())
}
@@ -462,7 +490,7 @@ timeout:
if probeMeasurement.Type == rpm.Foreign {
foreignProbeDataLogger.LogRecord(probeMeasurement)
- } else if probeMeasurement.Type == rpm.Self {
+ } else if probeMeasurement.Type == rpm.SelfDown || probeMeasurement.Type == rpm.SelfUp {
selfProbeDataLogger.LogRecord(probeMeasurement)
}
}
@@ -473,6 +501,8 @@ timeout:
}
}
+ // TODO: Reset timeout to RPM timeout stat?
+
// Did the test run to stability?
testRanToStability := (downloadThroughputIsStable && uploadThroughputIsStable && responsivenessIsStable)
@@ -538,6 +568,7 @@ timeout:
// we already did that roughly-equal split up when we added them to the foreignRtts IMS.
foreignProbeRoundTripTimeP90 := foreignRtts.Percentile(90)
+ // This is 60 because we measure in seconds not ms
rpm := 60.0 / (float64(selfProbeRoundTripTimeP90+foreignProbeRoundTripTimeP90) / 2.0)
if *debugCliFlag {
@@ -550,8 +581,24 @@ timeout:
)
}
+ if !testRanToStability {
+ fmt.Printf("Test did not run to stability, these results are estimates:\n")
+ }
fmt.Printf("RPM: %5.0f\n", rpm)
+ fmt.Printf(
+ "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
+ utilities.ToMbps(lastDownloadThroughputRate),
+ utilities.ToMBps(lastDownloadThroughputRate),
+ lastDownloadThroughputOpenConnectionCount,
+ )
+ fmt.Printf(
+ "Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
+ utilities.ToMbps(lastUploadThroughputRate),
+ utilities.ToMBps(lastUploadThroughputRate),
+ lastUploadThroughputOpenConnectionCount,
+ )
+
if *calculateExtendedStats {
fmt.Println(extendedStats.Repr())
}
@@ -580,18 +627,11 @@ timeout:
}
uploadThroughputDataLogger.Close()
- fmt.Printf(
- "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
- utilities.ToMbps(lastDownloadThroughputRate),
- utilities.ToMBps(lastDownloadThroughputRate),
- lastDownloadThroughputOpenConnectionCount,
- )
- fmt.Printf(
- "Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
- utilities.ToMbps(lastUploadThroughputRate),
- utilities.ToMBps(lastUploadThroughputRate),
- lastUploadThroughputOpenConnectionCount,
- )
+ granularThroughputDataLogger.Export()
+ if *debugCliFlag {
+ fmt.Printf("Closing the granular throughput data logger.\n")
+ }
+ granularThroughputDataLogger.Close()
if *debugCliFlag {
fmt.Printf("In debugging mode, we will cool down.\n")
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 740db2b..db92571 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -68,10 +68,18 @@ type ProbeDataPoint struct {
Type ProbeType `Description:"The type of the probe." Formatter:"Value"`
}
+type GranularThroughputDataPoint struct {
+ Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"`
+ Throughput float64 `Description:"Instantaneous throughput (B/s)."`
+ ConnID uint32 `Description:"Position of connection (ID)."`
+ Direction string `Description:"Direction of Throughput."`
+}
+
type ThroughputDataPoint struct {
- Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"`
- Throughput float64 `Description:"Instantaneous throughput (b/s)."`
- Connections int `Description: Number of parallel connections."`
+ Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"`
+ Throughput float64 `Description:"Instantaneous throughput (B/s)."`
+ Connections int `Description:"Number of parallel connections."`
+ GranularThroughputDataPoints []GranularThroughputDataPoint `Description:"[OMIT]"`
}
type SelfDataCollectionResult struct {
@@ -84,13 +92,16 @@ type SelfDataCollectionResult struct {
type ProbeType int64
const (
- Self ProbeType = iota
+ SelfUp ProbeType = iota
+ SelfDown
Foreign
)
func (pt ProbeType) Value() string {
- if pt == Self {
- return "Self"
+ if pt == SelfUp {
+ return "SelfUp"
+ } else if pt == SelfDown {
+ return "SelfDown"
}
return "Foreign"
}
@@ -164,7 +175,7 @@ func Probe(
) + probeTracer.GetTCPDelta()
// We must have reused the connection if we are a self probe!
- if probeType == Self && !probeTracer.stats.ConnectionReused {
+ if (probeType == SelfUp || probeType == SelfDown) && !probeTracer.stats.ConnectionReused {
panic(!probeTracer.stats.ConnectionReused)
}
@@ -226,7 +237,8 @@ func CombinedProber(
proberCtx context.Context,
foreignProbeConfigurationGenerator func() ProbeConfiguration,
selfProbeConfigurationGenerator func() ProbeConfiguration,
- selfProbeConnection lgc.LoadGeneratingConnection,
+ selfDownProbeConnection lgc.LoadGeneratingConnection,
+ selfUpProbeConnection lgc.LoadGeneratingConnection,
probeInterval time.Duration,
keyLogger io.Writer,
debugging *debug.DebugWithPrefix,
@@ -289,10 +301,11 @@ func CombinedProber(
debugging,
)
+ // Start Download Connection Prober
go Probe(
proberCtx,
&wg,
- selfProbeConnection.Client(),
+ selfDownProbeConnection.Client(),
selfProbeConfiguration.URL,
SelfDown,
&dataPoints,
@@ -390,6 +403,8 @@ func LoadGenerator(
// Compute "instantaneous aggregate" goodput which is the number of
// bytes transferred within the last second.
var instantaneousTotalThroughput float64 = 0
+ granularThroughputDatapoints := make([]GranularThroughputDataPoint, 0)
+ now = time.Now() // Used to align granular throughput data
allInvalid := true
for i := range *loadGeneratingConnections.LGCs {
if !(*loadGeneratingConnections.LGCs)[i].IsValid() {
@@ -400,6 +415,8 @@ func LoadGenerator(
(*loadGeneratingConnections.LGCs)[i].ClientId(),
)
}
+ // TODO: Do we add null connection to throughput? and how do we define it? Throughput -1 or 0?
+ granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, 0, uint32(i), ""})
continue
}
allInvalid = false
@@ -411,6 +428,7 @@ func LoadGenerator(
currentInterval.Seconds(),
)
instantaneousTotalThroughput += instantaneousConnectionThroughput
+ granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, instantaneousConnectionThroughput, uint32(i), ""})
}
// For some reason, all the lgcs are invalid. This likely means that
@@ -426,7 +444,7 @@ func LoadGenerator(
}
// We have generated a throughput calculation -- let's send it back to the coordinator
- throughputDataPoint := ThroughputDataPoint{time.Now(), instantaneousTotalThroughput, len(*loadGeneratingConnections.LGCs)}
+ throughputDataPoint := ThroughputDataPoint{time.Now(), instantaneousTotalThroughput, len(*loadGeneratingConnections.LGCs), granularThroughputDatapoints}
throughputCalculations <- throughputDataPoint
// Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now!
@@ -647,7 +665,7 @@ func (probe *ProbeTracer) SetGotConnTimeInfo(
probe.stats.GetConnectionDoneTime = now
probe.stats.ConnInfo = gotConnInfo
probe.stats.ConnectionReused = gotConnInfo.Reused
- if probe.probeType == Self && !gotConnInfo.Reused {
+ if (probe.probeType == SelfUp || probe.probeType == SelfDown) && !gotConnInfo.Reused {
fmt.Fprintf(
os.Stderr,
"A self probe sent used a new connection!\n",