summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWill Hawkins <[email protected]>2023-05-23 17:58:14 -0400
committerWill Hawkins <[email protected]>2023-06-21 09:12:22 -0400
commitec2ccf69d8b08abb03fa3bdb3e7e95ae1862d619 (patch)
tree6b636bdbda82db40da89a2bde213c684542850dc
parent5558f0347baaf6db066314f0eaf82d7fb552b2f7 (diff)
Major Update/Refactor to Support IETF 02
Beginning of a release candidate for support for IETF 02 tag of the responsiveness spec.
-rw-r--r--Makefile2
-rw-r--r--constants/constants.go14
-rw-r--r--lgc/collection.go27
-rw-r--r--lgc/download.go4
-rw-r--r--lgc/lgc.go17
-rw-r--r--lgc/upload.go4
-rw-r--r--ms/ms.go29
-rw-r--r--ms/ms_test.go93
-rw-r--r--networkQuality.go924
-rw-r--r--probe/probe.go51
-rw-r--r--probe/tracer.go6
-rw-r--r--rpm/calculations.go94
-rw-r--r--rpm/parameters.go85
-rw-r--r--rpm/parameters_test.go93
-rw-r--r--rpm/rpm.go324
-rw-r--r--stabilizer/algorithm.go130
-rw-r--r--stabilizer/rev3.go199
-rw-r--r--utilities/utilities.go18
-rw-r--r--utilities/utilities_test.go10
19 files changed, 1271 insertions, 853 deletions
diff --git a/Makefile b/Makefile
index a024bc2..e573dc3 100644
--- a/Makefile
+++ b/Makefile
@@ -6,7 +6,7 @@ all: build test
build:
go build $(LDFLAGS) networkQuality.go
test:
- go test ./timeoutat/ ./traceable/ ./ms/ ./utilities/ ./lgc ./qualityattenuation
+ go test ./timeoutat/ ./traceable/ ./ms/ ./utilities/ ./lgc ./qualityattenuation ./rpm
golines:
find . -name '*.go' -exec ~/go/bin/golines -w {} \;
clean:
diff --git a/constants/constants.go b/constants/constants.go
index 66f7110..7a8d562 100644
--- a/constants/constants.go
+++ b/constants/constants.go
@@ -35,8 +35,6 @@ var (
// The amount of time that the client will cooldown if it is in debug mode.
CooldownPeriod time.Duration = 4 * time.Second
- // The amount of time that we give ourselves to calculate the RPM.
- RPMCalculationTime int = 10
// The default amount of time that a test will take to calculate the RPM.
DefaultTestTime int = 20
@@ -49,3 +47,15 @@ var (
// The default determination of whether to verify server certificates
DefaultInsecureSkipVerify bool = true
)
+
+type SpecParametersCliOptions struct {
+ Mad int
+ Id int
+ Tmp uint
+ Sdt float64
+ Mnp int
+ Mps int
+ Ptc float64
+}
+
+var SpecParameterCliOptionsDefaults = SpecParametersCliOptions{Mad: 4, Id: 1, Tmp: 5, Sdt: 5.0, Mnp: 16, Mps: 100, Ptc: 0.05}
diff --git a/lgc/collection.go b/lgc/collection.go
index 7560186..cd6a87d 100644
--- a/lgc/collection.go
+++ b/lgc/collection.go
@@ -16,6 +16,7 @@ package lgc
import (
"fmt"
+ "math/rand"
"sync"
)
@@ -33,10 +34,10 @@ func (collection *LoadGeneratingConnectionCollection) Get(idx int) (*LoadGenerat
collection.Lock.Unlock()
return nil, fmt.Errorf("collection is unlocked")
}
+ return collection.lockedGet(idx)
+}
- if idx > len(*collection.LGCs) {
- return nil, fmt.Errorf("index too large")
- }
+func (collection *LoadGeneratingConnectionCollection) lockedGet(idx int) (*LoadGeneratingConnection, error) {
return &(*collection.LGCs)[idx], nil
}
@@ -49,6 +50,22 @@ func (collection *LoadGeneratingConnectionCollection) Append(conn LoadGenerating
return nil
}
-func (collection *LoadGeneratingConnectionCollection) Len() int {
- return len(*collection.LGCs)
+func (collection *LoadGeneratingConnectionCollection) Len() (int, error) {
+ if collection.Lock.TryLock() {
+ collection.Lock.Unlock()
+ return -1, fmt.Errorf("collection is unlocked")
+ }
+ return len(*collection.LGCs), nil
+}
+
+func (collection *LoadGeneratingConnectionCollection) GetRandom() (*LoadGeneratingConnection, error) {
+ if collection.Lock.TryLock() {
+ collection.Lock.Unlock()
+ return nil, fmt.Errorf("collection is unlocked")
+ }
+
+ // Intn avoids a negative index on platforms where int is 32 bits wide.
+ idx := rand.Intn(len(*collection.LGCs))
+
+ return collection.lockedGet(idx)
}
diff --git a/lgc/download.go b/lgc/download.go
index a73cc37..c13a1b1 100644
--- a/lgc/download.go
+++ b/lgc/download.go
@@ -64,6 +64,10 @@ func NewLoadGeneratingConnectionDownload(url string, keyLogger io.Writer, connec
return lgd
}
+func (lgd *LoadGeneratingConnectionDownload) Direction() LgcDirection {
+ return LGC_DOWN
+}
+
func (lgd *LoadGeneratingConnectionDownload) WaitUntilStarted(ctxt context.Context) bool {
conditional := func() bool { return lgd.status != LGC_STATUS_NOT_STARTED }
go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgd.statusWaiter)
diff --git a/lgc/lgc.go b/lgc/lgc.go
index db73efa..3b77015 100644
--- a/lgc/lgc.go
+++ b/lgc/lgc.go
@@ -31,6 +31,23 @@ type LoadGeneratingConnection interface {
ClientId() uint64
Stats() *stats.TraceStats
WaitUntilStarted(context.Context) bool
+ Direction() LgcDirection
+}
+
+type LgcDirection int
+
+const (
+ LGC_DOWN LgcDirection = iota
+ LGC_UP
+)
+
+func (direction LgcDirection) String() string {
+ if direction == LGC_DOWN {
+ return "Download"
+ } else if direction == LGC_UP {
+ return "Upload"
+ }
+ return "Invalid load-generating connection direction"
}
type LgcStatus int
diff --git a/lgc/upload.go b/lgc/upload.go
index 5175fe0..e4518b8 100644
--- a/lgc/upload.go
+++ b/lgc/upload.go
@@ -90,6 +90,10 @@ func (lgu *LoadGeneratingConnectionUpload) Status() LgcStatus {
return lgu.status
}
+func (lgd *LoadGeneratingConnectionUpload) Direction() LgcDirection {
+ return LGC_UP
+}
+
type syntheticCountingReader struct {
n *uint64
ctx context.Context
diff --git a/ms/ms.go b/ms/ms.go
index a13762a..06c7340 100644
--- a/ms/ms.go
+++ b/ms/ms.go
@@ -33,7 +33,7 @@ type MathematicalSeries[T constraints.Float | constraints.Integer] interface {
Len() int
Values() []T
Percentile(int) T
- DoubleSidedTrim(uint32) MathematicalSeries[T]
+ DoubleSidedTrim(uint) MathematicalSeries[T]
Less(int, int) bool
Swap(int, int)
}
@@ -77,7 +77,7 @@ func (ims *InfiniteMathematicalSeries[T]) Less(i, j int) bool {
return ims.elements[i] < ims.elements[j]
}
-func (ims *InfiniteMathematicalSeries[T]) DoubleSidedTrim(percent uint32) MathematicalSeries[T] {
+func (ims *InfiniteMathematicalSeries[T]) DoubleSidedTrim(percent uint) MathematicalSeries[T] {
if percent >= 100 {
panic(
fmt.Sprintf("Cannot perform double-sided trim for an invalid percentage: %d", percent),
@@ -137,7 +137,6 @@ func (ims *InfiniteMathematicalSeries[T]) AllSequentialIncreasesLessThan(
* N.B.: Overflow is possible -- use at your discretion!
*/
func (ims *InfiniteMathematicalSeries[T]) StandardDeviation() (bool, T) {
-
// From https://www.mathsisfun.com/data/standard-deviation-calculator.html
// Yes, for real!
@@ -165,7 +164,7 @@ func (ims *InfiniteMathematicalSeries[T]) StandardDeviation() (bool, T) {
// Finally, the standard deviation is the square root
// of the variance.
sd := T(math.Sqrt(variance))
- //sd := T(variance)
+ // sd := T(variance)
return true, sd
}
@@ -187,14 +186,14 @@ func (ims *InfiniteMathematicalSeries[T]) Percentile(p int) T {
}
type CappedMathematicalSeries[T constraints.Float | constraints.Integer] struct {
- elements_count uint64
+ elements_count uint
elements []T
- index uint64
- divisor *saturating.Saturating[uint64]
+ index uint
+ divisor *saturating.Saturating[uint]
}
func NewCappedMathematicalSeries[T constraints.Float | constraints.Integer](
- instants_count uint64,
+ instants_count uint,
) MathematicalSeries[T] {
return &CappedMathematicalSeries[T]{
elements: make([]T, instants_count),
@@ -221,7 +220,6 @@ func (ma *CappedMathematicalSeries[T]) CalculateAverage() float64 {
func (ma *CappedMathematicalSeries[T]) AllSequentialIncreasesLessThan(
limit float64,
) (_ bool, maximumSequentialIncrease float64) {
-
// If we have not yet accumulated a complete set of intervals,
// this is false.
if ma.divisor.Value() != ma.elements_count {
@@ -233,7 +231,7 @@ func (ma *CappedMathematicalSeries[T]) AllSequentialIncreasesLessThan(
oldestIndex := ma.index
previous := ma.elements[oldestIndex]
maximumSequentialIncrease = 0
- for i := uint64(1); i < ma.elements_count; i++ {
+ for i := uint(1); i < ma.elements_count; i++ {
currentIndex := (oldestIndex + i) % ma.elements_count
current := ma.elements[currentIndex]
percentChange := utilities.SignedPercentDifference(current, previous)
@@ -249,7 +247,6 @@ func (ma *CappedMathematicalSeries[T]) AllSequentialIncreasesLessThan(
* N.B.: Overflow is possible -- use at your discretion!
*/
func (ma *CappedMathematicalSeries[T]) StandardDeviation() (bool, T) {
-
// If we have not yet accumulated a complete set of intervals,
// we are always false.
if ma.divisor.Value() != ma.elements_count {
@@ -283,7 +280,7 @@ func (ma *CappedMathematicalSeries[T]) StandardDeviation() (bool, T) {
// Finally, the standard deviation is the square root
// of the variance.
sd := T(math.Sqrt(variance))
- //sd := T(variance)
+ // sd := T(variance)
return true, sd
}
@@ -312,7 +309,7 @@ func (ma *CappedMathematicalSeries[T]) Values() []T {
}
func (ma *CappedMathematicalSeries[T]) Len() int {
- if uint64(len(ma.elements)) != ma.elements_count {
+ if uint(len(ma.elements)) != ma.elements_count {
panic(
fmt.Sprintf(
"Error: A capped mathematical series' metadata is invalid: the length of its element array/slice does not match element_count! (%v vs %v)",
@@ -346,19 +343,19 @@ func (ims *CappedMathematicalSeries[T]) Less(i, j int) bool {
return ims.elements[i] < ims.elements[j]
}
-func (ims *CappedMathematicalSeries[T]) DoubleSidedTrim(percent uint32) MathematicalSeries[T] {
+func (ims *CappedMathematicalSeries[T]) DoubleSidedTrim(percent uint) MathematicalSeries[T] {
if percent >= 100 {
panic(
fmt.Sprintf("Cannot perform double-sided trim for an invalid percentage: %d", percent),
)
}
- trimmed := &CappedMathematicalSeries[T]{elements_count: uint64(ims.Len())}
+ trimmed := &CappedMathematicalSeries[T]{elements_count: uint(ims.Len())}
trimmed.elements = make([]T, ims.Len())
copy(trimmed.elements, ims.elements)
sort.Sort(trimmed)
- elementsToTrim := uint64(float32(ims.Len()) * ((float32(percent)) / float32(100.0)))
+ elementsToTrim := uint(float32(ims.Len()) * ((float32(percent)) / float32(100.0)))
trimmed.elements = trimmed.elements[elementsToTrim : len(trimmed.elements)-int(elementsToTrim)]
trimmed.elements_count -= (elementsToTrim * 2)
diff --git a/ms/ms_test.go b/ms/ms_test.go
index 533cc7e..34817d0 100644
--- a/ms/ms_test.go
+++ b/ms/ms_test.go
@@ -11,7 +11,7 @@ func Test_InfiniteValues(test *testing.T) {
series := NewInfiniteMathematicalSeries[float64]()
shouldMatch := make([]float64, 0)
previous := float64(1.0)
- for _ = range utilities.Iota(1, 80) {
+ for range utilities.Iota(1, 80) {
previous *= 1.059
series.AddElement(float64(previous))
shouldMatch = append(shouldMatch, previous)
@@ -21,10 +21,11 @@ func Test_InfiniteValues(test *testing.T) {
test.Fatalf("Values() on infinite mathematical series does not work.")
}
}
+
func Test_InfiniteSequentialIncreasesAlwaysLessThan(test *testing.T) {
series := NewInfiniteMathematicalSeries[float64]()
previous := float64(1.0)
- for _ = range utilities.Iota(1, 80) {
+ for range utilities.Iota(1, 80) {
previous *= 1.059
series.AddElement(float64(previous))
}
@@ -35,6 +36,7 @@ func Test_InfiniteSequentialIncreasesAlwaysLessThan(test *testing.T) {
)
}
}
+
func Test_CappedTooFewInstantsSequentialIncreasesLessThanAlwaysFalse(test *testing.T) {
series := NewCappedMathematicalSeries[float64](500)
series.AddElement(0.0)
@@ -51,14 +53,17 @@ func Test_Infinite_degenerate_percentile_too_high(test *testing.T) {
test.Fatalf("(infinite) Series percentile of 101 failed.")
}
}
+
func Test_Infinite_degenerate_percentile_too_low(test *testing.T) {
series := NewInfiniteMathematicalSeries[int]()
if series.Percentile(-1) != 0 {
test.Fatalf("(infinite) Series percentile of -1 failed.")
}
}
+
func Test_Infinite90_percentile(test *testing.T) {
- series := NewInfiniteMathematicalSeries[int]()
+ var expected int64 = 10
+ series := NewInfiniteMathematicalSeries[int64]()
series.AddElement(10)
series.AddElement(9)
series.AddElement(8)
@@ -70,15 +75,16 @@ func Test_Infinite90_percentile(test *testing.T) {
series.AddElement(2)
series.AddElement(1)
- if series.Percentile(90) != 10 {
+ if series.Percentile(90) != expected {
test.Fatalf(
- "(infinite) Series 90th percentile of 0 ... 10 failed: Expected 10 got %v.",
+ "(infinite) Series 90th percentile of 0 ... 10 failed: Expected: %v; Actual: %v.", expected,
series.Percentile(90),
)
}
}
func Test_Infinite90_percentile_reversed(test *testing.T) {
+ var expected int64 = 10
series := NewInfiniteMathematicalSeries[int64]()
series.AddElement(1)
series.AddElement(2)
@@ -91,15 +97,16 @@ func Test_Infinite90_percentile_reversed(test *testing.T) {
series.AddElement(9)
series.AddElement(10)
- if series.Percentile(90) != 10 {
+ if series.Percentile(90) != expected {
test.Fatalf(
- "(infinite) Series 90th percentile of 0 ... 10 failed: Expected 10 got %v.",
+ "(infinite) Series 90th percentile of 0 ... 10 failed: Expected %v; Actual: %v.", expected,
series.Percentile(90),
)
}
}
func Test_Infinite50_percentile_jumbled(test *testing.T) {
+ var expected int64 = 15
series := NewInfiniteMathematicalSeries[int64]()
series.AddElement(7)
series.AddElement(2)
@@ -112,15 +119,16 @@ func Test_Infinite50_percentile_jumbled(test *testing.T) {
series.AddElement(11)
series.AddElement(12)
- if series.Percentile(50) != 15 {
+ if series.Percentile(50) != expected {
test.Fatalf(
- "(infinite) Series 50 percentile of a jumble of numbers failed: Expected 15 got %v.",
+ "(infinite) Series 50 percentile of a jumble of numbers failed: Expected %v; Actual: %v.", expected,
series.Percentile(50),
)
}
}
func Test_InfiniteDoubleSidedTrimmedMean_jumbled(test *testing.T) {
+ expected := 16
series := NewInfiniteMathematicalSeries[int64]()
series.AddElement(7)
series.AddElement(2)
@@ -145,10 +153,10 @@ func Test_InfiniteDoubleSidedTrimmedMean_jumbled(test *testing.T) {
trimmed := series.DoubleSidedTrim(10)
- if trimmed.Len() != 16 {
+ if trimmed.Len() != expected {
test.Fatalf(
"Capped series is not of the proper size. Expected %v and got %v",
- 16,
+ expected,
trimmed.Len(),
)
}
@@ -165,7 +173,7 @@ func Test_InfiniteDoubleSidedTrimmedMean_jumbled(test *testing.T) {
func Test_CappedSequentialIncreasesAlwaysLessThan(test *testing.T) {
series := NewCappedMathematicalSeries[float64](40)
previous := float64(1.0)
- for _ = range utilities.Iota(1, 80) {
+ for range utilities.Iota(1, 80) {
previous *= 1.059
series.AddElement(float64(previous))
}
@@ -221,16 +229,41 @@ func Test_CappedSequentialIncreasesAlwaysLessThanWithWraparoundInverse(test *tes
}
func Test_CappedStandardDeviationCalculation(test *testing.T) {
+ expected := 2.93
series := NewCappedMathematicalSeries[float64](5)
// 5.7, 1.0, 8.6, 7.4, 2.2
series.AddElement(5.7)
+ series.AddElement(5.7)
+ series.AddElement(5.7)
+ series.AddElement(5.7)
+ series.AddElement(5.7)
+ series.AddElement(5.7)
+ series.AddElement(5.7)
+ series.AddElement(5.7)
+ series.AddElement(5.7)
series.AddElement(1.0)
series.AddElement(8.6)
series.AddElement(7.4)
series.AddElement(2.2)
- if _, sd := series.StandardDeviation(); !utilities.ApproximatelyEqual(2.93, sd, 0.01) {
- test.Fatalf("Standard deviation max calculation failed: %v.", sd)
+ if _, sd := series.StandardDeviation(); !utilities.ApproximatelyEqual(sd, expected, 0.01) {
+ test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd)
+ } else {
+ test.Logf("Standard deviation calculation result: %v", sd)
+ }
+}
+
+func Test_CappedStandardDeviationCalculation2(test *testing.T) {
+ expected := 1.41
+ series := NewCappedMathematicalSeries[float64](5)
+ series.AddElement(8)
+ series.AddElement(9)
+ series.AddElement(10)
+ series.AddElement(11)
+ series.AddElement(12)
+
+ if _, sd := series.StandardDeviation(); !utilities.ApproximatelyEqual(sd, expected, 0.01) {
+ test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd)
} else {
test.Logf("Standard deviation calculation result: %v", sd)
}
@@ -252,6 +285,7 @@ func Test_CappedRotatingValues(test *testing.T) {
test.Fatalf("Adding values does not properly erase earlier values.")
}
}
+
func Test_CappedLen(test *testing.T) {
series := NewCappedMathematicalSeries[int](5)
@@ -275,13 +309,16 @@ func Test_Capped_degenerate_percentile_too_high(test *testing.T) {
test.Fatalf("Series percentile of 101 failed.")
}
}
+
func Test_Capped_degenerate_percentile_too_low(test *testing.T) {
series := NewCappedMathematicalSeries[int](21)
if series.Percentile(-1) != 0 {
test.Fatalf("Series percentile of -1 failed.")
}
}
+
func Test_Capped90_percentile(test *testing.T) {
+ var expected int = 10
series := NewCappedMathematicalSeries[int](10)
series.AddElement(10)
series.AddElement(9)
@@ -294,9 +331,9 @@ func Test_Capped90_percentile(test *testing.T) {
series.AddElement(2)
series.AddElement(1)
- if series.Percentile(90) != 10 {
+ if series.Percentile(90) != expected {
test.Fatalf(
- "Series 90th percentile of 0 ... 10 failed: Expected 10 got %v.",
+ "Series 90th percentile of 0 ... 10 failed: Expected %v got %v.", expected,
series.Percentile(90),
)
}
@@ -324,6 +361,7 @@ func Test_Capped90_percentile_reversed(test *testing.T) {
}
func Test_Capped50_percentile_jumbled(test *testing.T) {
+ var expected int64 = 15
series := NewCappedMathematicalSeries[int64](10)
series.AddElement(7)
series.AddElement(2)
@@ -336,15 +374,16 @@ func Test_Capped50_percentile_jumbled(test *testing.T) {
series.AddElement(11)
series.AddElement(12)
- if series.Percentile(50) != 15 {
+ if series.Percentile(50) != expected {
test.Fatalf(
- "Series 50 percentile of a jumble of numbers failed: Expected 15 got %v.",
+ "Series 50 percentile of a jumble of numbers failed: Expected %v got %v.", expected,
series.Percentile(50),
)
}
}
func Test_CappedDoubleSidedTrimmedMean_jumbled(test *testing.T) {
+ expected := 8
series := NewCappedMathematicalSeries[int64](10)
series.AddElement(7)
series.AddElement(2)
@@ -360,10 +399,10 @@ func Test_CappedDoubleSidedTrimmedMean_jumbled(test *testing.T) {
trimmed := series.DoubleSidedTrim(10)
- if trimmed.Len() != 8 {
+ if trimmed.Len() != expected {
test.Fatalf(
"Capped series is not of the proper size. Expected %v and got %v",
- 8,
+ expected,
trimmed.Len(),
)
}
@@ -376,3 +415,17 @@ func Test_CappedDoubleSidedTrimmedMean_jumbled(test *testing.T) {
prev = v
}
}
+
+func Test_CappedAverage(test *testing.T) {
+ expected := 1.0082230220488836e+08
+ series := NewCappedMathematicalSeries[float64](4)
+ series.AddElement(9.94747772516195e+07)
+ series.AddElement(9.991286984703423e+07)
+ series.AddElement(1.0285437111086299e+08)
+ series.AddElement(1.0104719061003672e+08)
+ if average := series.CalculateAverage(); !utilities.ApproximatelyEqual(average, expected, 0.01) {
+ test.Fatalf(
+ "Expected: %v; Actual: %v.", average, expected,
+ )
+ }
+}
diff --git a/networkQuality.go b/networkQuality.go
index f97b27c..776c0e7 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -22,6 +22,7 @@ import (
"net/url"
"os"
"runtime/pprof"
+ "strings"
"time"
"github.com/network-quality/goresponsiveness/ccw"
@@ -29,6 +30,7 @@ import (
"github.com/network-quality/goresponsiveness/constants"
"github.com/network-quality/goresponsiveness/datalogger"
"github.com/network-quality/goresponsiveness/debug"
+ "github.com/network-quality/goresponsiveness/direction"
"github.com/network-quality/goresponsiveness/extendedstats"
"github.com/network-quality/goresponsiveness/lgc"
"github.com/network-quality/goresponsiveness/ms"
@@ -68,10 +70,46 @@ var (
"Enable debugging.",
)
rpmtimeout = flag.Int(
- "rpmtimeout",
- constants.RPMCalculationTime,
- "Maximum time to spend calculating RPM (i.e., total test time.).",
+ "rpm.timeout",
+ constants.DefaultTestTime,
+ "Maximum time (in seconds) to spend calculating RPM (i.e., total test time.).",
)
+ rpmmad = flag.Int(
+ "rpm.mad",
+ constants.SpecParameterCliOptionsDefaults.Mad,
+ "Moving average distance -- number of intervals considered during stability calculations.",
+ )
+ rpmid = flag.Int(
+ "rpm.id",
+ constants.SpecParameterCliOptionsDefaults.Id,
+ "Duration of the interval between re-evaluating the network conditions (in seconds).",
+ )
+ rpmtmp = flag.Uint(
+ "rpm.tmp",
+ constants.SpecParameterCliOptionsDefaults.Tmp,
+ "Percent of measurements to trim when calculating statistics about network conditions (between 0 and 100).",
+ )
+ rpmsdt = flag.Float64(
+ "rpm.sdt",
+ constants.SpecParameterCliOptionsDefaults.Sdt,
+ "Cutoff in the standard deviation of measured values about network conditions between unstable and stable.",
+ )
+ rpmmnp = flag.Int(
+ "rpm.mnp",
+ constants.SpecParameterCliOptionsDefaults.Mnp,
+ "Maximum number of parallel connections to establish when attempting to reach working conditions.",
+ )
+ rpmmps = flag.Int(
+ "rpm.mps",
+ constants.SpecParameterCliOptionsDefaults.Mps,
+ "Maximum number of probes to send per second.",
+ )
+ rpmptc = flag.Float64(
+ "rpm.ptc",
+ constants.SpecParameterCliOptionsDefaults.Ptc,
+ "Percentage of the (discovered) total network capacity that probes are allowed to consume.",
+ )
+
sslKeyFileName = flag.String(
"ssl-key-file",
"",
@@ -97,11 +135,6 @@ var (
"",
"Store granular information about tests results in files with this basename. Time and information type will be appended (before the first .) to create separate log files. Disabled by default.",
)
- probeIntervalTime = flag.Uint(
- "probe-interval-time",
- 100,
- "Time (in ms) between probes (foreign and self).",
- )
connectToAddr = flag.String(
"connect-to",
"",
@@ -132,8 +165,26 @@ func main() {
os.Exit(0)
}
- timeoutDuration := time.Second * time.Duration(*rpmtimeout)
- timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
+ var debugLevel debug.DebugLevel = debug.Error
+
+ if *debugCliFlag {
+ debugLevel = debug.Debug
+ }
+
+ specParameters, err := rpm.SpecParametersFromArguments(*rpmtimeout, *rpmmad, *rpmid,
+ *rpmtmp, *rpmsdt, *rpmmnp, *rpmmps, *rpmptc)
+ if err != nil {
+ fmt.Fprintf(
+ os.Stderr,
+ "Error: There was an error configuring the test with user-supplied parameters: %v\n",
+ err,
+ )
+ os.Exit(1)
+ }
+
+ if debug.IsDebug(debugLevel) {
+ fmt.Printf("Running the test according to the following spec parameters:\n%v\n", specParameters.ToString())
+ }
var configHostPort string
@@ -158,30 +209,14 @@ func main() {
// the others.
operatingCtx, operatingCtxCancel := context.WithCancel(context.Background())
- // The operator contexts. These contexts control the processes that manage
- // network activity but do not control network activity.
-
- uploadLoadGeneratorOperatorCtx, uploadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx)
- downloadLoadGeneratorOperatorCtx, downloadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx)
- proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx)
-
- // This context is used to control the network activity (i.e., it controls all
- // the connections that are open to do load generation and probing). Cancelling this context will close
- // all the network connections that are responsible for generating the load.
- networkActivityCtx, networkActivityCtxCancel := context.WithCancel(operatingCtx)
-
config := &config.Config{
ConnectToAddr: *connectToAddr,
}
- var debugLevel debug.DebugLevel = debug.Error
-
- if *debugCliFlag {
- debugLevel = debug.Debug
- }
if *calculateExtendedStats && !extendedstats.ExtendedStatsAvailable() {
*calculateExtendedStats = false
- fmt.Printf(
+ fmt.Fprintf(
+ os.Stderr,
"Warning: Calculation of extended statistics was requested but is not supported on this platform.\n",
)
}
@@ -223,53 +258,14 @@ func main() {
fmt.Printf("Configuration: %s\n", config)
}
- timeoutChannel := timeoutat.TimeoutAt(
- operatingCtx,
- timeoutAbsoluteTime,
- debugLevel,
- )
- if debug.IsDebug(debugLevel) {
- fmt.Printf("Test will end no later than %v\n", timeoutAbsoluteTime)
- }
-
- // print the banner
- dt := time.Now().UTC()
- fmt.Printf(
- "%s UTC Go Responsiveness to %s...\n",
- dt.Format("01-02-2006 15:04:05"),
- configHostPort,
- )
-
- if len(*profile) != 0 {
- f, err := os.Create(*profile)
- if err != nil {
- fmt.Fprintf(
- os.Stderr,
- "Error: Profiling requested but could not open the log file ( %s ) for writing: %v\n",
- *profile,
- err,
- )
- os.Exit(1)
- }
- pprof.StartCPUProfile(f)
- defer pprof.StopCPUProfile()
- }
- var selfProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] = nil
- var foreignProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] = nil
- var downloadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil
- var uploadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil
- var granularThroughputDataLogger datalogger.DataLogger[rpm.GranularThroughputDataPoint] = nil
+ downloadDirection := direction.Direction{}
+ uploadDirection := direction.Direction{}
// User wants to log data
if *dataLoggerBaseFileName != "" {
var err error = nil
unique := time.Now().UTC().Format("01-02-2006-15-04-05")
- dataLoggerSelfFilename := utilities.FilenameAppend(*dataLoggerBaseFileName, "-self-"+unique)
- dataLoggerForeignFilename := utilities.FilenameAppend(
- *dataLoggerBaseFileName,
- "-foreign-"+unique,
- )
dataLoggerDownloadThroughputFilename := utilities.FilenameAppend(
*dataLoggerBaseFileName,
"-throughput-download-"+unique,
@@ -278,100 +274,158 @@ func main() {
*dataLoggerBaseFileName,
"-throughput-upload-"+unique,
)
- dataLoggerGranularThroughputFilename := utilities.FilenameAppend(
+
+ dataLoggerDownloadGranularThroughputFilename := utilities.FilenameAppend(
*dataLoggerBaseFileName,
- "-throughput-granular-"+unique,
+ "-throughput-download-granular-"+unique,
)
- selfProbeDataLogger, err = datalogger.CreateCSVDataLogger[probe.ProbeDataPoint](
+ dataLoggerUploadGranularThroughputFilename := utilities.FilenameAppend(
+ *dataLoggerBaseFileName,
+ "-throughput-upload-granular-"+unique,
+ )
+
+ dataLoggerSelfFilename := utilities.FilenameAppend(*dataLoggerBaseFileName, "-self-"+unique)
+ dataLoggerForeignFilename := utilities.FilenameAppend(
+ *dataLoggerBaseFileName,
+ "-foreign-"+unique,
+ )
+
+ selfProbeDataLogger, err := datalogger.CreateCSVDataLogger[probe.ProbeDataPoint](
dataLoggerSelfFilename,
)
if err != nil {
- fmt.Printf(
+ fmt.Fprintf(
+ os.Stderr,
"Warning: Could not create the file for storing self probe results (%s). Disabling functionality.\n",
dataLoggerSelfFilename,
)
selfProbeDataLogger = nil
}
+ uploadDirection.SelfProbeDataLogger = selfProbeDataLogger
+ downloadDirection.SelfProbeDataLogger = selfProbeDataLogger
- foreignProbeDataLogger, err = datalogger.CreateCSVDataLogger[probe.ProbeDataPoint](
+ foreignProbeDataLogger, err := datalogger.CreateCSVDataLogger[probe.ProbeDataPoint](
dataLoggerForeignFilename,
)
if err != nil {
- fmt.Printf(
+ fmt.Fprintf(
+ os.Stderr,
"Warning: Could not create the file for storing foreign probe results (%s). Disabling functionality.\n",
dataLoggerForeignFilename,
)
foreignProbeDataLogger = nil
}
+ uploadDirection.ForeignProbeDataLogger = foreignProbeDataLogger
+ downloadDirection.ForeignProbeDataLogger = foreignProbeDataLogger
- downloadThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint](
+ downloadDirection.ThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint](
dataLoggerDownloadThroughputFilename,
)
if err != nil {
- fmt.Printf(
+ fmt.Fprintf(
+ os.Stderr,
"Warning: Could not create the file for storing download throughput results (%s). Disabling functionality.\n",
dataLoggerDownloadThroughputFilename,
)
- downloadThroughputDataLogger = nil
+ downloadDirection.ThroughputDataLogger = nil
}
-
- uploadThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint](
+ uploadDirection.ThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint](
dataLoggerUploadThroughputFilename,
)
if err != nil {
- fmt.Printf(
+ fmt.Fprintf(
+ os.Stderr,
"Warning: Could not create the file for storing upload throughput results (%s). Disabling functionality.\n",
dataLoggerUploadThroughputFilename,
)
- uploadThroughputDataLogger = nil
+ uploadDirection.ThroughputDataLogger = nil
}
- granularThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.GranularThroughputDataPoint](
- dataLoggerGranularThroughputFilename,
+ downloadDirection.GranularThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.GranularThroughputDataPoint](
+ dataLoggerDownloadGranularThroughputFilename,
)
if err != nil {
- fmt.Printf(
- "Warning: Could not create the file for storing granular throughput results (%s). Disabling functionality.\n",
- dataLoggerGranularThroughputFilename,
+ fmt.Fprintf(
+ os.Stderr,
+ "Warning: Could not create the file for storing download granular throughput results (%s). Disabling functionality.\n",
+ dataLoggerDownloadGranularThroughputFilename,
+ )
+ downloadDirection.GranularThroughputDataLogger = nil
+ }
+ uploadDirection.GranularThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.GranularThroughputDataPoint](
+ dataLoggerUploadGranularThroughputFilename,
+ )
+ if err != nil {
+ fmt.Fprintf(
+ os.Stderr,
+ "Warning: Could not create the file for storing upload granular throughput results (%s). Disabling functionality.\n",
+ dataLoggerUploadGranularThroughputFilename,
)
- granularThroughputDataLogger = nil
+ uploadDirection.GranularThroughputDataLogger = nil
}
+
}
// If, for some reason, the data loggers are nil, make them Null Data Loggers so that we don't have conditional
// code later.
- if selfProbeDataLogger == nil {
- selfProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]()
+ if downloadDirection.SelfProbeDataLogger == nil {
+ downloadDirection.SelfProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]()
+ }
+ if uploadDirection.SelfProbeDataLogger == nil {
+ uploadDirection.SelfProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]()
}
- if foreignProbeDataLogger == nil {
- foreignProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]()
+
+ if downloadDirection.ForeignProbeDataLogger == nil {
+ downloadDirection.ForeignProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]()
}
- if downloadThroughputDataLogger == nil {
- downloadThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]()
+ if uploadDirection.ForeignProbeDataLogger == nil {
+ uploadDirection.ForeignProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]()
}
- if uploadThroughputDataLogger == nil {
- uploadThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]()
+
+ if downloadDirection.ThroughputDataLogger == nil {
+ downloadDirection.ThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]()
+ }
+ if uploadDirection.ThroughputDataLogger == nil {
+ uploadDirection.ThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]()
+ }
+
+ if downloadDirection.GranularThroughputDataLogger == nil {
+ downloadDirection.GranularThroughputDataLogger =
+ datalogger.CreateNullDataLogger[rpm.GranularThroughputDataPoint]()
}
- if granularThroughputDataLogger == nil {
- granularThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.GranularThroughputDataPoint]()
+ if uploadDirection.GranularThroughputDataLogger == nil {
+ uploadDirection.GranularThroughputDataLogger =
+ datalogger.CreateNullDataLogger[rpm.GranularThroughputDataPoint]()
}
/*
* Create (and then, ironically, name) two anonymous functions that, when invoked,
* will create load-generating connections for upload/download
*/
- generateLgdc := func() lgc.LoadGeneratingConnection {
+ downloadDirection.CreateLgdc = func() lgc.LoadGeneratingConnection {
lgd := lgc.NewLoadGeneratingConnectionDownload(config.Urls.LargeUrl,
sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify)
return &lgd
}
-
- generateLguc := func() lgc.LoadGeneratingConnection {
+ uploadDirection.CreateLgdc = func() lgc.LoadGeneratingConnection {
lgu := lgc.NewLoadGeneratingConnectionUpload(config.Urls.UploadUrl,
sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify)
return &lgu
}
+ downloadDirection.DirectionDebugging = debug.NewDebugWithPrefix(debugLevel, "download")
+ downloadDirection.ProbeDebugging = debug.NewDebugWithPrefix(debugLevel, "download probe")
+
+ uploadDirection.DirectionDebugging = debug.NewDebugWithPrefix(debugLevel, "upload")
+ uploadDirection.ProbeDebugging = debug.NewDebugWithPrefix(debugLevel, "upload probe")
+
+ downloadDirection.Lgcc = lgc.NewLoadGeneratingConnectionCollection()
+ uploadDirection.Lgcc = lgc.NewLoadGeneratingConnectionCollection()
+
+ // We do not do tracing on upload connections so there are no extended stats for those connections!
+ uploadDirection.ExtendedStatsEligible = false
+ downloadDirection.ExtendedStatsEligible = true
+
generateSelfProbeConfiguration := func() probe.ProbeConfiguration {
return probe.ProbeConfiguration{
URL: config.Urls.SmallUrl,
@@ -388,421 +442,391 @@ func main() {
}
}
- var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download")
- var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload")
- var combinedProbeDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "combined probe")
-
- downloadLoadGeneratingConnectionCollection := lgc.NewLoadGeneratingConnectionCollection()
- uploadLoadGeneratingConnectionCollection := lgc.NewLoadGeneratingConnectionCollection()
+ downloadDirection.DirectionLabel = "Download"
+ uploadDirection.DirectionLabel = "Upload"
- // TODO: Separate contexts for load generation and data collection. If we do that, if either of the two
- // data collection go routines stops well before the other, they will continue to send probes and we can
- // generate additional information!
+ directions := []*direction.Direction{&downloadDirection, &uploadDirection}
- selfDownProbeConnectionCommunicationChannel, downloadThroughputChannel := rpm.LoadGenerator(
- networkActivityCtx,
- downloadLoadGeneratorOperatorCtx,
- time.Second,
- generateLgdc,
- &downloadLoadGeneratingConnectionCollection,
- *calculateExtendedStats,
- downloadDebugging,
- )
- selfUpProbeConnectionCommunicationChannel, uploadThroughputChannel := rpm.LoadGenerator(
- networkActivityCtx,
- uploadLoadGeneratorOperatorCtx,
- time.Second,
- generateLguc,
- &uploadLoadGeneratingConnectionCollection,
- *calculateExtendedStats,
- uploadDebugging,
+ // print the banner
+ dt := time.Now().UTC()
+ fmt.Printf(
+ "%s UTC Go Responsiveness to %s...\n",
+ dt.Format("01-02-2006 15:04:05"),
+ configHostPort,
)
- // Handles for the first connection that the load-generating go routines (both up and
- // download) open are passed back on the self[Down|Up]ProbeConnectionCommunicationChannel
- // so that we can then start probes on those connections.
- selfDownProbeConnection := <-selfDownProbeConnectionCommunicationChannel
- selfUpProbeConnection := <-selfUpProbeConnectionCommunicationChannel
+ if len(*profile) != 0 {
+ f, err := os.Create(*profile)
+ if err != nil {
+ fmt.Fprintf(
+ os.Stderr,
+ "Error: Profiling requested but could not open the log file ( %s ) for writing: %v\n",
+ *profile,
+ err,
+ )
+ os.Exit(1)
+ }
+ pprof.StartCPUProfile(f)
+ defer pprof.StopCPUProfile()
+ }
+
+ // All tests will accumulate data to these series because it will all matter for RPM calculation!
+ selfRtts := ms.NewInfiniteMathematicalSeries[float64]()
+ foreignRtts := ms.NewInfiniteMathematicalSeries[float64]()
- // The combined prober will handle launching, monitoring, etc of *both* the self and foreign
- // probes.
- probeDataPointsChannel := rpm.CombinedProber(
- proberOperatorCtx,
- networkActivityCtx,
- generateForeignProbeConfiguration,
- generateSelfProbeConfiguration,
- selfDownProbeConnection,
- selfUpProbeConnection,
- time.Millisecond*(time.Duration(*probeIntervalTime)),
- sslKeyFileConcurrentWriter,
- *calculateExtendedStats,
- combinedProbeDebugging,
- )
+ var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil
+ if *printQualityAttenuation {
+ selfRttsQualityAttenuation = qualityattenuation.NewSimpleQualityAttenuation()
+ }
- responsivenessIsStable := false
- downloadThroughputIsStable := false
- uploadThroughputIsStable := false
+ for _, direction := range directions {
- // Test parameters:
- // 1. I: The number of previous instantaneous measurements to consider when generating
- // the so-called instantaneous moving averages.
- // 2. K: The number of instantaneous moving averages to consider when determining stability.
- // 3: S: The standard deviation cutoff used to determine stability among the K preceding
- // moving averages of a measurement.
- // See
+ timeoutDuration := specParameters.TestTimeout
+ timeoutAbsoluteTime := time.Now().Add(timeoutDuration)
- throughputI := constants.InstantaneousThroughputMeasurementCount
- probeI := constants.InstantaneousProbeMeasurementCount
- K := constants.InstantaneousMovingAverageStabilityCount
- S := constants.StabilityStandardDeviation
+ timeoutChannel := timeoutat.TimeoutAt(
+ operatingCtx,
+ timeoutAbsoluteTime,
+ debugLevel,
+ )
+ if debug.IsDebug(debugLevel) {
+ fmt.Printf("%s Test will end no later than %v\n", direction.DirectionLabel, timeoutAbsoluteTime)
+ }
- downloadThroughputStabilizerDebugConfig :=
- debug.NewDebugWithPrefix(debug.Debug, "Download Throughput Stabilizer")
- downloadThroughputStabilizerDebugLevel := debug.Error
- if *debugCliFlag {
- downloadThroughputStabilizerDebugLevel = debug.Debug
- }
- downloadThroughputStabilizer := stabilizer.NewThroughputStabilizer(throughputI, K, S,
- downloadThroughputStabilizerDebugLevel, downloadThroughputStabilizerDebugConfig)
+ throughputCtx, throughputCtxCancel := context.WithCancel(operatingCtx)
+ proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx)
- uploadThroughputStabilizerDebugConfig :=
- debug.NewDebugWithPrefix(debug.Debug, "Upload Throughput Stabilizer")
- uploadThroughputStabilizerDebugLevel := debug.Error
- if *debugCliFlag {
- uploadThroughputStabilizerDebugLevel = debug.Debug
- }
- uploadThroughputStabilizer := stabilizer.NewThroughputStabilizer(throughputI, K, S,
- uploadThroughputStabilizerDebugLevel, uploadThroughputStabilizerDebugConfig)
+ // This context is used to control the network activity (i.e., it controls all
+ // the connections that are open to do load generation and probing). Cancelling this context will close
+ // all the network connections that are responsible for generating the load.
+ networkActivityCtx, networkActivityCtxCancel := context.WithCancel(operatingCtx)
- probeStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, "Probe Stabilizer")
- probeStabilizerDebugLevel := debug.Error
- if *debugCliFlag {
- probeStabilizerDebugLevel = debug.Debug
- }
- probeStabilizer := stabilizer.NewProbeStabilizer(probeI, K, S, probeStabilizerDebugLevel, probeStabilizerDebugConfig)
+ throughputGeneratorCtx, throughputGeneratorCtxCancel := context.WithCancel(throughputCtx)
- selfRtts := ms.NewInfiniteMathematicalSeries[float64]()
- selfRttsQualityAttenuation := qualityattenuation.NewSimpleQualityAttenuation()
- foreignRtts := ms.NewInfiniteMathematicalSeries[float64]()
+ lgStabilizationCommunicationChannel := rpm.LoadGenerator(
+ throughputCtx,
+ networkActivityCtx,
+ throughputGeneratorCtx,
+ specParameters.EvalInterval,
+ direction.CreateLgdc,
+ &direction.Lgcc,
+ specParameters.MaxParallelConns,
+ *calculateExtendedStats,
+ direction.DirectionDebugging,
+ )
- // For later debugging output, record the last throughputs on load-generating connectings
- // and the number of open connections.
- lastUploadThroughputRate := float64(0)
- lastUploadThroughputOpenConnectionCount := int(0)
- lastDownloadThroughputRate := float64(0)
- lastDownloadThroughputOpenConnectionCount := int(0)
+ throughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug,
+ fmt.Sprintf("%v Throughput Stabilizer", direction.DirectionLabel))
+ downloadThroughputStabilizerDebugLevel := debug.Error
+ if *debugCliFlag {
+ downloadThroughputStabilizerDebugLevel = debug.Debug
+ }
+ throughputStabilizer := stabilizer.NewStabilizer[float64](
+ uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, 0, "bytes",
+ downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig)
- // Every time that there is a new measurement, the possibility exists that the measurements become unstable.
- // This allows us to continue pushing until *everything* is stable at the same time.
-timeout:
- for !(responsivenessIsStable && downloadThroughputIsStable && uploadThroughputIsStable) {
- select {
+ responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug,
+ fmt.Sprintf("%v Responsiveness Stabilizer", direction.DirectionLabel))
+ responsivenessStabilizerDebugLevel := debug.Error
+ if *debugCliFlag {
+ responsivenessStabilizerDebugLevel = debug.Debug
+ }
+ responsivenessStabilizer := stabilizer.NewStabilizer[int64](
+ uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance,
+ specParameters.TrimmedMeanPct, "milliseconds",
+ responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig)
- case downloadThroughputMeasurement := <-downloadThroughputChannel:
- {
- downloadThroughputStabilizer.AddMeasurement(downloadThroughputMeasurement)
- downloadThroughputIsStable = downloadThroughputStabilizer.IsStable()
- if *debugCliFlag {
- fmt.Printf(
- "################# Download is instantaneously %s.\n",
- utilities.Conditional(downloadThroughputIsStable, "stable", "unstable"))
- }
- downloadThroughputDataLogger.LogRecord(downloadThroughputMeasurement)
- for i := range downloadThroughputMeasurement.GranularThroughputDataPoints {
- datapoint := downloadThroughputMeasurement.GranularThroughputDataPoints[i]
- datapoint.Direction = "Download"
- granularThroughputDataLogger.LogRecord(datapoint)
- }
+ // For later debugging output, record the last throughputs on load-generating connectings
+ // and the number of open connections.
+ lastThroughputRate := float64(0)
+ lastThroughputOpenConnectionCount := int(0)
- lastDownloadThroughputRate = downloadThroughputMeasurement.Throughput
- lastDownloadThroughputOpenConnectionCount =
- downloadThroughputMeasurement.Connections
- }
+ lg_timeout:
+ for !direction.StableThroughput {
+ select {
+ case throughputMeasurement := <-lgStabilizationCommunicationChannel:
+ {
+ throughputStabilizer.AddMeasurement(
+ throughputMeasurement.Throughput)
+ direction.StableThroughput = throughputStabilizer.IsStable()
+ if *debugCliFlag {
+ fmt.Printf(
+ "################# %v is instantaneously %s.\n", direction.DirectionLabel,
+ utilities.Conditional(direction.StableThroughput, "stable", "unstable"))
+ }
+ direction.ThroughputDataLogger.LogRecord(throughputMeasurement)
+ for i := range throughputMeasurement.GranularThroughputDataPoints {
+ datapoint := throughputMeasurement.GranularThroughputDataPoints[i]
+ datapoint.Direction = "Download"
+ direction.GranularThroughputDataLogger.LogRecord(datapoint)
+ }
+
+ lastThroughputRate = throughputMeasurement.Throughput
+ lastThroughputOpenConnectionCount = throughputMeasurement.Connections
- case uploadThroughputMeasurement := <-uploadThroughputChannel:
- {
- uploadThroughputStabilizer.AddMeasurement(uploadThroughputMeasurement)
- uploadThroughputIsStable = uploadThroughputStabilizer.IsStable()
- if *debugCliFlag {
- fmt.Printf(
- "################# Upload is instantaneously %s.\n",
- utilities.Conditional(uploadThroughputIsStable, "stable", "unstable"))
+ if direction.StableThroughput {
+ throughputGeneratorCtxCancel()
+ }
}
- uploadThroughputDataLogger.LogRecord(uploadThroughputMeasurement)
- for i := range uploadThroughputMeasurement.GranularThroughputDataPoints {
- datapoint := uploadThroughputMeasurement.GranularThroughputDataPoints[i]
- datapoint.Direction = "Upload"
- granularThroughputDataLogger.LogRecord(datapoint)
+ case <-timeoutChannel:
+ {
+ break lg_timeout
}
+ }
+ }
- lastUploadThroughputRate = uploadThroughputMeasurement.Throughput
- lastUploadThroughputOpenConnectionCount = uploadThroughputMeasurement.Connections
+ if direction.StableThroughput {
+ if *debugCliFlag {
+ fmt.Printf("################# Throughput is stable; beginning responsiveness testing.\n")
}
- case probeMeasurement := <-probeDataPointsChannel:
- {
- probeStabilizer.AddMeasurement(probeMeasurement)
+ } else {
+ fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Adding 15 seconds to calculate speculative RPM results.\n")
+ speculativeTimeoutDuration := time.Second * 15
+ speculativeAbsoluteTimeoutTime := time.Now().Add(speculativeTimeoutDuration)
+ timeoutChannel = timeoutat.TimeoutAt(
+ operatingCtx,
+ speculativeAbsoluteTimeoutTime,
+ debugLevel,
+ )
+ }
- // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy
- // is *actually* important, but it can't hurt?
- responsivenessIsStable = probeStabilizer.IsStable()
+ perDirectionSelfRtts := ms.NewInfiniteMathematicalSeries[float64]()
+ perDirectionForeignRtts := ms.NewInfiniteMathematicalSeries[float64]()
- if *debugCliFlag {
- fmt.Printf(
- "################# Responsiveness is instantaneously %s.\n",
- utilities.Conditional(responsivenessIsStable, "stable", "unstable"))
- }
- if probeMeasurement.Type == probe.Foreign {
+ responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber(
+ proberOperatorCtx,
+ networkActivityCtx,
+ generateForeignProbeConfiguration,
+ generateSelfProbeConfiguration,
+ &direction.Lgcc,
+ direction.CreateLgdc().Direction(), // TODO: This could be better!
+ specParameters.ProbeInterval,
+ sslKeyFileConcurrentWriter,
+ *calculateExtendedStats,
+ direction.ProbeDebugging,
+ )
+
+ responsiveness_timeout:
+ for !direction.StableResponsiveness {
+ select {
+ case probeMeasurement := <-responsivenessStabilizationCommunicationChannel:
+ {
+ foreignDataPoint := probeMeasurement.First
+ selfDataPoint := probeMeasurement.Second
+
+ responsivenessStabilizer.AddMeasurement(
+ (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds())
+
+ // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy
+ // is *actually* important, but it can't hurt?
+ direction.StableResponsiveness = responsivenessStabilizer.IsStable()
+
+ if *debugCliFlag {
+ fmt.Printf(
+ "################# Responsiveness is instantaneously %s.\n",
+ utilities.Conditional(direction.StableResponsiveness, "stable", "unstable"))
+ }
// There may be more than one round trip accumulated together. If that is the case,
// we will blow them apart in to three separate measurements and each one will just
// be 1 / measurement.RoundTripCount of the total length.
- for range utilities.Iota(0, int(probeMeasurement.RoundTripCount)) {
- foreignRtts.AddElement(probeMeasurement.Duration.Seconds() /
- float64(probeMeasurement.RoundTripCount))
+ for range utilities.Iota(0, int(foreignDataPoint.RoundTripCount)) {
+ foreignRtts.AddElement(foreignDataPoint.Duration.Seconds() /
+ float64(foreignDataPoint.RoundTripCount))
+ perDirectionForeignRtts.AddElement(foreignDataPoint.Duration.Seconds() /
+ float64(foreignDataPoint.RoundTripCount))
}
- } else if probeMeasurement.Type == probe.SelfDown || probeMeasurement.Type == probe.SelfUp {
- selfRtts.AddElement(probeMeasurement.Duration.Seconds())
- if *printQualityAttenuation {
- selfRttsQualityAttenuation.AddSample(probeMeasurement.Duration.Seconds())
+ selfRtts.AddElement(selfDataPoint.Duration.Seconds())
+ perDirectionSelfRtts.AddElement(selfDataPoint.Duration.Seconds())
+
+ if selfRttsQualityAttenuation != nil {
+ selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds())
}
+
+ direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint)
+ direction.SelfProbeDataLogger.LogRecord(*selfDataPoint)
}
+ case throughputMeasurement := <-lgStabilizationCommunicationChannel:
+ {
+ if *debugCliFlag {
+ fmt.Printf("Adding a throughput measurement.\n")
+ }
+ // There may be more than one round trip accumulated together. If that is the case,
+ direction.ThroughputDataLogger.LogRecord(throughputMeasurement)
+ for i := range throughputMeasurement.GranularThroughputDataPoints {
+ datapoint := throughputMeasurement.GranularThroughputDataPoints[i]
+ datapoint.Direction = direction.DirectionLabel
+ direction.GranularThroughputDataLogger.LogRecord(datapoint)
+ }
+
+ lastThroughputRate = throughputMeasurement.Throughput
+ lastThroughputOpenConnectionCount = throughputMeasurement.Connections
- if probeMeasurement.Type == probe.Foreign {
- foreignProbeDataLogger.LogRecord(probeMeasurement)
- } else if probeMeasurement.Type == probe.SelfDown || probeMeasurement.Type == probe.SelfUp {
- selfProbeDataLogger.LogRecord(probeMeasurement)
}
- }
- case <-timeoutChannel:
- {
- break timeout
+ case <-timeoutChannel:
+ {
+ break responsiveness_timeout
+ }
}
}
- }
-
- // TODO: Reset timeout to RPM timeout stat?
- // Did the test run to stability?
- testRanToStability := (downloadThroughputIsStable && uploadThroughputIsStable && responsivenessIsStable)
+ // Did the test run to stability?
+ testRanToStability := direction.StableThroughput && direction.StableResponsiveness
- if *debugCliFlag {
- fmt.Printf("Stopping all the load generating data generators (stability: %s).\n",
- utilities.Conditional(testRanToStability, "success", "failure"))
- }
-
- /* At this point there are
- 1. Load generators running
- -- uploadLoadGeneratorOperatorCtx
- -- downloadLoadGeneratorOperatorCtx
- 2. Network connections opened by those load generators:
- -- lgNetworkActivityCtx
- 3. Probes
- -- proberCtx
- */
+ if *debugCliFlag {
+ fmt.Printf("Stopping all the load generating data generators (stability: %s).\n",
+ utilities.Conditional(testRanToStability, "success", "failure"))
+ }
- // First, stop the load generator and the probe operators (but *not* the network activity)
- proberOperatorCtxCancel()
- downloadLoadGeneratorOperatorCtxCancel()
- uploadLoadGeneratorOperatorCtxCancel()
+ /* At this point there are
+ 1. Load generators running
+ -- uploadLoadGeneratorOperatorCtx
+ -- downloadLoadGeneratorOperatorCtx
+ 2. Network connections opened by those load generators:
+ -- lgNetworkActivityCtx
+ 3. Probes
+ -- proberCtx
+ */
- // Second, calculate the extended stats (if the user requested)
+ // First, stop the load generator and the probe operators (but *not* the network activity)
+ proberOperatorCtxCancel()
+ throughputCtxCancel()
- extendedStats := extendedstats.AggregateExtendedStats{}
- if *calculateExtendedStats {
- if extendedstats.ExtendedStatsAvailable() {
- func() {
- // Put inside an IIFE so that we can use a defer!
- downloadLoadGeneratingConnectionCollection.Lock.Lock()
- defer downloadLoadGeneratingConnectionCollection.Lock.Unlock()
+ // Second, calculate the extended stats (if the user requested and they are available for the direction)
+ extendedStats := extendedstats.AggregateExtendedStats{}
+ if *calculateExtendedStats && direction.ExtendedStatsEligible {
+ if extendedstats.ExtendedStatsAvailable() {
+ func() {
+ // Put inside an IIFE so that we can use a defer!
+ direction.Lgcc.Lock.Lock()
+ defer direction.Lgcc.Lock.Unlock()
- // Note: We do not trace upload connections!
- for i := 0; i < downloadLoadGeneratingConnectionCollection.Len(); i++ {
- // Assume that extended statistics are available -- the check was done explicitly at
- // program startup if the calculateExtendedStats flag was set by the user on the command line.
- currentLgc, _ := downloadLoadGeneratingConnectionCollection.Get(i)
- if err := extendedStats.IncorporateConnectionStats(
- (*currentLgc).Stats().ConnInfo.Conn); err != nil {
+ // Note: We do not trace upload connections!
+ downloadLgcCount, err := direction.Lgcc.Len()
+ if err != nil {
fmt.Fprintf(
os.Stderr,
- "Warning: Could not add extended stats for the connection: %v\n",
- err,
+ "Warning: Could not calculate the number of download load-generating connections; aborting extended stats preparation.\n",
)
+ return
}
- }
- }()
- } else {
- // TODO: Should we just log here?
- panic("Extended stats are not available but the user requested their calculation.")
+ for i := 0; i < downloadLgcCount; i++ {
+ // Assume that extended statistics are available -- the check was done explicitly at
+ // program startup if the calculateExtendedStats flag was set by the user on the command line.
+ currentLgc, _ := direction.Lgcc.Get(i)
+ if err := extendedStats.IncorporateConnectionStats(
+ (*currentLgc).Stats().ConnInfo.Conn); err != nil {
+ fmt.Fprintf(
+ os.Stderr,
+ "Warning: Could not add extended stats for the connection: %v\n",
+ err,
+ )
+ }
+ }
+ }()
+ } else {
+ // TODO: Should we just log here?
+ panic("Extended stats are not available but the user requested their calculation.")
+ }
}
- }
- // Third, stop the network connections opened by the load generators and probers.
- networkActivityCtxCancel()
+ // Third, stop the network connections opened by the load generators and probers.
+ networkActivityCtxCancel()
- // Finally, stop the world.
- operatingCtxCancel()
-
- // Calculate the RPM
-
- // First, let's do a double-sided trim of the top/bottom 10% of our measurements.
- selfRttsTotalCount := selfRtts.Len()
- foreignRttsTotalCount := foreignRtts.Len()
-
- selfRttsTrimmed := selfRtts.DoubleSidedTrim(10)
- foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(10)
-
- selfRttsTrimmedCount := selfRttsTrimmed.Len()
- foreignRttsTrimmedCount := foreignRttsTrimmed.Len()
-
- // Then, let's take the mean of those ...
- selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage()
- foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage()
+ fmt.Printf(
+ "%v: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
+ direction.DirectionLabel,
+ utilities.ToMbps(lastThroughputRate),
+ utilities.ToMBps(lastThroughputRate),
+ lastThroughputOpenConnectionCount,
+ )
- // Second, let's do the P90 calculations.
- selfProbeRoundTripTimeP90 := selfRtts.Percentile(90)
- foreignProbeRoundTripTimeP90 := foreignRtts.Percentile(90)
+ if *calculateExtendedStats {
+ fmt.Println(extendedStats.Repr())
+ }
+ directionResult := rpm.CalculateRpm(perDirectionSelfRtts, perDirectionForeignRtts, specParameters.TrimmedMeanPct, 90)
+ if *debugCliFlag {
+ fmt.Printf("(%s RPM Calculation stats): %v\n", direction.DirectionLabel, directionResult.ToString())
+ }
- // Note: The specification indicates that we want to calculate the foreign probes as such:
- // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign
- // where tcp_foreign, tls_foreign, http_foreign are the P90 RTTs for the connection
- // of the tcp, tls and http connections, respectively. However, we cannot break out
- // the individual RTTs so we assume that they are roughly equal.
+ if !testRanToStability {
+ fmt.Printf("Test did not run to stability, these results are estimates:\n")
+ }
- // This is 60 because we measure in seconds not ms
- p90Rpm := 60.0 / (float64(selfProbeRoundTripTimeP90+foreignProbeRoundTripTimeP90) / 2.0)
- meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0)
+ fmt.Printf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, directionResult.PNRpm, 90)
+ fmt.Printf("%s RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel,
+ directionResult.MeanRpm, specParameters.TrimmedMeanPct)
- if *debugCliFlag {
- fmt.Printf(
- `Total Self Probes: %d
-Total Foreign Probes: %d
-Trimmed Self Probes Count: %d
-Trimmed Foreign Probes Count: %d
-P90 Self RTT: %f
-P90 Foreign RTT: %f
-Trimmed Mean Self RTT: %f
-Trimmed Mean Foreign RTT: %f
-`,
- selfRttsTotalCount,
- foreignRttsTotalCount,
- selfRttsTrimmedCount,
- foreignRttsTrimmedCount,
- selfProbeRoundTripTimeP90,
- foreignProbeRoundTripTimeP90,
- selfProbeRoundTripTimeMean,
- foreignProbeRoundTripTimeMean,
- )
- }
+ if len(*prometheusStatsFilename) > 0 {
+ var testStable int
+ if testRanToStability {
+ testStable = 1
+ }
+ var buffer bytes.Buffer
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_test_stable %d\n",
+ strings.ToLower(direction.DirectionLabel), testStable))
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_p90_rpm_value %d\n",
+ strings.ToLower(direction.DirectionLabel), int64(directionResult.PNRpm)))
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_trimmed_rpm_value %d\n",
+ strings.ToLower(direction.DirectionLabel),
+ int64(directionResult.MeanRpm)))
- if *printQualityAttenuation {
- fmt.Println("Quality Attenuation Statistics:")
- fmt.Printf(
- `Number of losses: %d
-Number of samples: %d
-Loss: %f
-Min: %.6f
-Max: %.6f
-Mean: %.6f
-Variance: %.6f
-Standard Deviation: %.6f
-PDV(90): %.6f
-PDV(99): %.6f
-P(90): %.6f
-P(99): %.6f
-`, selfRttsQualityAttenuation.GetNumberOfLosses(),
- selfRttsQualityAttenuation.GetNumberOfSamples(),
- selfRttsQualityAttenuation.GetLossPercentage(),
- selfRttsQualityAttenuation.GetMinimum(),
- selfRttsQualityAttenuation.GetMaximum(),
- selfRttsQualityAttenuation.GetAverage(),
- selfRttsQualityAttenuation.GetVariance(),
- selfRttsQualityAttenuation.GetStandardDeviation(),
- selfRttsQualityAttenuation.GetPDV(90),
- selfRttsQualityAttenuation.GetPDV(99),
- selfRttsQualityAttenuation.GetPercentile(90),
- selfRttsQualityAttenuation.GetPercentile(99))
- }
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_bits_per_second %d\n",
+ strings.ToLower(direction.DirectionLabel), int64(lastThroughputRate)))
+ buffer.WriteString(fmt.Sprintf("networkquality_%v_connections %d\n",
+ strings.ToLower(direction.DirectionLabel),
+ int64(lastThroughputOpenConnectionCount)))
- if !testRanToStability {
- fmt.Printf("Test did not run to stability, these results are estimates:\n")
- }
+ if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil {
+ fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err)
+ os.Exit(1)
+ }
+ }
- fmt.Printf("RPM: %5.0f (P90)\n", p90Rpm)
- fmt.Printf("RPM: %5.0f (Double-Sided 10%% Trimmed Mean)\n", meanRpm)
+ direction.ThroughputDataLogger.Export()
+ if *debugCliFlag {
+ fmt.Printf("Closing the %v throughput data logger.\n", direction.DirectionLabel)
+ }
+ direction.ThroughputDataLogger.Close()
- fmt.Printf(
- "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
- utilities.ToMbps(lastDownloadThroughputRate),
- utilities.ToMBps(lastDownloadThroughputRate),
- lastDownloadThroughputOpenConnectionCount,
- )
- fmt.Printf(
- "Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n",
- utilities.ToMbps(lastUploadThroughputRate),
- utilities.ToMBps(lastUploadThroughputRate),
- lastUploadThroughputOpenConnectionCount,
- )
+ direction.GranularThroughputDataLogger.Export()
+ if *debugCliFlag {
+ fmt.Printf("Closing the %v granular throughput data logger.\n", direction.DirectionLabel)
+ }
+ direction.GranularThroughputDataLogger.Close()
- if *calculateExtendedStats {
- fmt.Println(extendedStats.Repr())
+ if *debugCliFlag {
+ fmt.Printf("In debugging mode, we will cool down between tests.\n")
+ time.Sleep(constants.CooldownPeriod)
+ fmt.Printf("Done cooling down.\n")
+ }
}
- selfProbeDataLogger.Export()
- if *debugCliFlag {
- fmt.Printf("Closing the self data logger.\n")
- }
- selfProbeDataLogger.Close()
+ result := rpm.CalculateRpm(selfRtts, foreignRtts, specParameters.TrimmedMeanPct, 90)
- foreignProbeDataLogger.Export()
if *debugCliFlag {
- fmt.Printf("Closing the foreign data logger.\n")
+ fmt.Printf("(Final RPM Calculation stats): %v\n", result.ToString())
}
- foreignProbeDataLogger.Close()
- downloadThroughputDataLogger.Export()
- if *debugCliFlag {
- fmt.Printf("Closing the download throughput data logger.\n")
- }
- downloadThroughputDataLogger.Close()
+ fmt.Printf("Final RPM: %5.0f (P%d)\n", result.PNRpm, 90)
+ fmt.Printf("Final RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n",
+ result.MeanRpm, specParameters.TrimmedMeanPct)
- uploadThroughputDataLogger.Export()
- if *debugCliFlag {
- fmt.Printf("Closing the upload throughput data logger.\n")
- }
- uploadThroughputDataLogger.Close()
+ // Stop the world.
+ operatingCtxCancel()
- granularThroughputDataLogger.Export()
+ // Note: We do *not* have to export/close the upload *and* download
+ // sides of the self/foreign probe data loggers because they both
+ // refer to the same logger. Closing/exporting one will close/export
+ // the other.
+ uploadDirection.SelfProbeDataLogger.Export()
if *debugCliFlag {
- fmt.Printf("Closing the granular throughput data logger.\n")
+ fmt.Printf("Closing the self data loggers.\n")
}
- granularThroughputDataLogger.Close()
+ uploadDirection.SelfProbeDataLogger.Close()
+ uploadDirection.ForeignProbeDataLogger.Export()
if *debugCliFlag {
- fmt.Printf("In debugging mode, we will cool down.\n")
- time.Sleep(constants.CooldownPeriod)
- fmt.Printf("Done cooling down.\n")
- }
-
- if len(*prometheusStatsFilename) > 0 {
- var testStable int
- if testRanToStability {
- testStable = 1
- }
- var buffer bytes.Buffer
- buffer.WriteString(fmt.Sprintf("networkquality_test_stable %d\n", testStable))
- buffer.WriteString(fmt.Sprintf("networkquality_rpm_value %d\n", int64(p90Rpm)))
- buffer.WriteString(fmt.Sprintf("networkquality_trimmed_rpm_value %d\n",
- int64(meanRpm))) // utilities.ToMbps(lastDownloadThroughputRate),
-
- buffer.WriteString(fmt.Sprintf("networkquality_download_bits_per_second %d\n", int64(lastDownloadThroughputRate)))
- buffer.WriteString(fmt.Sprintf("networkquality_download_connections %d\n",
- int64(lastDownloadThroughputOpenConnectionCount)))
- buffer.WriteString(fmt.Sprintf("networkquality_upload_bits_per_second %d\n", int64(lastUploadThroughputRate)))
- buffer.WriteString(fmt.Sprintf("networkquality_upload_connections %d\n",
- lastUploadThroughputOpenConnectionCount))
-
- if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil {
- fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err)
- os.Exit(1)
- }
+ fmt.Printf("Closing the foreign data loggers.\n")
}
+ uploadDirection.SelfProbeDataLogger.Close()
}
diff --git a/probe/probe.go b/probe/probe.go
index 3a38e3f..fa19411 100644
--- a/probe/probe.go
+++ b/probe/probe.go
@@ -21,7 +21,6 @@ import (
"net/http"
"net/http/httptrace"
"os"
- "sync"
"time"
"github.com/network-quality/goresponsiveness/debug"
@@ -72,28 +71,25 @@ func (pt ProbeType) Value() string {
return "Foreign"
}
+func (pt ProbeType) IsSelf() bool {
+ return pt == SelfUp || pt == SelfDown
+}
+
func Probe(
managingCtx context.Context,
- waitGroup *sync.WaitGroup,
client *http.Client,
- lgc lgc.LoadGeneratingConnection,
probeUrl string,
probeHost string, // optional: for use with a test_endpoint
+ probeDirection lgc.LgcDirection,
probeType ProbeType,
- result *chan ProbeDataPoint,
+ probeId uint,
captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
-) error {
- if waitGroup != nil {
- waitGroup.Add(1)
- defer waitGroup.Done()
- }
-
+) (*ProbeDataPoint, error) {
if client == nil {
- return fmt.Errorf("cannot start a probe with a nil client")
+ return nil, fmt.Errorf("cannot start a probe with a nil client")
}
- probeId := utilities.GenerateUniqueId()
probeTracer := NewProbeTracer(client, probeType, probeId, debugging)
time_before_probe := time.Now()
probe_req, err := http.NewRequestWithContext(
@@ -103,7 +99,7 @@ func Probe(
nil,
)
if err != nil {
- return err
+ return nil, err
}
// Used to disable compression
@@ -112,18 +108,18 @@ func Probe(
probe_resp, err := client.Do(probe_req)
if err != nil {
- return err
+ return nil, err
}
// Header.Get returns "" when not set
if probe_resp.Header.Get("Content-Encoding") != "" {
- return fmt.Errorf("Content-Encoding header was set (compression not allowed)")
+ return nil, fmt.Errorf("Content-Encoding header was set (compression not allowed)")
}
// TODO: Make this interruptable somehow by using _ctx_.
_, err = io.ReadAll(probe_resp.Body)
if err != nil {
- return err
+ return nil, err
}
time_after_probe := time.Now()
@@ -144,16 +140,13 @@ func Probe(
) + probeTracer.GetTCPDelta()
// We must have reused the connection if we are a self probe!
- if (probeType == SelfUp || probeType == SelfDown) && !probeTracer.stats.ConnectionReused {
- if !utilities.IsInterfaceNil(lgc) {
- fmt.Fprintf(os.Stderr,
- "(%s) (%s Probe %v) Probe should have reused a connection, but it didn't (connection status: %v)!\n",
- debugging.Prefix,
- probeType.Value(),
- probeId,
- lgc.Status(),
- )
- }
+ if probeType.IsSelf() && !probeTracer.stats.ConnectionReused {
+ fmt.Fprintf(os.Stderr,
+ "(%s) (%s Probe %v) Probe should have reused a connection, but it didn't!\n",
+ debugging.Prefix,
+ probeType.Value(),
+ probeId,
+ )
panic(!probeTracer.stats.ConnectionReused)
}
@@ -199,14 +192,12 @@ func Probe(
fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err)
}
}
- dataPoint := ProbeDataPoint{
+ return &ProbeDataPoint{
Time: time_before_probe,
RoundTripCount: uint64(roundTripCount),
Duration: totalDelay,
TCPRtt: tcpRtt,
TCPCwnd: tcpCwnd,
Type: probeType,
- }
- *result <- dataPoint
- return nil
+ }, nil
}
diff --git a/probe/tracer.go b/probe/tracer.go
index bea1334..e59e1aa 100644
--- a/probe/tracer.go
+++ b/probe/tracer.go
@@ -33,7 +33,7 @@ type ProbeTracer struct {
stats *stats.TraceStats
trace *httptrace.ClientTrace
debug debug.DebugLevel
- probeid uint64
+ probeid uint
probeType ProbeType
}
@@ -41,7 +41,7 @@ func (p *ProbeTracer) String() string {
return fmt.Sprintf("(Probe %v): stats: %v\n", p.probeid, p.stats)
}
-func (p *ProbeTracer) ProbeId() uint64 {
+func (p *ProbeTracer) ProbeId() uint {
return p.probeid
}
@@ -293,7 +293,7 @@ func (probe *ProbeTracer) SetHttpResponseReadyTime(
func NewProbeTracer(
client *http.Client,
probeType ProbeType,
- probeId uint64,
+ probeId uint,
debugging *debug.DebugWithPrefix,
) *ProbeTracer {
probe := &ProbeTracer{
diff --git a/rpm/calculations.go b/rpm/calculations.go
new file mode 100644
index 0000000..5387aa7
--- /dev/null
+++ b/rpm/calculations.go
@@ -0,0 +1,94 @@
+/*
+ * This file is part of Go Responsiveness.
+ *
+ * Go Responsiveness is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software Foundation,
+ * either version 2 of the License, or (at your option) any later version.
+ * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package rpm
+
+import (
+ "fmt"
+
+ "github.com/network-quality/goresponsiveness/ms"
+)
+
+type Rpm struct {
+ SelfRttsTotal int
+ ForeignRttsTotal int
+ SelfRttsTrimmed int
+ ForeignRttsTrimmed int
+ SelfProbeRttPN float64
+ ForeignProbeRttPN float64
+ SelfProbeRttMean float64
+ ForeignProbeRttMean float64
+ PNRpm float64
+ MeanRpm float64
+}
+
+func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.MathematicalSeries[float64], trimming uint, percentile int) Rpm {
+ // First, let's do a double-sided trim of the top/bottom 10% of our measurements.
+ selfRttsTotalCount := selfRtts.Len()
+ foreignRttsTotalCount := foreignRtts.Len()
+
+ selfRttsTrimmed := selfRtts.DoubleSidedTrim(trimming)
+ foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(trimming)
+
+ selfRttsTrimmedCount := selfRttsTrimmed.Len()
+ foreignRttsTrimmedCount := foreignRttsTrimmed.Len()
+
+ // Then, let's take the mean of those ...
+ selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage()
+ foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage()
+
+ // Second, let's do the P90 calculations.
+ selfProbeRoundTripTimePN := selfRtts.Percentile(percentile)
+ foreignProbeRoundTripTimePN := foreignRtts.Percentile(percentile)
+
+ // Note: The specification indicates that we want to calculate the foreign probes as such:
+ // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign
+ // where tcp_foreign, tls_foreign, http_foreign are the P90 RTTs for the connection
+ // of the tcp, tls and http connections, respectively. However, we cannot break out
+ // the individual RTTs so we assume that they are roughly equal.
+
+ // This is 60 because we measure in seconds not ms
+ pnRpm := 60.0 / (float64(selfProbeRoundTripTimePN+foreignProbeRoundTripTimePN) / 2.0)
+ meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0)
+
+ return Rpm{
+ SelfRttsTotal: selfRttsTotalCount, ForeignRttsTotal: foreignRttsTotalCount,
+ SelfRttsTrimmed: selfRttsTrimmedCount, ForeignRttsTrimmed: foreignRttsTrimmedCount,
+ SelfProbeRttPN: selfProbeRoundTripTimePN, ForeignProbeRttPN: foreignProbeRoundTripTimePN,
+ SelfProbeRttMean: selfProbeRoundTripTimeMean, ForeignProbeRttMean: foreignProbeRoundTripTimeMean,
+ PNRpm: pnRpm, MeanRpm: meanRpm,
+ }
+}
+
+func (rpm *Rpm) ToString() string {
+ return fmt.Sprintf(
+ `Total Self Probes: %d
+Total Foreign Probes: %d
+Trimmed Self Probes Count: %d
+Trimmed Foreign Probes Count: %d
+P90 Self RTT: %f
+P90 Foreign RTT: %f
+Trimmed Mean Self RTT: %f
+Trimmed Mean Foreign RTT: %f
+`,
+ rpm.SelfRttsTotal,
+ rpm.ForeignRttsTotal,
+ rpm.SelfRttsTrimmed,
+ rpm.ForeignRttsTrimmed,
+ rpm.SelfProbeRttPN,
+ rpm.ForeignProbeRttPN,
+ rpm.SelfProbeRttMean,
+ rpm.ForeignProbeRttMean,
+ )
+}
diff --git a/rpm/parameters.go b/rpm/parameters.go
new file mode 100644
index 0000000..aff8639
--- /dev/null
+++ b/rpm/parameters.go
@@ -0,0 +1,85 @@
+/*
+ * This file is part of Go Responsiveness.
+ *
+ * Go Responsiveness is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software Foundation,
+ * either version 2 of the License, or (at your option) any later version.
+ * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package rpm
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/network-quality/goresponsiveness/utilities"
+)
+
+type SpecParameters struct {
+ TestTimeout time.Duration // Total test time.
+ MovingAvgDist int
+ EvalInterval time.Duration // How often to reevaluate network conditions.
+ TrimmedMeanPct uint
+ StdDevTolerance float64
+ MaxParallelConns int
+ ProbeInterval time.Duration
+ ProbeCapacityPct float64
+}
+
+func SpecParametersFromArguments(timeout int, mad int, id int, tmp uint, sdt float64, mnp int, mps int, ptc float64) (*SpecParameters, error) {
+ if timeout <= 0 {
+ return nil, fmt.Errorf("cannot specify a 0 or negative timeout for the test")
+ }
+ if mad <= 0 {
+ return nil, fmt.Errorf("cannot specify a 0 or negative moving-average distance for the test")
+ }
+ if id <= 0 {
+ return nil, fmt.Errorf("cannot specify a 0 or negative reevaluation interval for the test")
+ }
+	if tmp > 100 {
+		return nil, fmt.Errorf("cannot specify a trimming percentage greater than 100 for the test")
+	}
+ if sdt < 0 {
+ return nil, fmt.Errorf("cannot specify a negative standard-deviation tolerance for the test")
+ }
+ if mnp <= 0 {
+ return nil, fmt.Errorf("cannot specify a 0 or negative maximum number of parallel connections for the test")
+ }
+ if mps <= 0 {
+ return nil, fmt.Errorf("cannot specify a 0 or negative probing interval for the test")
+ }
+ if ptc <= 0 {
+ return nil, fmt.Errorf("cannot specify a 0 or negative probe capacity for the test")
+ }
+ testTimeout := time.Second * time.Duration(timeout)
+ evalInterval := time.Second * time.Duration(id)
+ probeInterval := utilities.PerSecondToInterval(int64(mps))
+
+ params := SpecParameters{
+ TestTimeout: testTimeout, MovingAvgDist: mad,
+ EvalInterval: evalInterval, TrimmedMeanPct: tmp, StdDevTolerance: sdt,
+ MaxParallelConns: mnp, ProbeInterval: probeInterval, ProbeCapacityPct: ptc,
+ }
+ return &params, nil
+}
+
+func (parameters *SpecParameters) ToString() string {
+ return fmt.Sprintf(
+ `Timeout: %v,
+Moving-Average Distance: %v,
+Interval Duration: %v,
+Trimmed-Mean Percentage: %v,
+Standard-Deviation Tolerance: %v,
+Maximum number of parallel connections: %v,
+Probe Interval: %v (derived from given maximum-probes-per-second parameter),
+Maximum Percentage Of Throughput For Probes: %v`,
+ parameters.TestTimeout, parameters.MovingAvgDist, parameters.EvalInterval, parameters.TrimmedMeanPct,
+ parameters.StdDevTolerance, parameters.MaxParallelConns, parameters.ProbeInterval, parameters.ProbeCapacityPct,
+ )
+}
diff --git a/rpm/parameters_test.go b/rpm/parameters_test.go
new file mode 100644
index 0000000..4a955c5
--- /dev/null
+++ b/rpm/parameters_test.go
@@ -0,0 +1,93 @@
+/*
+ * This file is part of Go Responsiveness.
+ *
+ * Go Responsiveness is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software Foundation,
+ * either version 2 of the License, or (at your option) any later version.
+ * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package rpm
+
+import (
+ "strings"
+ "testing"
+)
+
+func TestSpecParametersFromArgumentsBadTimeout(t *testing.T) {
+ _, err := SpecParametersFromArguments(0, 0, 0, 0, 0, 0, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "timeout") {
+ t.Fatalf("0 timeout improperly allowed.")
+ }
+ _, err = SpecParametersFromArguments(-1, 0, 0, 0, 0, 0, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "timeout") {
+ t.Fatalf("negative timeout improperly allowed.")
+ }
+}
+
+func TestSpecParametersFromArgumentsBadMad(t *testing.T) {
+ _, err := SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "moving-average") {
+ t.Fatalf("0 mad improperly allowed.")
+ }
+	_, err = SpecParametersFromArguments(1, -1, 0, 0, 0, 0, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "moving-average") {
+ t.Fatalf("negative mad improperly allowed.")
+ }
+}
+
+func TestSpecParametersFromArgumentsBadId(t *testing.T) {
+ _, err := SpecParametersFromArguments(1, 1, 0, 0, 0, 0, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "reevaluation") {
+ t.Fatalf("0 id improperly allowed.")
+ }
+ _, err = SpecParametersFromArguments(1, 1, -1, 0, 0, 0, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "reevaluation") {
+ t.Fatalf("negative id improperly allowed.")
+ }
+}
+
+func TestSpecParametersFromArgumentsBadSdt(t *testing.T) {
+ _, err := SpecParametersFromArguments(1, 1, 1, 1, -1, 0, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "deviation") {
+		t.Fatalf("negative sdt improperly allowed.")
+ }
+}
+
+func TestSpecParametersFromArgumentsBadMnp(t *testing.T) {
+ _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 0, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "parallel") {
+ t.Fatalf("0 mnp improperly allowed.")
+ }
+ _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, -1, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "parallel") {
+ t.Fatalf("negative mnp improperly allowed.")
+ }
+}
+
+func TestSpecParametersFromArgumentsBadMps(t *testing.T) {
+ _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 0, 0)
+ if err == nil || !strings.Contains(err.Error(), "probing interval") {
+ t.Fatalf("0 mps improperly allowed.")
+ }
+ _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, -1, 0)
+ if err == nil || !strings.Contains(err.Error(), "probing interval") {
+ t.Fatalf("negative mps improperly allowed.")
+ }
+}
+
+func TestSpecParametersFromArgumentsBadPtc(t *testing.T) {
+ _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 0)
+ if err == nil || !strings.Contains(err.Error(), "capacity") {
+ t.Fatalf("0 ptc improperly allowed.")
+ }
+ _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, -1)
+ if err == nil || !strings.Contains(err.Error(), "capacity") {
+ t.Fatalf("negative ptc improperly allowed.")
+ }
+}
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 07bc787..23ed9f4 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -37,19 +37,22 @@ func addFlows(
toAdd uint64,
lgcc *lgc.LoadGeneratingConnectionCollection,
lgcGenerator func() lgc.LoadGeneratingConnection,
- debug debug.DebugLevel,
+ debugging debug.DebugLevel,
) uint64 {
lgcc.Lock.Lock()
defer lgcc.Lock.Unlock()
for i := uint64(0); i < toAdd; i++ {
// First, generate the connection.
- newGenerator := lgcGenerator()
- lgcc.Append(newGenerator)
+ newConnection := lgcGenerator()
+ lgcc.Append(newConnection)
+ if debug.IsDebug(debugging) {
+ fmt.Printf("Added a new %s load-generating connection.\n", newConnection.Direction())
+ }
// Second, try to start the connection.
- if !newGenerator.Start(ctx, debug) {
+ if !newConnection.Start(ctx, debugging) {
// If there was an error, we'll make sure that the caller knows it.
fmt.Printf(
- "Error starting lgc with id %d!\n", newGenerator.ClientId(),
+ "Error starting lgc with id %d!\n", newConnection.ClientId(),
)
return i
}
@@ -81,49 +84,60 @@ type SelfDataCollectionResult struct {
LoggingContinuation func()
}
-func CombinedProber(
+func ResponsivenessProber(
proberCtx context.Context,
networkActivityCtx context.Context,
foreignProbeConfigurationGenerator func() probe.ProbeConfiguration,
selfProbeConfigurationGenerator func() probe.ProbeConfiguration,
- selfDownProbeConnection lgc.LoadGeneratingConnection,
- selfUpProbeConnection lgc.LoadGeneratingConnection,
+ selfProbeConnectionCollection *lgc.LoadGeneratingConnectionCollection,
+ probeDirection lgc.LgcDirection,
probeInterval time.Duration,
keyLogger io.Writer,
captureExtendedStats bool,
debugging *debug.DebugWithPrefix,
-) (dataPoints chan probe.ProbeDataPoint) {
+) (dataPoints chan utilities.Pair[*probe.ProbeDataPoint]) {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) Starting to collect responsiveness information at an interval of %v!\n",
+ debugging.Prefix,
+ probeInterval,
+ )
+ }
+
// Make a channel to send back all the generated data points
// when we are probing.
- dataPoints = make(chan probe.ProbeDataPoint)
+ dataPoints = make(chan utilities.Pair[*probe.ProbeDataPoint])
go func() {
wg := sync.WaitGroup{}
- probeCount := 0
+ probeCount := uint(0)
+
+ dataPointsLock := sync.Mutex{}
// As long as our context says that we can continue to probe!
for proberCtx.Err() == nil {
-
time.Sleep(probeInterval)
- foreignProbeConfiguration := foreignProbeConfigurationGenerator()
- selfProbeConfiguration := selfProbeConfigurationGenerator()
-
- if debug.IsDebug(debugging.Level) {
- fmt.Printf(
- "(%s) About to send round %d of probes!\n",
- debugging.Prefix,
- probeCount+1,
- )
+ // We may have slept for a very long time. So, let's check to see if we are
+ // still active, just for fun!
+ if proberCtx.Err() != nil {
+ break
}
- transport := &http.Transport{}
- transport.TLSClientConfig = &tls.Config{}
- transport.Proxy = http.ProxyFromEnvironment
- if !utilities.IsInterfaceNil(keyLogger) {
+			wg.Add(1)
+			probeCount++
+			probeCount := probeCount
+			go func() {
+				defer wg.Done()
+
+ foreignProbeConfiguration := foreignProbeConfigurationGenerator()
+ selfProbeConfiguration := selfProbeConfigurationGenerator()
+
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "Using an SSL Key Logger for this foreign probe.\n",
+ "(%s) About to send round %d of probes!\n",
+ debugging.Prefix,
+ probeCount,
)
}
@@ -134,112 +148,160 @@ func CombinedProber(
// depend on whether the url contains
// https:// or http://:
// https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74
- transport.TLSClientConfig.KeyLogWriter = keyLogger
- }
+ transport := &http.Transport{}
+ transport.TLSClientConfig = &tls.Config{}
+ transport.Proxy = http.ProxyFromEnvironment
- transport.TLSClientConfig.InsecureSkipVerify =
- foreignProbeConfiguration.InsecureSkipVerify
+ if !utilities.IsInterfaceNil(keyLogger) {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "Using an SSL Key Logger for a foreign probe.\n",
+ )
+ }
- utilities.OverrideHostTransport(transport,
- foreignProbeConfiguration.ConnectToAddr)
+ transport.TLSClientConfig.KeyLogWriter = keyLogger
+ }
- foreignProbeClient := &http.Client{Transport: transport}
+ transport.TLSClientConfig.InsecureSkipVerify =
+ foreignProbeConfiguration.InsecureSkipVerify
- // Start Foreign Connection Prober
- probeCount++
- go probe.Probe(
- networkActivityCtx,
- &wg,
- foreignProbeClient,
- nil,
- foreignProbeConfiguration.URL,
- foreignProbeConfiguration.Host,
- probe.Foreign,
- &dataPoints,
- captureExtendedStats,
- debugging,
- )
+ utilities.OverrideHostTransport(transport,
+ foreignProbeConfiguration.ConnectToAddr)
- // Start Self Download Connection Prober
+ foreignProbeClient := &http.Client{Transport: transport}
- // TODO: Make the following sanity check more than just a check.
- // We only want to start a SelfDown probe on a connection that is
- // in the RUNNING state.
- if selfDownProbeConnection.Status() == lgc.LGC_STATUS_RUNNING {
- go probe.Probe(
+ // Start Foreign Connection Prober
+ foreignProbeDataPoint, err := probe.Probe(
networkActivityCtx,
- &wg,
- selfDownProbeConnection.Client(),
- selfDownProbeConnection,
- selfProbeConfiguration.URL,
- selfProbeConfiguration.Host,
- probe.SelfDown,
- &dataPoints,
+ foreignProbeClient,
+ foreignProbeConfiguration.URL,
+ foreignProbeConfiguration.Host,
+ probeDirection,
+ probe.Foreign,
+ probeCount,
captureExtendedStats,
debugging,
)
- } else {
- panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n",
- debugging.Prefix, selfDownProbeConnection.Status()))
- }
+ if err != nil {
+ return
+ }
+
+ var selfProbeConnection *lgc.LoadGeneratingConnection = nil
+ func() {
+ selfProbeConnectionCollection.Lock.Lock()
+ defer selfProbeConnectionCollection.Lock.Unlock()
+ selfProbeConnection, err = selfProbeConnectionCollection.GetRandom()
+ if err != nil {
+ if debug.IsWarn(debugging.Level) {
+ fmt.Printf(
+ "(%s) Failed to get a random %s load-generating connection on which to send a probe: %v.\n",
+ debugging.Prefix,
+ utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"),
+ err,
+ )
+ }
+ return
+ }
+ }()
+ if selfProbeConnection == nil {
+ return
+ }
- // Start Self Upload Connection Prober
+ // TODO: Make the following sanity check more than just a check.
+ // We only want to start a SelfUp probe on a connection that is
+ // in the RUNNING state.
+ if (*selfProbeConnection).Status() != lgc.LGC_STATUS_RUNNING {
+ if debug.IsWarn(debugging.Level) {
+ fmt.Printf(
+ "(%s) The selected random %s load-generating connection on which to send a probe was not running.\n",
+ debugging.Prefix,
+ utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"),
+ )
+ }
+ return
+ }
- // TODO: Make the following sanity check more than just a check.
- // We only want to start a SelfDown probe on a connection that is
- // in the RUNNING state.
- if selfUpProbeConnection.Status() == lgc.LGC_STATUS_RUNNING {
- go probe.Probe(
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) Selected %s load-generating connection with ID %d to send a self probe with Id %d.\n",
+ debugging.Prefix,
+ utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"),
+ (*selfProbeConnection).ClientId(),
+ probeCount,
+ )
+ }
+ selfProbeDataPoint, err := probe.Probe(
proberCtx,
- &wg,
- selfUpProbeConnection.Client(),
- nil,
+ (*selfProbeConnection).Client(),
selfProbeConfiguration.URL,
selfProbeConfiguration.Host,
- probe.SelfUp,
- &dataPoints,
+ probeDirection,
+ utilities.Conditional(probeDirection == lgc.LGC_DOWN, probe.SelfDown, probe.SelfUp),
+ probeCount,
captureExtendedStats,
debugging,
)
- } else {
- panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n",
- debugging.Prefix, selfUpProbeConnection.Status()))
- }
+ if err != nil {
+ fmt.Printf(
+ "(%s) There was an error sending a self probe with Id %d: %v\n",
+ debugging.Prefix,
+ probeCount,
+ err,
+ )
+ return
+ }
+
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) About to report results for round %d of probes!\n",
+ debugging.Prefix,
+ probeCount,
+ )
+ }
+
+ dataPointsLock.Lock()
+ // Now we have our four data points (three in the foreign probe data point and one in the self probe data point)
+ if dataPoints != nil {
+ dataPoints <- utilities.Pair[*probe.ProbeDataPoint]{
+ First: foreignProbeDataPoint, Second: selfProbeDataPoint,
+ }
+ }
+ dataPointsLock.Unlock()
+ }()
}
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) Combined probe driver is going to start waiting for its probes to finish.\n",
+ "(%s) Probe driver is going to start waiting for its probes to finish.\n",
debugging.Prefix,
)
}
utilities.OrTimeout(func() { wg.Wait() }, 2*time.Second)
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) Combined probe driver is done waiting for its probes to finish.\n",
+ "(%s) Probe driver is done waiting for its probes to finish.\n",
debugging.Prefix,
)
}
+ dataPointsLock.Lock()
close(dataPoints)
+ dataPoints = nil
+ dataPointsLock.Unlock()
}()
return
}
func LoadGenerator(
+ throughputCtx context.Context, // Stop our activity when we no longer need any throughput
networkActivityCtx context.Context, // Create all network connections in this context.
- loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load.
+ generateLoadCtx context.Context, // Stop adding additional throughput when we are stable.
rampupInterval time.Duration,
lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection.
loadGeneratingConnectionsCollection *lgc.LoadGeneratingConnectionCollection,
+ mnp int,
captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections?
debugging *debug.DebugWithPrefix, // How can we forget debugging?
-) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes.
- throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate.
-) {
- throughputCalculations = make(chan ThroughputDataPoint)
- // The channel that we are going to use to send back the connection to use for probing may not immediately
- // be read by the caller. We don't want to wait around until they are ready before we start doing our work.
- // So, we'll make it buffered.
- probeConnectionCommunicationChannel = make(chan lgc.LoadGeneratingConnection, 1)
+) (stabilizerCommunicationChannel chan ThroughputDataPoint) { // Send back all the instantaneous throughputs that we generate.
+ stabilizerCommunicationChannel = make(chan ThroughputDataPoint)
go func() {
flowsCreated := uint64(0)
@@ -252,32 +314,12 @@ func LoadGenerator(
debugging.Level,
)
- // We have at least a single load-generating channel. This channel will be the one that
- // the self probes use.
- go func() {
- loadGeneratingConnectionsCollection.Lock.Lock()
- zerothConnection, err := loadGeneratingConnectionsCollection.Get(0)
- loadGeneratingConnectionsCollection.Lock.Unlock()
- if err != nil {
- panic("Could not get the zeroth connection!\n")
- }
- // We are going to wait until it is started.
- if !(*zerothConnection).WaitUntilStarted(loadGeneratorCtx) {
- fmt.Fprintf(os.Stderr, "Could not wait until the zeroth load-generating connection was started!\n")
- return
- }
- // Now that it is started, we will send it back to the caller so that
- // they can pass it on to the CombinedProber which will use it for the
- // self probes.
- probeConnectionCommunicationChannel <- *zerothConnection
- }()
-
nextSampleStartTime := time.Now().Add(rampupInterval)
for currentInterval := uint64(0); true; currentInterval++ {
- // If the loadGeneratorCtx is canceled, then that means our work here is done ...
- if loadGeneratorCtx.Err() != nil {
+ // If the throughputCtx is canceled, then that means our work here is done ...
+ if throughputCtx.Err() != nil {
break
}
@@ -297,6 +339,12 @@ func LoadGenerator(
}
nextSampleStartTime = time.Now().Add(time.Second)
+ // Waiting is the hardest part -- that was a long time asleep
+ // and we may have been cancelled during that time!
+ if throughputCtx.Err() != nil {
+ break
+ }
+
// Compute "instantaneous aggregate" goodput which is the number of
// bytes transferred within the last second.
var instantaneousThroughputTotal float64 = 0
@@ -406,16 +454,50 @@ func LoadGenerator(
len(*loadGeneratingConnectionsCollection.LGCs),
granularThroughputDatapoints,
}
- throughputCalculations <- throughputDataPoint
+ stabilizerCommunicationChannel <- throughputDataPoint
- // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now!
- flowsCreated += addFlows(
- networkActivityCtx,
- constants.AdditiveNumberOfLoadGeneratingConnections,
- loadGeneratingConnectionsCollection,
- lgcGenerator,
- debugging.Level,
- )
+ if generateLoadCtx.Err() != nil {
+ // No need to add additional data points because the controller told us
+ // that we were stable. But, we want to continue taking measurements!
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "%v: Throughput is stable; not adding any additional load-generating connections.\n",
+ debugging,
+ )
+ }
+ continue
+ }
+
+ loadGeneratingConnectionsCollection.Lock.Lock()
+ currentParallelConnectionCount, err :=
+ loadGeneratingConnectionsCollection.Len()
+ loadGeneratingConnectionsCollection.Lock.Unlock()
+
+ if err != nil {
+ if debug.IsWarn(debugging.Level) {
+ fmt.Printf(
+ "%v: Failed to get a count of the number of parallel load-generating connections: %v.\n",
+ debugging,
+ err,
+ )
+ }
+ }
+ if currentParallelConnectionCount < mnp {
+ // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now!
+ flowsCreated += addFlows(
+ networkActivityCtx,
+ constants.AdditiveNumberOfLoadGeneratingConnections,
+ loadGeneratingConnectionsCollection,
+ lgcGenerator,
+ debugging.Level,
+ )
+ } else if debug.IsWarn(debugging.Level) {
+ fmt.Printf(
+ "%v: Maximum number of parallel transport-layer connections reached (%d). Not adding another.\n",
+ debugging,
+ mnp,
+ )
+ }
}
if debug.IsDebug(debugging.Level) {
diff --git a/stabilizer/algorithm.go b/stabilizer/algorithm.go
new file mode 100644
index 0000000..45c34d9
--- /dev/null
+++ b/stabilizer/algorithm.go
@@ -0,0 +1,130 @@
+/*
+ * This file is part of Go Responsiveness.
+ *
+ * Go Responsiveness is free software: you can redistribute it and/or modify it under
+ * the terms of the GNU General Public License as published by the Free Software Foundation,
+ * either version 2 of the License, or (at your option) any later version.
+ * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package stabilizer
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/network-quality/goresponsiveness/debug"
+ "github.com/network-quality/goresponsiveness/ms"
+ "golang.org/x/exp/constraints"
+)
+
+type MeasurementStablizer[T constraints.Float | constraints.Integer] struct {
+ instantaneousses ms.MathematicalSeries[T]
+ aggregates ms.MathematicalSeries[float64]
+ stabilityStandardDeviation float64
+ trimmingLevel uint
+ m sync.Mutex
+ dbgLevel debug.DebugLevel
+ dbgConfig *debug.DebugWithPrefix
+ units string
+}
+
+// Stabilizer parameters:
+// 1. MAD: An all-purpose value that determines the hysteresis of various calculations
+// that will affect saturation (of either throughput or responsiveness).
+// 2: SDT: The standard deviation cutoff used to determine stability among the K preceding
+// moving averages of a measurement.
+// 3: TMP: The percentage by which to trim the values before calculating the standard deviation
+// to determine whether the value is within acceptable range for stability (SDT).
+
+// Stabilizer Algorithm:
+// Throughput stabilization is achieved when the standard deviation of the MAD number of the most
+// recent moving averages of instantaneous measurements is within an upper bound.
+//
+// Yes, that *is* a little confusing:
+// The user will deliver us a steady diet of measurements of the number of bytes transmitted during the immediately
+// previous interval. We will keep the MAD most recent of those measurements. Every time that we get a new
+// measurement, we will recalculate the moving average of the MAD most instantaneous measurements. We will call that
+// the moving average aggregate throughput at interval p. We keep the MAD most recent of those values.
+// If the calculated standard deviation of *those* values is less than SDT, we declare
+// stability.
+
+func NewStabilizer[T constraints.Float | constraints.Integer](
+ mad uint,
+ sdt float64,
+ trimmingLevel uint,
+ units string,
+ debugLevel debug.DebugLevel,
+ debug *debug.DebugWithPrefix,
+) MeasurementStablizer[T] {
+ return MeasurementStablizer[T]{
+ instantaneousses: ms.NewCappedMathematicalSeries[T](mad),
+ aggregates: ms.NewCappedMathematicalSeries[float64](mad),
+ stabilityStandardDeviation: sdt,
+ trimmingLevel: trimmingLevel,
+ units: units,
+ dbgConfig: debug,
+ dbgLevel: debugLevel,
+ }
+}
+
+func (r3 *MeasurementStablizer[T]) AddMeasurement(measurement T) {
+ r3.m.Lock()
+ defer r3.m.Unlock()
+ // Add this instantaneous measurement to the mix of the MAD previous instantaneous measurements.
+ r3.instantaneousses.AddElement(measurement)
+ // Calculate the moving average of the MAD previous instantaneous measurements (what the
+ // algorithm calls moving average aggregate throughput at interval p) and add it to
+ // the mix of MAD previous moving averages.
+ r3.aggregates.AddElement(r3.instantaneousses.CalculateAverage())
+
+ if debug.IsDebug(r3.dbgLevel) {
+ fmt.Printf(
+			"%s: MA: %f %s (previous %d intervals).\n",
+			r3.dbgConfig.String(),
+			r3.aggregates.CalculateAverage(), r3.units,
+			r3.aggregates.Len(),
+ )
+ }
+}
+
+func (r3 *MeasurementStablizer[T]) IsStable() bool {
+ // There are MAD number of measurements of the _moving average aggregate throughput
+ // at interval p_ in movingAverages.
+ isvalid, stddev := r3.aggregates.StandardDeviation()
+
+ if !isvalid {
+ // If there are not enough values in the series to be able to calculate a
+ // standard deviation, then we know that we are not yet stable. Vamoose.
+ return false
+ }
+
+ // Stability is determined by whether or not the standard deviation of the values
+ // is within some percentage of the average.
+ stabilityCutoff := r3.aggregates.CalculateAverage() * (r3.stabilityStandardDeviation / 100.0)
+ isStable := stddev <= stabilityCutoff
+
+ if debug.IsDebug(r3.dbgLevel) {
+ fmt.Printf(
+ "%s: Is Stable? %v; Standard Deviation: %f %s; Is Normally Distributed? %v; Standard Deviation Cutoff: %v %s).\n",
+ r3.dbgConfig.String(),
+ isStable,
+ stddev,
+ r3.units,
+ r3.aggregates.IsNormallyDistributed(),
+ stabilityCutoff,
+ r3.units,
+ )
+ fmt.Printf("%s: Values: ", r3.dbgConfig.String())
+ for _, v := range r3.aggregates.Values() {
+ fmt.Printf("%v, ", v)
+ }
+ fmt.Printf("\n")
+ }
+ return isStable
+}
diff --git a/stabilizer/rev3.go b/stabilizer/rev3.go
deleted file mode 100644
index 4c24f54..0000000
--- a/stabilizer/rev3.go
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * This file is part of Go Responsiveness.
- *
- * Go Responsiveness is free software: you can redistribute it and/or modify it under
- * the terms of the GNU General Public License as published by the Free Software Foundation,
- * either version 2 of the License, or (at your option) any later version.
- * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
- * PARTICULAR PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>.
- */
-
-package stabilizer
-
-import (
- "fmt"
- "sync"
-
- "github.com/network-quality/goresponsiveness/debug"
- "github.com/network-quality/goresponsiveness/ms"
- "github.com/network-quality/goresponsiveness/probe"
- "github.com/network-quality/goresponsiveness/rpm"
- "github.com/network-quality/goresponsiveness/utilities"
-)
-
-type DataPointStabilizer struct {
- instantaneousMeasurements ms.MathematicalSeries[float64]
- movingAverages ms.MathematicalSeries[float64]
- stabilityStandardDeviation float64
- m sync.Mutex
- dbgLevel debug.DebugLevel
- dbgConfig *debug.DebugWithPrefix
-}
-
-type ProbeStabilizer DataPointStabilizer
-type ThroughputStabilizer DataPointStabilizer
-
-// Stabilizer parameters:
-// 1. I: The number of previous instantaneous measurements to consider when generating
-// the so-called instantaneous moving averages.
-// 2. K: The number of instantaneous moving averages to consider when determining stability.
-// 3: S: The standard deviation cutoff used to determine stability among the K preceding
-// moving averages of a measurement.
-
-// Rev3 Stabilizer Algorithm:
-// Stabilization is achieved when the standard deviation of a given number of the most recent moving averages of
-// instantaneous measurements is within an upper bound.
-//
-// Yes, that *is* a little confusing:
-// The user will deliver us a steady diet of so-called instantaneous measurements. We will keep the I most recent
-// of those measurements. Every time that we get a new instantaneous measurement, we will recalculate the moving
-// average of the I most instantaneous measurements. We will call that an instantaneous moving average. We keep the K
-// most recent instantaneous moving averages. Every time that we calculate a new instantaneous moving average, we will
-// calculate the standard deviation of those values. If the calculated standard deviation is less than S, we declare
-// stability.
-
-func NewProbeStabilizer(
- i uint64,
- k uint64,
- s float64,
- debugLevel debug.DebugLevel,
- debug *debug.DebugWithPrefix,
-) ProbeStabilizer {
- return ProbeStabilizer{instantaneousMeasurements: ms.NewCappedMathematicalSeries[float64](i),
- movingAverages: ms.NewCappedMathematicalSeries[float64](k),
- stabilityStandardDeviation: s,
- dbgConfig: debug,
- dbgLevel: debugLevel}
-}
-
-func (r3 *ProbeStabilizer) AddMeasurement(measurement probe.ProbeDataPoint) {
- r3.m.Lock()
- defer r3.m.Unlock()
-
- // There may be more than one round trip accumulated together. If that is the case,
- // we will blow them apart in to three separate measurements and each one will just
- // be 1 / measurement.RoundTripCount of the total length.
- for range utilities.Iota(0, int(measurement.RoundTripCount)) {
- // Add this instantaneous measurement to the mix of the I previous instantaneous measurements.
- r3.instantaneousMeasurements.AddElement(
- measurement.Duration.Seconds() / float64(measurement.RoundTripCount),
- )
- }
- // Calculate the moving average of the I previous instantaneous measurements and add it to
- // the mix of K previous moving averages.
- r3.movingAverages.AddElement(r3.instantaneousMeasurements.CalculateAverage())
-
- if debug.IsDebug(r3.dbgLevel) {
- fmt.Printf(
- "%s: MA: %f ns (previous %d intervals).\n",
- r3.dbgConfig.String(),
- r3.movingAverages.CalculateAverage(),
- r3.movingAverages.Len(),
- )
- }
-}
-
-func (r3 *ProbeStabilizer) IsStable() bool {
- // calculate whether the standard deviation of the K previous moving averages falls below S.
- isvalid, stddev := r3.movingAverages.StandardDeviation()
-
- if !isvalid {
- // If there are not enough values in the series to be able to calculate a
- // standard deviation, then we know that we are not yet stable. Vamoose.
- return false
- }
-
- // Stability is determined by whether or not the standard deviation of the values
- // is within some percentage of the average.
- stabilityCutoff := r3.movingAverages.CalculateAverage() * (r3.stabilityStandardDeviation / 100.0)
- isStable := stddev <= stabilityCutoff
-
- if debug.IsDebug(r3.dbgLevel) {
- fmt.Printf(
- "%s: Is Stable? %v; Standard Deviation: %f s; Is Normally Distributed? %v; Standard Deviation Cutoff: %v s).\n",
- r3.dbgConfig.String(),
- isStable,
- stddev,
- r3.movingAverages.IsNormallyDistributed(),
- stabilityCutoff,
- )
- fmt.Printf("%s: Values: ", r3.dbgConfig.String())
- for _, v := range r3.movingAverages.Values() {
- fmt.Printf("%v, ", v)
- }
- fmt.Printf("\n")
- }
-
- return isStable
-}
-
-func NewThroughputStabilizer(
- i uint64,
- k uint64,
- s float64,
- debugLevel debug.DebugLevel,
- debug *debug.DebugWithPrefix,
-) ThroughputStabilizer {
- return ThroughputStabilizer{
- instantaneousMeasurements: ms.NewCappedMathematicalSeries[float64](i),
- movingAverages: ms.NewCappedMathematicalSeries[float64](k),
- stabilityStandardDeviation: s,
- dbgConfig: debug,
- dbgLevel: debugLevel,
- }
-}
-
-func (r3 *ThroughputStabilizer) AddMeasurement(measurement rpm.ThroughputDataPoint) {
- r3.m.Lock()
- defer r3.m.Unlock()
- // Add this instantaneous measurement to the mix of the I previous instantaneous measurements.
- r3.instantaneousMeasurements.AddElement(utilities.ToMbps(measurement.Throughput))
- // Calculate the moving average of the I previous instantaneous measurements and add it to
- // the mix of K previous moving averages.
- r3.movingAverages.AddElement(r3.instantaneousMeasurements.CalculateAverage())
-
- if debug.IsDebug(r3.dbgLevel) {
- fmt.Printf(
- "%s: MA: %f Mbps (previous %d intervals).\n",
- r3.dbgConfig.String(),
- r3.movingAverages.CalculateAverage(),
- r3.movingAverages.Len(),
- )
- }
-}
-
-func (r3 *ThroughputStabilizer) IsStable() bool {
- isvalid, stddev := r3.movingAverages.StandardDeviation()
-
- if !isvalid {
- // If there are not enough values in the series to be able to calculate a
- // standard deviation, then we know that we are not yet stable. Vamoose.
- return false
- }
-
- // Stability is determined by whether or not the standard deviation of the values
- // is within some percentage of the average.
- stabilityCutoff := r3.movingAverages.CalculateAverage() * (r3.stabilityStandardDeviation / 100.0)
- isStable := stddev <= stabilityCutoff
-
- if debug.IsDebug(r3.dbgLevel) {
- fmt.Printf(
- "%s: Is Stable? %v; Standard Deviation: %f Mbps; Is Normally Distributed? %v; Standard Deviation Cutoff: %v Mbps).\n",
- r3.dbgConfig.String(),
- isStable,
- stddev,
- r3.movingAverages.IsNormallyDistributed(),
- stabilityCutoff,
- )
- fmt.Printf("%s: Values: ", r3.dbgConfig.String())
- for _, v := range r3.movingAverages.Values() {
- fmt.Printf("%v, ", v)
- }
- fmt.Printf("\n")
- }
- return isStable
-}
diff --git a/utilities/utilities.go b/utilities/utilities.go
index e75d373..ff04023 100644
--- a/utilities/utilities.go
+++ b/utilities/utilities.go
@@ -30,13 +30,10 @@ import (
"golang.org/x/exp/constraints"
)
-var (
- // GitVersion is the Git revision hash
- GitVersion = "dev"
-)
+// GitVersion is the Git revision hash
+var GitVersion = "dev"
func Iota(low int, high int) (made []int) {
-
made = make([]int, high-low)
for counter := low; counter < high; counter++ {
made[counter-low] = counter
@@ -67,7 +64,7 @@ func AbsPercentDifference(
)
}
-func Conditional(condition bool, t string, f string) string {
+func Conditional[T any](condition bool, t T, f T) T {
if condition {
return t
}
@@ -229,3 +226,12 @@ func ContextSignaler(ctxt context.Context, st time.Duration, condition *func() b
return
}
}
+
+// Pair groups two values of the same type T.
+type Pair[T any] struct {
+	First  T
+	Second T
+}
+
+// PerSecondToInterval converts an events-per-second rate into the duration
+// between consecutive events (e.g. 2/s -> 500ms).
+// NOTE(review): rate == 0 triggers an integer divide-by-zero panic — confirm
+// all callers pass a positive rate.
+func PerSecondToInterval(rate int64) time.Duration {
+	return time.Duration(time.Second.Nanoseconds() / rate)
+}
diff --git a/utilities/utilities_test.go b/utilities/utilities_test.go
index aa66f6b..9cd4ef0 100644
--- a/utilities/utilities_test.go
+++ b/utilities/utilities_test.go
@@ -116,3 +116,13 @@ func TestWaitWithContext(t *testing.T) {
wg.Wait()
}
+
+// TestPerSecondToInterval checks the rate-to-interval conversion at two
+// representative rates.
+func TestPerSecondToInterval(t *testing.T) {
+	if time.Second != PerSecondToInterval(1) {
+		t.Fatalf("A number of nanoseconds is not equal to a second!")
+	}
+
+	// Fixed message: an event occurring twice per second recurs every 500ms
+	// (500,000,000 ns), not every 5000ns.
+	if time.Second/2 != PerSecondToInterval(2) {
+		t.Fatalf("Something that happens twice per second should happen every 500ms.")
+	}
+}