summaryrefslogtreecommitdiff
path: root/rpm
diff options
context:
space:
mode:
Diffstat (limited to 'rpm')
-rw-r--r--rpm/calculations.go62
-rw-r--r--rpm/rpm.go37
2 files changed, 71 insertions, 28 deletions
diff --git a/rpm/calculations.go b/rpm/calculations.go
index 5387aa7..4b61413 100644
--- a/rpm/calculations.go
+++ b/rpm/calculations.go
@@ -17,40 +17,60 @@ package rpm
import (
"fmt"
- "github.com/network-quality/goresponsiveness/ms"
+ "github.com/network-quality/goresponsiveness/series"
+ "github.com/network-quality/goresponsiveness/utilities"
+ "golang.org/x/exp/constraints"
)
-type Rpm struct {
+type Rpm[Data utilities.Number] struct {
SelfRttsTotal int
ForeignRttsTotal int
SelfRttsTrimmed int
ForeignRttsTrimmed int
- SelfProbeRttPN float64
- ForeignProbeRttPN float64
+ SelfProbeRttPN Data
+ ForeignProbeRttPN Data
SelfProbeRttMean float64
ForeignProbeRttMean float64
PNRpm float64
MeanRpm float64
}
-func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.MathematicalSeries[float64], trimming uint, percentile int) Rpm {
- // First, let's do a double-sided trim of the top/bottom 10% of our measurements.
- selfRttsTotalCount := selfRtts.Len()
- foreignRttsTotalCount := foreignRtts.Len()
+func CalculateRpm[Data utilities.Number, Bucket constraints.Ordered](
+ selfRtts series.WindowSeries[Data, Bucket], aggregatedForeignRtts series.WindowSeries[Data, Bucket], trimming uint, percentile int,
+) Rpm[Data] {
+ // There may be more than one round trip accumulated together. If that is the case,
+ // we will blow them apart in to three separate measurements and each one will just
+ // be 1 / 3.
+ foreignRtts := series.NewWindowSeries[Data, int](series.Forever, 0)
+ foreignBuckets := 0
+ for _, v := range aggregatedForeignRtts.GetValues() {
+ if utilities.IsSome(v) {
+ v := utilities.GetSome(v)
+ foreignRtts.Reserve(foreignBuckets)
+ foreignRtts.Reserve(foreignBuckets + 1)
+ foreignRtts.Reserve(foreignBuckets + 2)
+ foreignRtts.Fill(foreignBuckets, v/3)
+ foreignRtts.Fill(foreignBuckets+1, v/3)
+ foreignRtts.Fill(foreignBuckets+2, v/3)
+ foreignBuckets += 3
+ }
+ }
- selfRttsTrimmed := selfRtts.DoubleSidedTrim(trimming)
- foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(trimming)
+ // First, let's do a double-sided trim of the top/bottom 10% of our measurements.
+ selfRttsTotalCount, _ := selfRtts.Count()
+ foreignRttsTotalCount, _ := foreignRtts.Count()
- selfRttsTrimmedCount := selfRttsTrimmed.Len()
- foreignRttsTrimmedCount := foreignRttsTrimmed.Len()
+ _, selfProbeRoundTripTimeMean, selfRttsTrimmed :=
+ series.TrimmedMean(selfRtts, int(trimming))
+ _, foreignProbeRoundTripTimeMean, foreignRttsTrimmed :=
+ series.TrimmedMean(foreignRtts, int(trimming))
- // Then, let's take the mean of those ...
- selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage()
- foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage()
+ selfRttsTrimmedCount := len(selfRttsTrimmed)
+ foreignRttsTrimmedCount := len(foreignRttsTrimmed)
// Second, let's do the P90 calculations.
- selfProbeRoundTripTimePN := selfRtts.Percentile(percentile)
- foreignProbeRoundTripTimePN := foreignRtts.Percentile(percentile)
+ _, selfProbeRoundTripTimePN := series.Percentile(selfRtts, percentile)
+ _, foreignProbeRoundTripTimePN := series.Percentile(foreignRtts, percentile)
// Note: The specification indicates that we want to calculate the foreign probes as such:
// 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign
@@ -62,7 +82,7 @@ func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.Mathem
pnRpm := 60.0 / (float64(selfProbeRoundTripTimePN+foreignProbeRoundTripTimePN) / 2.0)
meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0)
- return Rpm{
+ return Rpm[Data]{
SelfRttsTotal: selfRttsTotalCount, ForeignRttsTotal: foreignRttsTotalCount,
SelfRttsTrimmed: selfRttsTrimmedCount, ForeignRttsTrimmed: foreignRttsTrimmedCount,
SelfProbeRttPN: selfProbeRoundTripTimePN, ForeignProbeRttPN: foreignProbeRoundTripTimePN,
@@ -71,14 +91,14 @@ func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.Mathem
}
}
-func (rpm *Rpm) ToString() string {
+func (rpm *Rpm[Data]) ToString() string {
return fmt.Sprintf(
`Total Self Probes: %d
Total Foreign Probes: %d
Trimmed Self Probes Count: %d
Trimmed Foreign Probes Count: %d
-P90 Self RTT: %f
-P90 Foreign RTT: %f
+P90 Self RTT: %v
+P90 Foreign RTT: %v
Trimmed Mean Self RTT: %f
Trimmed Mean Foreign RTT: %f
`,
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 23ed9f4..b1b8ba4 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -29,6 +29,7 @@ import (
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
"github.com/network-quality/goresponsiveness/probe"
+ "github.com/network-quality/goresponsiveness/series"
"github.com/network-quality/goresponsiveness/utilities"
)
@@ -84,6 +85,11 @@ type SelfDataCollectionResult struct {
LoggingContinuation func()
}
+type ResponsivenessProbeResult struct {
+ Foreign *probe.ProbeDataPoint
+ Self *probe.ProbeDataPoint
+}
+
func ResponsivenessProber(
proberCtx context.Context,
networkActivityCtx context.Context,
@@ -95,7 +101,7 @@ func ResponsivenessProber(
keyLogger io.Writer,
captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
-) (dataPoints chan utilities.Pair[*probe.ProbeDataPoint]) {
+) (dataPoints chan series.SeriesMessage[ResponsivenessProbeResult, uint]) {
if debug.IsDebug(debugging.Level) {
fmt.Printf(
"(%s) Starting to collect responsiveness information at an interval of %v!\n",
@@ -106,7 +112,7 @@ func ResponsivenessProber(
// Make a channel to send back all the generated data points
// when we are probing.
- dataPoints = make(chan utilities.Pair[*probe.ProbeDataPoint])
+ dataPoints = make(chan series.SeriesMessage[ResponsivenessProbeResult, uint])
go func() {
wg := sync.WaitGroup{}
@@ -141,6 +147,11 @@ func ResponsivenessProber(
)
}
+ dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{
+ Type: series.SeriesMessageReserve, Bucket: probeCount,
+ Measure: utilities.None[ResponsivenessProbeResult](),
+ }
+
// The presence of a custom TLSClientConfig in a *generic* `transport`
// means that go will default to HTTP/1.1 and cowardly avoid HTTP/2:
// https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278
@@ -262,8 +273,13 @@ func ResponsivenessProber(
dataPointsLock.Lock()
// Now we have our four data points (three in the foreign probe data point and one in the self probe data point)
if dataPoints != nil {
- dataPoints <- utilities.Pair[*probe.ProbeDataPoint]{
- First: foreignProbeDataPoint, Second: selfProbeDataPoint,
+ measurement := ResponsivenessProbeResult{
+ Foreign: foreignProbeDataPoint, Self: selfProbeDataPoint,
+ }
+
+ dataPoints <- series.SeriesMessage[ResponsivenessProbeResult, uint]{
+ Type: series.SeriesMessageMeasure, Bucket: probeCount,
+ Measure: utilities.Some[ResponsivenessProbeResult](measurement),
}
}
dataPointsLock.Unlock()
@@ -300,8 +316,8 @@ func LoadGenerator(
mnp int,
captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections?
debugging *debug.DebugWithPrefix, // How can we forget debugging?
-) (stabilizerCommunicationChannel chan ThroughputDataPoint) { // Send back all the instantaneous throughputs that we generate.
- stabilizerCommunicationChannel = make(chan ThroughputDataPoint)
+) (seriesCommunicationChannel chan series.SeriesMessage[ThroughputDataPoint, uint64]) { // Send back all the instantaneous throughputs that we generate.
+ seriesCommunicationChannel = make(chan series.SeriesMessage[ThroughputDataPoint, uint64])
go func() {
flowsCreated := uint64(0)
@@ -454,7 +470,14 @@ func LoadGenerator(
len(*loadGeneratingConnectionsCollection.LGCs),
granularThroughputDatapoints,
}
- stabilizerCommunicationChannel <- throughputDataPoint
+
+ seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{
+ Type: series.SeriesMessageReserve, Bucket: currentInterval,
+ }
+ seriesCommunicationChannel <- series.SeriesMessage[ThroughputDataPoint, uint64]{
+ Type: series.SeriesMessageMeasure, Bucket: currentInterval,
+ Measure: utilities.Some[ThroughputDataPoint](throughputDataPoint),
+ }
if generateLoadCtx.Err() != nil {
// No need to add additional data points because the controller told us