diff options
| -rw-r--r-- | datalogger/logger.go | 39 | ||||
| -rw-r--r-- | networkQuality.go | 64 | ||||
| -rw-r--r-- | rpm/rpm.go | 34 |
3 files changed, 113 insertions, 24 deletions
diff --git a/datalogger/logger.go b/datalogger/logger.go index dd071a1..1a4cd20 100644 --- a/datalogger/logger.go +++ b/datalogger/logger.go @@ -11,7 +11,6 @@ * You should have received a copy of the GNU General Public License along * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. */ - package datalogger import ( @@ -20,6 +19,8 @@ import ( "os" "reflect" "sync" + + "github.com/network-quality/goresponsiveness/utilities" ) type DataLogger[T any] interface { @@ -55,6 +56,33 @@ func (logger *CSVDataLogger[T]) LogRecord(record T) { logger.data = append(logger.data, record) } +func doCustomFormatting(value reflect.Value, tag reflect.StructTag) (string, error) { + if utilities.IsInterfaceNil(value) { + return "", fmt.Errorf("Cannot format an empty interface value") + } + formatMethodName, success := tag.Lookup("Formatter") + if !success { + return "", fmt.Errorf("Could not find the formatter name") + } + formatMethodArgument, success := tag.Lookup("FormatterArgument") + if !success { + return "", fmt.Errorf("Could not find the formatter name") + } + + formatMethod := value.MethodByName(formatMethodName) + if formatMethod == reflect.ValueOf(0) { + return "", fmt.Errorf("Type %v does not support a method named %v", value.Type(), formatMethodName) + } + + formatMethodArgumentUsable := make([]reflect.Value, 1) + formatMethodArgumentUsable[0] = reflect.ValueOf(formatMethodArgument) + result := formatMethod.Call(formatMethodArgumentUsable) + if len(result) == 1 { + return result[0].String(), nil + } + return "", fmt.Errorf("Too many results returned by the format method's invocation.") +} + func (logger *CSVDataLogger[T]) Export() bool { logger.mut.Lock() defer logger.mut.Unlock() @@ -62,8 +90,7 @@ func (logger *CSVDataLogger[T]) Export() bool { return false } - t := new(T) - visibleFields := reflect.VisibleFields(reflect.TypeOf(t).Elem()) + visibleFields := reflect.VisibleFields(reflect.TypeOf((*T)(nil)).Elem()) for _, v := range visibleFields { description, success := v.Tag.Lookup("Description") columnName := fmt.Sprintf("%s", v.Name) @@ -78,7 +105,11 @@ func (logger *CSVDataLogger[T]) Export() bool { for _, v := range visibleFields { data := reflect.ValueOf(d) toWrite := data.FieldByIndex(v.Index) - logger.destination.Write([]byte(fmt.Sprintf("%v, ", toWrite))) + if formattedToWrite, err := doCustomFormatting(toWrite, v.Tag); err == nil { + logger.destination.Write([]byte(fmt.Sprintf("%s,", formattedToWrite))) + } else { + logger.destination.Write([]byte(fmt.Sprintf("%v, ", toWrite))) + } } logger.destination.Write([]byte("\n")) } diff --git a/networkQuality.go b/networkQuality.go index 731f755..73f87de 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -89,7 +89,7 @@ var ( dataLoggerBaseFileName = flag.String( "logger-filename", "", - "Store information about the results of each probe in files with this basename. Time and probe type will be appended (before the first .) to create two separate log files. Disabled by default.", + "Store granular information about tests results in files with this basename. Time and information type will be appended (before the first .) to create separate log files. Disabled by default.", ) ) @@ -188,18 +188,24 @@ func main() { } } - var selfDataLogger datalogger.DataLogger[rpm.DataPoint] = nil - var foreignDataLogger datalogger.DataLogger[rpm.DataPoint] = nil + var selfDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil + var foreignDataLogger datalogger.DataLogger[rpm.ProbeDataPoint] = nil + var downloadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil + var uploadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil // User wants to log data from each probe! if *dataLoggerBaseFileName != "" { var err error = nil unique := time.Now().UTC().Format("01-02-2006-15-04-05") + dataLoggerSelfFilename := utilities.FilenameAppend(*dataLoggerBaseFileName, "-self-"+unique) dataLoggerForeignFilename := utilities.FilenameAppend( *dataLoggerBaseFileName, "-foreign-"+unique, ) - selfDataLogger, err = datalogger.CreateCSVDataLogger[rpm.DataPoint](dataLoggerSelfFilename) + dataLoggerDownloadThroughputFilename := utilities.FilenameAppend(*dataLoggerBaseFileName, "-throughput-download"+unique) + dataLoggerUploadThroughputFilename := utilities.FilenameAppend(*dataLoggerBaseFileName, "-throughput-upload"+unique) + + selfDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint](dataLoggerSelfFilename) if err != nil { fmt.Printf( "Warning: Could not create the file for storing self probe results (%s). Disabling functionality.\n", @@ -207,7 +213,8 @@ func main() { ) selfDataLogger = nil } - foreignDataLogger, err = datalogger.CreateCSVDataLogger[rpm.DataPoint]( + + foreignDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ProbeDataPoint]( dataLoggerForeignFilename, ) if err != nil { @@ -217,6 +224,28 @@ func main() { ) foreignDataLogger = nil } + + downloadThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint]( + dataLoggerDownloadThroughputFilename, + ) + if err != nil { + fmt.Printf( + "Warning: Could not create the file for storing download throughput results (%s). Disabling functionality.\n", + dataLoggerDownloadThroughputFilename, + ) + downloadThroughputDataLogger = nil + } + + uploadThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint]( + dataLoggerUploadThroughputFilename, + ) + if err != nil { + fmt.Printf( + "Warning: Could not create the file for storing upload throughput results (%s). Disabling functionality.\n", + dataLoggerUploadThroughputFilename, + ) + uploadThroughputDataLogger = nil + } } /* @@ -265,6 +294,7 @@ func main() { operatingCtx, generate_lgd, generateSelfProbeConfiguration, + downloadThroughputDataLogger, downloadDebugging, ) uploadDataCollectionChannel := rpm.LGCollectData( @@ -272,6 +302,7 @@ func main() { operatingCtx, generate_lgu, generateSelfProbeConfiguration, + uploadThroughputDataLogger, uploadDebugging, ) @@ -403,17 +434,17 @@ func main() { totalForeignRoundTrips := len(foreignProbeDataPoints) foreignProbeRoundTripTimes := utilities.Fmap( foreignProbeDataPoints, - func(dp rpm.DataPoint) float64 { return dp.Duration.Seconds() }, + func(dp rpm.ProbeDataPoint) float64 { return dp.Duration.Seconds() }, ) foreignProbeRoundTripTimeP90 := utilities.CalculatePercentile(foreignProbeRoundTripTimes, 90) downloadRoundTripTimes := utilities.Fmap( downloadDataCollectionResult.DataPoints, - func(dcr rpm.DataPoint) float64 { return dcr.Duration.Seconds() }, + func(dcr rpm.ProbeDataPoint) float64 { return dcr.Duration.Seconds() }, ) uploadRoundTripTimes := utilities.Fmap( uploadDataCollectionResult.DataPoints, - func(dcr rpm.DataPoint) float64 { return dcr.Duration.Seconds() }, + func(dcr rpm.ProbeDataPoint) float64 { return dcr.Duration.Seconds() }, ) selfProbeRoundTripTimes := append(downloadRoundTripTimes, uploadRoundTripTimes...) totalSelfRoundTrips := len(selfProbeRoundTripTimes) @@ -452,6 +483,23 @@ func main() { } foreignDataLogger.Close() } + + if !utilities.IsInterfaceNil(downloadThroughputDataLogger) { + downloadThroughputDataLogger.Export() + if *debugCliFlag { + fmt.Printf("Closing the download throughput data logger.\n") + } + downloadThroughputDataLogger.Close() + } + + if !utilities.IsInterfaceNil(uploadThroughputDataLogger) { + uploadThroughputDataLogger.Export() + if *debugCliFlag { + fmt.Printf("Closing the upload throughput data logger.\n") + } + uploadThroughputDataLogger.Close() + } + cancelOperatingCtx() if *debugCliFlag { fmt.Printf("In debugging mode, we will cool down.\n") @@ -56,20 +56,25 @@ func addFlows( type ProbeConfiguration struct { URL string - DataLogger datalogger.DataLogger[DataPoint] + DataLogger datalogger.DataLogger[ProbeDataPoint] Interval time.Duration } -type DataPoint struct { - Time time.Time `Description:"Time of the generation of the data point."` +type ProbeDataPoint struct { + Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"` RoundTripCount uint64 `Description:"The number of round trips measured by this data point."` Duration time.Duration `Description:"The duration for this measurement."` } +type ThroughputDataPoint struct { + Time time.Time `Description:"Time of the generation of the data point." Formatter:"Format" FormatterArgument:"01-02-2006-15-04-05.000"` + Throughput float64 `Description:"Instantaneous throughput (b/s)."` +} + type SelfDataCollectionResult struct { RateBps float64 LGCs []lgc.LoadGeneratingConnection - DataPoints []DataPoint + DataPoints []ProbeDataPoint } type ProbeType int64 @@ -89,11 +94,11 @@ func (pt ProbeType) Value() string { func Probe( parentProbeCtx context.Context, waitGroup *sync.WaitGroup, - logger datalogger.DataLogger[DataPoint], + logger datalogger.DataLogger[ProbeDataPoint], client *http.Client, probeUrl string, probeType ProbeType, - result *chan DataPoint, + result *chan ProbeDataPoint, debugging *debug.DebugWithPrefix, ) error { @@ -181,7 +186,7 @@ func Probe( ) } }() - dataPoint := DataPoint{Time: time.Now(), RoundTripCount: roundTripCount, Duration: totalDelay} + dataPoint := ProbeDataPoint{Time: time_before_probe, RoundTripCount: roundTripCount, Duration: totalDelay} if !utilities.IsInterfaceNil(logger) { logger.LogRecord(dataPoint) } @@ -194,8 +199,8 @@ func ForeignProber( foreignProbeConfigurationGenerator func() ProbeConfiguration, keyLogger io.Writer, debugging *debug.DebugWithPrefix, -) (points chan DataPoint) { - points = make(chan DataPoint) +) (points chan ProbeDataPoint) { + points = make(chan ProbeDataPoint) foreignProbeConfiguration := foreignProbeConfigurationGenerator() @@ -272,8 +277,8 @@ func SelfProber( altConnections *[]lgc.LoadGeneratingConnection, selfProbeConfiguration ProbeConfiguration, debugging *debug.DebugWithPrefix, -) (points chan DataPoint) { - points = make(chan DataPoint) +) (points chan ProbeDataPoint) { + points = make(chan ProbeDataPoint) debugging = debug.NewDebugWithPrefix(debugging.Level, debugging.Prefix+" self probe") @@ -321,6 +326,7 @@ func LGCollectData( operatingCtx context.Context, lgcGenerator func() lgc.LoadGeneratingConnection, selfProbeConfigurationGenerator func() ProbeConfiguration, + throughputDataLogger datalogger.DataLogger[ThroughputDataPoint], debugging *debug.DebugWithPrefix, ) (resulted chan SelfDataCollectionResult) { resulted = make(chan SelfDataCollectionResult) @@ -437,6 +443,10 @@ func LGCollectData( previousMovingAverage, ) + if !utilities.IsInterfaceNil(throughputDataLogger) { + throughputDataLogger.LogRecord(ThroughputDataPoint{time.Now(), currentMovingAverage}) + } + if debug.IsDebug(debugging.Level) { fmt.Printf( "%v: Instantaneous goodput: %f MB.\n", @@ -517,7 +527,7 @@ func LGCollectData( } selfProbeCtxCancel() - selfProbeDataPoints := make([]DataPoint, 0) + selfProbeDataPoints := make([]ProbeDataPoint, 0) for dataPoint := range probeDataPointsChannel { selfProbeDataPoints = append(selfProbeDataPoints, dataPoint) } |
