diff options
Diffstat (limited to 'rpm')
| -rw-r--r-- | rpm/calculations.go | 62 | ||||
| -rw-r--r-- | rpm/rpm.go | 37 |
2 files changed, 71 insertions, 28 deletions
diff --git a/rpm/calculations.go b/rpm/calculations.go index 5387aa7..4b61413 100644 --- a/rpm/calculations.go +++ b/rpm/calculations.go @@ -17,40 +17,60 @@ package rpm import ( "fmt" - "github.com/network-quality/goresponsiveness/ms" + "github.com/network-quality/goresponsiveness/series" + "github.com/network-quality/goresponsiveness/utilities" + "golang.org/x/exp/constraints" ) -type Rpm struct { +type Rpm[Data utilities.Number] struct { SelfRttsTotal int ForeignRttsTotal int SelfRttsTrimmed int ForeignRttsTrimmed int - SelfProbeRttPN float64 - ForeignProbeRttPN float64 + SelfProbeRttPN Data + ForeignProbeRttPN Data SelfProbeRttMean float64 ForeignProbeRttMean float64 PNRpm float64 MeanRpm float64 } -func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.MathematicalSeries[float64], trimming uint, percentile int) Rpm { - // First, let's do a double-sided trim of the top/bottom 10% of our measurements. - selfRttsTotalCount := selfRtts.Len() - foreignRttsTotalCount := foreignRtts.Len() +func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered]( + selfRtts series.WindowSeries[Data, Bucket], aggregatedForeignRtts series.WindowSeries[Data, Bucket], trimming uint, percentile int, +) Rpm[Data] { + // There may be more than one round trip accumulated together. If that is the case, + // we will blow them apart in to three separate measurements and each one will just + // be 1 / 3. + foreignRtts := series.NewWindowSeries[Data, int](series.Forever, 0) + foreignBuckets := 0 + for _, v := range aggregatedForeignRtts.GetValues() { + if utilities.IsSome(v) { + v := utilities.GetSome(v) + foreignRtts.Reserve(foreignBuckets) + foreignRtts.Reserve(foreignBuckets + 1) + foreignRtts.Reserve(foreignBuckets + 2) + foreignRtts.Fill(foreignBuckets, v/3) + foreignRtts.Fill(foreignBuckets+1, v/3) + foreignRtts.Fill(foreignBuckets+2, v/3) + foreignBuckets += 3 + } + } - selfRttsTrimmed := selfRtts.DoubleSidedTrim(trimming) - foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(trimming) + // First, let's do a double-sided trim of the top/bottom 10% of our measurements. + selfRttsTotalCount, _ := selfRtts.Count() + foreignRttsTotalCount, _ := foreignRtts.Count() - selfRttsTrimmedCount := selfRttsTrimmed.Len() - foreignRttsTrimmedCount := foreignRttsTrimmed.Len() + _, selfProbeRoundTripTimeMean, selfRttsTrimmed := + series.TrimmedMean(selfRtts, int(trimming)) + _, foreignProbeRoundTripTimeMean, foreignRttsTrimmed := + series.TrimmedMean(foreignRtts, int(trimming)) - // Then, let's take the mean of those ... - selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage() - foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage() + selfRttsTrimmedCount := len(selfRttsTrimmed) + foreignRttsTrimmedCount := len(foreignRttsTrimmed) // Second, let's do the P90 calculations. - selfProbeRoundTripTimePN := selfRtts.Percentile(percentile) - foreignProbeRoundTripTimePN := foreignRtts.Percentile(percentile) + _, selfProbeRoundTripTimePN := series.Percentile(selfRtts, percentile) + _, foreignProbeRoundTripTimePN := series.Percentile(foreignRtts, percentile) // Note: The specification indicates that we want to calculate the foreign probes as such: // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign @@ -62,7 +82,7 @@ func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.Mathem pnRpm := 60.0 / (float64(selfProbeRoundTripTimePN+foreignProbeRoundTripTimePN) / 2.0) meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0) - return Rpm{ + return Rpm[Data]{ SelfRttsTotal: selfRttsTotalCount, ForeignRttsTotal: foreignRttsTotalCount, SelfRttsTrimmed: selfRttsTrimmedCount, ForeignRttsTrimmed: foreignRttsTrimmedCount, SelfProbeRttPN: selfProbeRoundTripTimePN, ForeignProbeRttPN: foreignProbeRoundTripTimePN, @@ -71,14 +91,14 @@ func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.Mathem } } -func (rpm *Rpm) ToString() string { +func (rpm *Rpm[Data]) ToString() string { return fmt.Sprintf( `Total Self Probes: %d Total Foreign Probes: %d Trimmed Self Probes Count: %d Trimmed Foreign Probes Count: %d -P90 Self RTT: %f -P90 Foreign RTT: %f +P90 Self RTT: %v +P90 Foreign RTT: %v Trimmed Mean Self RTT: %f Trimmed Mean Foreign RTT: %f `, @@ -29,6 +29,7 @@ import ( "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" "github.com/network-quality/goresponsiveness/probe" + "github.com/network-quality/goresponsiveness/series" "github.com/network-quality/goresponsiveness/utilities" ) @@ -84,6 +85,11 @@ type SelfDataCollectionResult struct { LoggingContinuation func() } +type ResponsivenessProbeResult struct { + Foreign *probe.ProbeDataPoint + Self *probe.ProbeDataPoint +} + func ResponsivenessProber( proberCtx context.Context, networkActivityCtx context.Context, @@ -95,7 +101,7 @@ func ResponsivenessProber( keyLogger io.Writer, captureExtendedStats bool, debugging *debug.DebugWithPrefix, -) (dataPoints chan utilities.Pair[*probe.ProbeDataPoint]) { +) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, uint]) { if debug.IsDebug(debugging.Level) { fmt.Printf( "(%s) Starting to collect responsiveness information at an interval of %v!\n", @@ -106,7 +112,7 @@ func ResponsivenessProber( // Make a channel to send back all the generated data points // when we are probing. - dataPoints = make(chan utilities.Pair[*probe.ProbeDataPoint]) + dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, uint]) go func() { wg := sync.WaitGroup{} @@ -141,6 +147,11 @@ func ResponsivenessProber( ) } + dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{ + Type: series.SeriesMessageReserve, Bucket: probeCount, + Measure: utilities.None[ResponsivenessProbeResult](), + } + // The presence of a custom TLSClientConfig in a *generic* `transport` // means that go will default to HTTP/1.1 and cowardly avoid HTTP/2: // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278 @@ -262,8 +273,13 @@ func ResponsivenessProber( dataPointsLock.Lock() // Now we have our four data points (three in the foreign probe data point and one in the self probe data point) if dataPoints != nil { - dataPoints <- utilities.Pair[*probe.ProbeDataPoint]{ - First: foreignProbeDataPoint, Second: selfProbeDataPoint, + measurement := ResponsivenessProbeResult{ + Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint, + } + + dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{ + Type: series.SeriesMessageMeasure, Bucket: probeCount, + Measure: utilities.Some[ResponsivenessProbeResult](measurement), } } dataPointsLock.Unlock() @@ -300,8 +316,8 @@ func LoadGenerator( mnp int, captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections? debugging *debug.DebugWithPrefix, // How can we forget debugging? -) (stabilizerCommunicationChannel chan ThroughputDataPoint) { // Send back all the instantaneous throughputs that we generate. - stabilizerCommunicationChannel = make(chan ThroughputDataPoint) +) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, uint64]) { // Send back all the instantaneous throughputs that we generate. + seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, uint64]) go func() { flowsCreated := uint64(0) @@ -454,7 +470,14 @@ func LoadGenerator( len(*loadGeneratingConnectionsCollection.LGCs), granularThroughputDatapoints, } - stabilizerCommunicationChannel <- throughputDataPoint + + seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{ + Type: series.SeriesMessageReserve, Bucket: currentInterval, + } + seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{ + Type: series.SeriesMessageMeasure, Bucket: currentInterval, + Measure: utilities.Some[ThroughputDataPoint](throughputDataPoint), + } if generateLoadCtx.Err() != nil { // No need to add additional data points because the controller told us |
