diff options
| author | Will Hawkins <[email protected]> | 2023-07-10 13:45:50 -0400 |
|---|---|---|
| committer | Will Hawkins <[email protected]> | 2023-07-10 13:45:50 -0400 |
| commit | 78d574a74665c8bc062c26755c80a8b524bce347 (patch) | |
| tree | 7ad65f0052defaea63acb2f3445be00ef97e24d6 /rpm | |
| parent | fe17152a507bbf94a11cca7f49a51cbae9c0d67b (diff) | |
[Feature] Major update: Track measurements that may be delayed
Among other major feature additions, this version of the client tracks
any measurements that may be long delayed and considers their presence
or absence as part of a stability measurement.
This version of the client also more closely tracks the spec. In
particular, it performs a sinle-sided trimmed mean rather than a
double-sided trimmed mean.
Signed-off-by: Will Hawkins <[email protected]>
Diffstat (limited to 'rpm')
| -rw-r--r-- | rpm/calculations.go | 62 | ||||
| -rw-r--r-- | rpm/rpm.go | 37 |
2 files changed, 71 insertions, 28 deletions
diff --git a/rpm/calculations.go b/rpm/calculations.go index 5387aa7..4b61413 100644 --- a/rpm/calculations.go +++ b/rpm/calculations.go @@ -17,40 +17,60 @@ package rpm import ( "fmt" - "github.com/network-quality/goresponsiveness/ms" + "github.com/network-quality/goresponsiveness/series" + "github.com/network-quality/goresponsiveness/utilities" + "golang.org/x/exp/constraints" ) -type Rpm struct { +type Rpm[Data utilities.Number] struct { SelfRttsTotal int ForeignRttsTotal int SelfRttsTrimmed int ForeignRttsTrimmed int - SelfProbeRttPN float64 - ForeignProbeRttPN float64 + SelfProbeRttPN Data + ForeignProbeRttPN Data SelfProbeRttMean float64 ForeignProbeRttMean float64 PNRpm float64 MeanRpm float64 } -func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.MathematicalSeries[float64], trimming uint, percentile int) Rpm { - // First, let's do a double-sided trim of the top/bottom 10% of our measurements. - selfRttsTotalCount := selfRtts.Len() - foreignRttsTotalCount := foreignRtts.Len() +func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered]( + selfRtts series.WindowSeries[Data, Bucket], aggregatedForeignRtts series.WindowSeries[Data, Bucket], trimming uint, percentile int, +) Rpm[Data] { + // There may be more than one round trip accumulated together. If that is the case, + // we will blow them apart in to three separate measurements and each one will just + // be 1 / 3. + foreignRtts := series.NewWindowSeries[Data, int](series.Forever, 0) + foreignBuckets := 0 + for _, v := range aggregatedForeignRtts.GetValues() { + if utilities.IsSome(v) { + v := utilities.GetSome(v) + foreignRtts.Reserve(foreignBuckets) + foreignRtts.Reserve(foreignBuckets + 1) + foreignRtts.Reserve(foreignBuckets + 2) + foreignRtts.Fill(foreignBuckets, v/3) + foreignRtts.Fill(foreignBuckets+1, v/3) + foreignRtts.Fill(foreignBuckets+2, v/3) + foreignBuckets += 3 + } + } - selfRttsTrimmed := selfRtts.DoubleSidedTrim(trimming) - foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(trimming) + // First, let's do a double-sided trim of the top/bottom 10% of our measurements. + selfRttsTotalCount, _ := selfRtts.Count() + foreignRttsTotalCount, _ := foreignRtts.Count() - selfRttsTrimmedCount := selfRttsTrimmed.Len() - foreignRttsTrimmedCount := foreignRttsTrimmed.Len() + _, selfProbeRoundTripTimeMean, selfRttsTrimmed := + series.TrimmedMean(selfRtts, int(trimming)) + _, foreignProbeRoundTripTimeMean, foreignRttsTrimmed := + series.TrimmedMean(foreignRtts, int(trimming)) - // Then, let's take the mean of those ... - selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage() - foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage() + selfRttsTrimmedCount := len(selfRttsTrimmed) + foreignRttsTrimmedCount := len(foreignRttsTrimmed) // Second, let's do the P90 calculations. - selfProbeRoundTripTimePN := selfRtts.Percentile(percentile) - foreignProbeRoundTripTimePN := foreignRtts.Percentile(percentile) + _, selfProbeRoundTripTimePN := series.Percentile(selfRtts, percentile) + _, foreignProbeRoundTripTimePN := series.Percentile(foreignRtts, percentile) // Note: The specification indicates that we want to calculate the foreign probes as such: // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign @@ -62,7 +82,7 @@ func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.Mathem pnRpm := 60.0 / (float64(selfProbeRoundTripTimePN+foreignProbeRoundTripTimePN) / 2.0) meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0) - return Rpm{ + return Rpm[Data]{ SelfRttsTotal: selfRttsTotalCount, ForeignRttsTotal: foreignRttsTotalCount, SelfRttsTrimmed: selfRttsTrimmedCount, ForeignRttsTrimmed: foreignRttsTrimmedCount, SelfProbeRttPN: selfProbeRoundTripTimePN, ForeignProbeRttPN: foreignProbeRoundTripTimePN, @@ -71,14 +91,14 @@ func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.Mathem } } -func (rpm *Rpm) ToString() string { +func (rpm *Rpm[Data]) ToString() string { return fmt.Sprintf( `Total Self Probes: %d Total Foreign Probes: %d Trimmed Self Probes Count: %d Trimmed Foreign Probes Count: %d -P90 Self RTT: %f -P90 Foreign RTT: %f +P90 Self RTT: %v +P90 Foreign RTT: %v Trimmed Mean Self RTT: %f Trimmed Mean Foreign RTT: %f `, @@ -29,6 +29,7 @@ import ( "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" "github.com/network-quality/goresponsiveness/probe" + "github.com/network-quality/goresponsiveness/series" "github.com/network-quality/goresponsiveness/utilities" ) @@ -84,6 +85,11 @@ type SelfDataCollectionResult struct { LoggingContinuation func() } +type ResponsivenessProbeResult struct { + Foreign *probe.ProbeDataPoint + Self *probe.ProbeDataPoint +} + func ResponsivenessProber( proberCtx context.Context, networkActivityCtx context.Context, @@ -95,7 +101,7 @@ func ResponsivenessProber( keyLogger io.Writer, captureExtendedStats bool, debugging *debug.DebugWithPrefix, -) (dataPoints chan utilities.Pair[*probe.ProbeDataPoint]) { +) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, uint]) { if debug.IsDebug(debugging.Level) { fmt.Printf( "(%s) Starting to collect responsiveness information at an interval of %v!\n", @@ -106,7 +112,7 @@ func ResponsivenessProber( // Make a channel to send back all the generated data points // when we are probing. - dataPoints = make(chan utilities.Pair[*probe.ProbeDataPoint]) + dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, uint]) go func() { wg := sync.WaitGroup{} @@ -141,6 +147,11 @@ func ResponsivenessProber( ) } + dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{ + Type: series.SeriesMessageReserve, Bucket: probeCount, + Measure: utilities.None[ResponsivenessProbeResult](), + } + // The presence of a custom TLSClientConfig in a *generic* `transport` // means that go will default to HTTP/1.1 and cowardly avoid HTTP/2: // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278 @@ -262,8 +273,13 @@ func ResponsivenessProber( dataPointsLock.Lock() // Now we have our four data points (three in the foreign probe data point and one in the self probe data point) if dataPoints != nil { - dataPoints <- utilities.Pair[*probe.ProbeDataPoint]{ - First: foreignProbeDataPoint, Second: selfProbeDataPoint, + measurement := ResponsivenessProbeResult{ + Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint, + } + + dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{ + Type: series.SeriesMessageMeasure, Bucket: probeCount, + Measure: utilities.Some[ResponsivenessProbeResult](measurement), } } dataPointsLock.Unlock() @@ -300,8 +316,8 @@ func LoadGenerator( mnp int, captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections? debugging *debug.DebugWithPrefix, // How can we forget debugging? -) (stabilizerCommunicationChannel chan ThroughputDataPoint) { // Send back all the instantaneous throughputs that we generate. - stabilizerCommunicationChannel = make(chan ThroughputDataPoint) +) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, uint64]) { // Send back all the instantaneous throughputs that we generate. + seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, uint64]) go func() { flowsCreated := uint64(0) @@ -454,7 +470,14 @@ func LoadGenerator( len(*loadGeneratingConnectionsCollection.LGCs), granularThroughputDatapoints, } - stabilizerCommunicationChannel <- throughputDataPoint + + seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{ + Type: series.SeriesMessageReserve, Bucket: currentInterval, + } + seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{ + Type: series.SeriesMessageMeasure, Bucket: currentInterval, + Measure: utilities.Some[ThroughputDataPoint](throughputDataPoint), + } if generateLoadCtx.Err() != nil { // No need to add additional data points because the controller told us |
