diff options
| -rw-r--r-- | datalogger/logger.go | 12 | ||||
| -rw-r--r-- | networkQuality.go | 84 | ||||
| -rw-r--r-- | rpm/rpm.go | 40 |
3 files changed, 102 insertions, 34 deletions
diff --git a/datalogger/logger.go b/datalogger/logger.go index 4c3b080..249a059 100644 --- a/datalogger/logger.go +++ b/datalogger/logger.go @@ -108,17 +108,27 @@ func (logger *CSVDataLogger[T]) Export() bool { return false } + toOmit := make([]int, 0) visibleFields := reflect.VisibleFields(reflect.TypeOf((*T)(nil)).Elem()) - for _, v := range visibleFields { + for i, v := range visibleFields { description, success := v.Tag.Lookup("Description") columnName := fmt.Sprintf("%s", v.Name) if success { + if description == "[OMIT]" { + toOmit = append(toOmit, i) + continue + } columnName = fmt.Sprintf("%s", description) } logger.destination.Write([]byte(fmt.Sprintf("%s, ", columnName))) } logger.destination.Write([]byte("\n")) + // Remove the Omitted fields + for _, i := range toOmit { + visibleFields = append(visibleFields[:i], visibleFields[i+1:]...) + } + for _, d := range logger.data { for _, v := range visibleFields { data := reflect.ValueOf(d) diff --git a/networkQuality.go b/networkQuality.go index 9235a90..6f6f8ea 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -204,8 +204,9 @@ func main() { var foreignProbeDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil var downloadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil var uploadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil + var granularThroughputDataLogger datalogger.DataLogger[rpm.GranularThroughputDataPoint] = nil - // User wants to log data from each probe! + // User wants to log data if *dataLoggerBaseFileName != "" { var err error = nil unique := time.Now().UTC().Format("01-02-2006-15-04-05") @@ -217,11 +218,15 @@ func main() { ) dataLoggerDownloadThroughputFilename := utilities.FilenameAppend( *dataLoggerBaseFileName, - "-throughput-download"+unique, + "-throughput-download-"+unique, ) dataLoggerUploadThroughputFilename := utilities.FilenameAppend( *dataLoggerBaseFileName, - "-throughput-upload"+unique, + "-throughput-upload-"+unique, + ) + dataLoggerGranularThroughputFilename := utilities.FilenameAppend( + *dataLoggerBaseFileName, + "-throughput-granular-"+unique, ) selfProbeDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint]( @@ -267,6 +272,17 @@ func main() { ) uploadThroughputDataLogger = nil } + + granularThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.GranularThroughputDataPoint]( + dataLoggerGranularThroughputFilename, + ) + if err != nil { + fmt.Printf( + "Warning: Could not create the file for storing granular throughput results (%s). Disabling functionality.\n", + dataLoggerGranularThroughputFilename, + ) + granularThroughputDataLogger = nil + } } // If, for some reason, the data loggers are nil, make them Null Data Loggers so that we don't have conditional // code later. @@ -282,6 +298,9 @@ func main() { if uploadThroughputDataLogger == nil { uploadThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]() } + if granularThroughputDataLogger == nil { + granularThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.GranularThroughputDataPoint]() + } /* * Create (and then, ironically, name) two anonymous functions that, when invoked, @@ -323,7 +342,7 @@ func main() { // data collection go routines stops well before the other, they will continue to send probes and we can // generate additional information! - selfProbeConnectionCommunicationChannel, downloadThroughputChannel := rpm.LoadGenerator( + selfDownProbeConnectionCommunicationChannel, downloadThroughputChannel := rpm.LoadGenerator( lgNetworkActivityCtx, downloadLoadGeneratorOperatorCtx, time.Second, @@ -331,7 +350,7 @@ func main() { &downloadLoadGeneratingConnectionCollection, downloadDebugging, ) - _, uploadThroughputChannel := rpm.LoadGenerator( + selfUpProbeConnectionCommunicationChannel, uploadThroughputChannel := rpm.LoadGenerator( lgNetworkActivityCtx, uploadLoadGeneratorOperatorCtx, time.Second, @@ -340,8 +359,6 @@ func main() { uploadDebugging, ) - // start here. - selfDownProbeConnection := <-selfDownProbeConnectionCommunicationChannel selfUpProbeConnection := <-selfUpProbeConnectionCommunicationChannel @@ -349,7 +366,8 @@ func main() { proberCtx, generateForeignProbeConfiguration, generateSelfProbeConfiguration, - selfProbeConnection, + selfDownProbeConnection, + selfUpProbeConnection, time.Millisecond*100, sslKeyFileConcurrentWriter, combinedProbeDebugging, @@ -417,6 +435,11 @@ timeout: "################# Download is instantaneously %s.\n", utilities.Conditional(downloadThroughputIsStable, "stable", "unstable")) } downloadThroughputDataLogger.LogRecord(downloadThroughputMeasurement) + for i := range downloadThroughputMeasurement.GranularThroughputDataPoints { + datapoint := downloadThroughputMeasurement.GranularThroughputDataPoints[i] + datapoint.Direction = "Download" + granularThroughputDataLogger.LogRecord(datapoint) + } lastDownloadThroughputRate = downloadThroughputMeasurement.Throughput lastDownloadThroughputOpenConnectionCount = downloadThroughputMeasurement.Connections @@ -431,6 +454,11 @@ timeout: "################# Upload is instantaneously %s.\n", utilities.Conditional(uploadThroughputIsStable, "stable", "unstable")) } uploadThroughputDataLogger.LogRecord(uploadThroughputMeasurement) + for i := range uploadThroughputMeasurement.GranularThroughputDataPoints { + datapoint := uploadThroughputMeasurement.GranularThroughputDataPoints[i] + datapoint.Direction = "Upload" + granularThroughputDataLogger.LogRecord(datapoint) + } lastUploadThroughputRate = uploadThroughputMeasurement.Throughput lastUploadThroughputOpenConnectionCount = uploadThroughputMeasurement.Connections @@ -452,7 +480,7 @@ timeout: foreignRtts.AddElement(probeMeasurement.Duration.Seconds() / float64(probeMeasurement.RoundTripCount)) } - } else if probeMeasurement.Type == rpm.Self { + } else if probeMeasurement.Type == rpm.SelfDown || probeMeasurement.Type == rpm.SelfUp { selfRtts.AddElement(probeMeasurement.Duration.Seconds()) } @@ -462,7 +490,7 @@ timeout: if probeMeasurement.Type == rpm.Foreign { foreignProbeDataLogger.LogRecord(probeMeasurement) - } else if probeMeasurement.Type == rpm.Self { + } else if probeMeasurement.Type == rpm.SelfDown || probeMeasurement.Type == rpm.SelfUp { selfProbeDataLogger.LogRecord(probeMeasurement) } } @@ -473,6 +501,8 @@ timeout: } } + // TODO: Reset timeout to RPM timeout stat? + // Did the test run to stability? testRanToStability := (downloadThroughputIsStable && uploadThroughputIsStable && responsivenessIsStable) @@ -538,6 +568,7 @@ timeout: // we already did that roughly-equal split up when we added them to the foreignRtts IMS. foreignProbeRoundTripTimeP90 := foreignRtts.Percentile(90) + // This is 60 because we measure in seconds not ms rpm := 60.0 / (float64(selfProbeRoundTripTimeP90+foreignProbeRoundTripTimeP90) / 2.0) if *debugCliFlag { @@ -550,8 +581,24 @@ timeout: ) } + if !testRanToStability { + fmt.Printf("Test did not run to stability, these results are estimates:\n") + } fmt.Printf("RPM: %5.0f\n", rpm) + fmt.Printf( + "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", + utilities.ToMbps(lastDownloadThroughputRate), + utilities.ToMBps(lastDownloadThroughputRate), + lastDownloadThroughputOpenConnectionCount, + ) + fmt.Printf( + "Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", + utilities.ToMbps(lastUploadThroughputRate), + utilities.ToMBps(lastUploadThroughputRate), + lastUploadThroughputOpenConnectionCount, + ) + if *calculateExtendedStats { fmt.Println(extendedStats.Repr()) } @@ -580,18 +627,11 @@ timeout: } uploadThroughputDataLogger.Close() - fmt.Printf( - "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", - utilities.ToMbps(lastDownloadThroughputRate), - utilities.ToMBps(lastDownloadThroughputRate), - lastDownloadThroughputOpenConnectionCount, - ) - fmt.Printf( - "Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", - utilities.ToMbps(lastUploadThroughputRate), - utilities.ToMBps(lastUploadThroughputRate), - lastUploadThroughputOpenConnectionCount, - ) + granularThroughputDataLogger.Export() + if *debugCliFlag { + fmt.Printf("Closing the granular throughput data logger.\n") + } + granularThroughputDataLogger.Close() if *debugCliFlag { fmt.Printf("In debugging mode, we will cool down.\n") @@ -68,10 +68,18 @@ type ProbeDataPoint struct { Type ProbeType `Description:"The type of the probe." Formatter:"Value"` } +type GranularThroughputDataPoint struct { + Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"` + Throughput float64 `Description:"Instantaneous throughput (B/s)."` + ConnID uint32 `Description:"Position of connection (ID)."` + Direction string `Description:"Direction of Throughput."` +} + type ThroughputDataPoint struct { - Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"` - Throughput float64 `Description:"Instantaneous throughput (b/s)."` - Connections int `Description: Number of parallel connections."` + Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"` + Throughput float64 `Description:"Instantaneous throughput (B/s)."` + Connections int `Description:"Number of parallel connections."` + GranularThroughputDataPoints []GranularThroughputDataPoint `Description:"[OMIT]"` } type SelfDataCollectionResult struct { @@ -84,13 +92,16 @@ type SelfDataCollectionResult struct { type ProbeType int64 const ( - Self ProbeType = iota + SelfUp ProbeType = iota + SelfDown Foreign ) func (pt ProbeType) Value() string { - if pt == Self { - return "Self" + if pt == SelfUp { + return "SelfUp" + } else if pt == SelfDown { + return "SelfDown" } return "Foreign" } @@ -164,7 +175,7 @@ func Probe( ) + probeTracer.GetTCPDelta() // We must have reused the connection if we are a self probe! - if probeType == Self && !probeTracer.stats.ConnectionReused { + if (probeType == SelfUp || probeType == SelfDown) && !probeTracer.stats.ConnectionReused { panic(!probeTracer.stats.ConnectionReused) } @@ -226,7 +237,8 @@ func CombinedProber( proberCtx context.Context, foreignProbeConfigurationGenerator func() ProbeConfiguration, selfProbeConfigurationGenerator func() ProbeConfiguration, - selfProbeConnection lgc.LoadGeneratingConnection, + selfDownProbeConnection lgc.LoadGeneratingConnection, + selfUpProbeConnection lgc.LoadGeneratingConnection, probeInterval time.Duration, keyLogger io.Writer, debugging *debug.DebugWithPrefix, @@ -289,10 +301,11 @@ func CombinedProber( debugging, ) + // Start Download Connection Prober go Probe( proberCtx, &wg, - selfProbeConnection.Client(), + selfDownProbeConnection.Client(), selfProbeConfiguration.URL, SelfDown, &dataPoints, @@ -390,6 +403,8 @@ func LoadGenerator( // Compute "instantaneous aggregate" goodput which is the number of // bytes transferred within the last second. var instantaneousTotalThroughput float64 = 0 + granularThroughputDatapoints := make([]GranularThroughputDataPoint, 0) + now = time.Now() // Used to align granular throughput data allInvalid := true for i := range *loadGeneratingConnections.LGCs { if !(*loadGeneratingConnections.LGCs)[i].IsValid() { @@ -400,6 +415,8 @@ func LoadGenerator( (*loadGeneratingConnections.LGCs)[i].ClientId(), ) } + // TODO: Do we add null connection to throughput? and how do we define it? Throughput -1 or 0? + granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, 0, uint32(i), ""}) continue } allInvalid = false @@ -411,6 +428,7 @@ func LoadGenerator( currentInterval.Seconds(), ) instantaneousTotalThroughput += instantaneousConnectionThroughput + granularThroughputDatapoints = append(granularThroughputDatapoints, GranularThroughputDataPoint{now, instantaneousConnectionThroughput, uint32(i), ""}) } // For some reason, all the lgcs are invalid. This likely means that @@ -426,7 +444,7 @@ func LoadGenerator( } // We have generated a throughput calculation -- let's send it back to the coordinator - throughputDataPoint := ThroughputDataPoint{time.Now(), instantaneousTotalThroughput, len(*loadGeneratingConnections.LGCs)} + throughputDataPoint := ThroughputDataPoint{time.Now(), instantaneousTotalThroughput, len(*loadGeneratingConnections.LGCs), granularThroughputDatapoints} throughputCalculations <- throughputDataPoint // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now! @@ -647,7 +665,7 @@ func (probe *ProbeTracer) SetGotConnTimeInfo( probe.stats.GetConnectionDoneTime = now probe.stats.ConnInfo = gotConnInfo probe.stats.ConnectionReused = gotConnInfo.Reused - if probe.probeType == Self && !gotConnInfo.Reused { + if (probe.probeType == SelfUp || probe.probeType == SelfDown) && !gotConnInfo.Reused { fmt.Fprintf( os.Stderr, "A self probe sent used a new connection!\n", |
