summaryrefslogtreecommitdiff
path: root/rpm
diff options
context:
space:
mode:
authorWill Hawkins <[email protected]>2023-07-10 13:45:50 -0400
committerWill Hawkins <[email protected]>2023-07-10 13:45:50 -0400
commit78d574a74665c8bc062c26755c80a8b524bce347 (patch)
tree7ad65f0052defaea63acb2f3445be00ef97e24d6 /rpm
parentfe17152a507bbf94a11cca7f49a51cbae9c0d67b (diff)
[Feature] Major update: Track measurements that may be delayed
Among other major feature additions, this version of the client tracks any measurements that may be long delayed and considers their presence or absence as part of a stability measurement. This version of the client also more closely tracks the spec. In particular, it performs a sinle-sided trimmed mean rather than a double-sided trimmed mean. Signed-off-by: Will Hawkins <[email protected]>
Diffstat (limited to 'rpm')
-rw-r--r--rpm/calculations.go62
-rw-r--r--rpm/rpm.go37
2 files changed, 71 insertions, 28 deletions
diff --git a/rpm/calculations.go b/rpm/calculations.go
index 5387aa7..4b61413 100644
--- a/rpm/calculations.go
+++ b/rpm/calculations.go
@@ -17,40 +17,60 @@ package rpm
import (
"fmt"
- "github.com/network-quality/goresponsiveness/ms"
+ "github.com/network-quality/goresponsiveness/series"
+ "github.com/network-quality/goresponsiveness/utilities"
+ "golang.org/x/exp/constraints"
)
-type Rpm struct {
+type Rpm[Data utilities.Number] struct {
SelfRttsTotal int
ForeignRttsTotal int
SelfRttsTrimmed int
ForeignRttsTrimmed int
- SelfProbeRttPN float64
- ForeignProbeRttPN float64
+ SelfProbeRttPN Data
+ ForeignProbeRttPN Data
SelfProbeRttMean float64
ForeignProbeRttMean float64
PNRpm float64
MeanRpm float64
}
-func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.MathematicalSeries[float64], trimming uint, percentile int) Rpm {
- // First, let's do a double-sided trim of the top/bottom 10% of our measurements.
- selfRttsTotalCount := selfRtts.Len()
- foreignRttsTotalCount := foreignRtts.Len()
+func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered](
+ selfRtts series.WindowSeries[Data, Bucket], aggregatedForeignRtts series.WindowSeries[Data, Bucket], trimming uint, percentile int,
+) Rpm[Data] {
+ // There may be more than one round trip accumulated together. If that is the case,
+ // we will blow them apart in to three separate measurements and each one will just
+ // be 1 / 3.
+ foreignRtts := series.NewWindowSeries[Data, int](series.Forever, 0)
+ foreignBuckets := 0
+ for _, v := range aggregatedForeignRtts.GetValues() {
+ if utilities.IsSome(v) {
+ v := utilities.GetSome(v)
+ foreignRtts.Reserve(foreignBuckets)
+ foreignRtts.Reserve(foreignBuckets + 1)
+ foreignRtts.Reserve(foreignBuckets + 2)
+ foreignRtts.Fill(foreignBuckets, v/3)
+ foreignRtts.Fill(foreignBuckets+1, v/3)
+ foreignRtts.Fill(foreignBuckets+2, v/3)
+ foreignBuckets += 3
+ }
+ }
- selfRttsTrimmed := selfRtts.DoubleSidedTrim(trimming)
- foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(trimming)
+ // First, let's do a double-sided trim of the top/bottom 10% of our measurements.
+ selfRttsTotalCount, _ := selfRtts.Count()
+ foreignRttsTotalCount, _ := foreignRtts.Count()
- selfRttsTrimmedCount := selfRttsTrimmed.Len()
- foreignRttsTrimmedCount := foreignRttsTrimmed.Len()
+ _, selfProbeRoundTripTimeMean, selfRttsTrimmed :=
+ series.TrimmedMean(selfRtts, int(trimming))
+ _, foreignProbeRoundTripTimeMean, foreignRttsTrimmed :=
+ series.TrimmedMean(foreignRtts, int(trimming))
- // Then, let's take the mean of those ...
- selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage()
- foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage()
+ selfRttsTrimmedCount := len(selfRttsTrimmed)
+ foreignRttsTrimmedCount := len(foreignRttsTrimmed)
// Second, let's do the P90 calculations.
- selfProbeRoundTripTimePN := selfRtts.Percentile(percentile)
- foreignProbeRoundTripTimePN := foreignRtts.Percentile(percentile)
+ _, selfProbeRoundTripTimePN := series.Percentile(selfRtts, percentile)
+ _, foreignProbeRoundTripTimePN := series.Percentile(foreignRtts, percentile)
// Note: The specification indicates that we want to calculate the foreign probes as such:
// 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign
@@ -62,7 +82,7 @@ func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.Mathem
pnRpm := 60.0 / (float64(selfProbeRoundTripTimePN+foreignProbeRoundTripTimePN) / 2.0)
meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0)
- return Rpm{
+ return Rpm[Data]{
SelfRttsTotal: selfRttsTotalCount, ForeignRttsTotal: foreignRttsTotalCount,
SelfRttsTrimmed: selfRttsTrimmedCount, ForeignRttsTrimmed: foreignRttsTrimmedCount,
SelfProbeRttPN: selfProbeRoundTripTimePN, ForeignProbeRttPN: foreignProbeRoundTripTimePN,
@@ -71,14 +91,14 @@ func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.Mathem
}
}
-func (rpm *Rpm) ToString() string {
+func (rpm *Rpm[Data]) ToString() string {
return fmt.Sprintf(
`Total Self Probes: %d
Total Foreign Probes: %d
Trimmed Self Probes Count: %d
Trimmed Foreign Probes Count: %d
-P90 Self RTT: %f
-P90 Foreign RTT: %f
+P90 Self RTT: %v
+P90 Foreign RTT: %v
Trimmed Mean Self RTT: %f
Trimmed Mean Foreign RTT: %f
`,
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 23ed9f4..b1b8ba4 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -29,6 +29,7 @@ import (
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
"github.com/network-quality/goresponsiveness/probe"
+ "github.com/network-quality/goresponsiveness/series"
"github.com/network-quality/goresponsiveness/utilities"
)
@@ -84,6 +85,11 @@ type SelfDataCollectionResult struct {
LoggingContinuation func()
}
+type ResponsivenessProbeResult struct {
+ Foreign *probe.ProbeDataPoint
+ Self *probe.ProbeDataPoint
+}
+
func ResponsivenessProber(
proberCtx context.Context,
networkActivityCtx context.Context,
@@ -95,7 +101,7 @@ func ResponsivenessProber(
keyLogger io.Writer,
captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
-) (dataPoints chan utilities.Pair[*probe.ProbeDataPoint]) {
+) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, uint]) {
if debug.IsDebug(debugging.Level) {
fmt.Printf(
"(%s) Starting to collect responsiveness information at an interval of %v!\n",
@@ -106,7 +112,7 @@ func ResponsivenessProber(
// Make a channel to send back all the generated data points
// when we are probing.
- dataPoints = make(chan utilities.Pair[*probe.ProbeDataPoint])
+ dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, uint])
go func() {
wg := sync.WaitGroup{}
@@ -141,6 +147,11 @@ func ResponsivenessProber(
)
}
+ dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{
+ Type: series.SeriesMessageReserve, Bucket: probeCount,
+ Measure: utilities.None[ResponsivenessProbeResult](),
+ }
+
// The presence of a custom TLSClientConfig in a *generic* `transport`
// means that go will default to HTTP/1.1 and cowardly avoid HTTP/2:
// https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278
@@ -262,8 +273,13 @@ func ResponsivenessProber(
dataPointsLock.Lock()
// Now we have our four data points (three in the foreign probe data point and one in the self probe data point)
if dataPoints != nil {
- dataPoints <- utilities.Pair[*probe.ProbeDataPoint]{
- First: foreignProbeDataPoint, Second: selfProbeDataPoint,
+ measurement := ResponsivenessProbeResult{
+ Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint,
+ }
+
+ dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{
+ Type: series.SeriesMessageMeasure, Bucket: probeCount,
+ Measure: utilities.Some[ResponsivenessProbeResult](measurement),
}
}
dataPointsLock.Unlock()
@@ -300,8 +316,8 @@ func LoadGenerator(
mnp int,
captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections?
debugging *debug.DebugWithPrefix, // How can we forget debugging?
-) (stabilizerCommunicationChannel chan ThroughputDataPoint) { // Send back all the instantaneous throughputs that we generate.
- stabilizerCommunicationChannel = make(chan ThroughputDataPoint)
+) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, uint64]) { // Send back all the instantaneous throughputs that we generate.
+ seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, uint64])
go func() {
flowsCreated := uint64(0)
@@ -454,7 +470,14 @@ func LoadGenerator(
len(*loadGeneratingConnectionsCollection.LGCs),
granularThroughputDatapoints,
}
- stabilizerCommunicationChannel <- throughputDataPoint
+
+ seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{
+ Type: series.SeriesMessageReserve, Bucket: currentInterval,
+ }
+ seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{
+ Type: series.SeriesMessageMeasure, Bucket: currentInterval,
+ Measure: utilities.Some[ThroughputDataPoint](throughputDataPoint),
+ }
if generateLoadCtx.Err() != nil {
// No need to add additional data points because the controller told us