diff options
| author | Will Hawkins <[email protected]> | 2023-05-23 17:58:14 -0400 |
|---|---|---|
| committer | Will Hawkins <[email protected]> | 2023-06-21 09:12:22 -0400 |
| commit | ec2ccf69d8b08abb03fa3bdb3e7e95ae1862d619 (patch) | |
| tree | 6b636bdbda82db40da89a2bde213c684542850dc | |
| parent | 5558f0347baaf6db066314f0eaf82d7fb552b2f7 (diff) | |
Major Update/Refactor to Support IETF 02
Beginning of a release candidate for support for IETF 02 tag of the
responsiveness spec.
| -rw-r--r-- | Makefile | 2 | ||||
| -rw-r--r-- | constants/constants.go | 14 | ||||
| -rw-r--r-- | lgc/collection.go | 27 | ||||
| -rw-r--r-- | lgc/download.go | 4 | ||||
| -rw-r--r-- | lgc/lgc.go | 17 | ||||
| -rw-r--r-- | lgc/upload.go | 4 | ||||
| -rw-r--r-- | ms/ms.go | 29 | ||||
| -rw-r--r-- | ms/ms_test.go | 93 | ||||
| -rw-r--r-- | networkQuality.go | 924 | ||||
| -rw-r--r-- | probe/probe.go | 51 | ||||
| -rw-r--r-- | probe/tracer.go | 6 | ||||
| -rw-r--r-- | rpm/calculations.go | 94 | ||||
| -rw-r--r-- | rpm/parameters.go | 85 | ||||
| -rw-r--r-- | rpm/parameters_test.go | 93 | ||||
| -rw-r--r-- | rpm/rpm.go | 324 | ||||
| -rw-r--r-- | stabilizer/algorithm.go | 130 | ||||
| -rw-r--r-- | stabilizer/rev3.go | 199 | ||||
| -rw-r--r-- | utilities/utilities.go | 18 | ||||
| -rw-r--r-- | utilities/utilities_test.go | 10 |
19 files changed, 1271 insertions, 853 deletions
@@ -6,7 +6,7 @@ all: build test build: go build $(LDFLAGS) networkQuality.go test: - go test ./timeoutat/ ./traceable/ ./ms/ ./utilities/ ./lgc ./qualityattenuation + go test ./timeoutat/ ./traceable/ ./ms/ ./utilities/ ./lgc ./qualityattenuation ./rpm golines: find . -name '*.go' -exec ~/go/bin/golines -w {} \; clean: diff --git a/constants/constants.go b/constants/constants.go index 66f7110..7a8d562 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -35,8 +35,6 @@ var ( // The amount of time that the client will cooldown if it is in debug mode. CooldownPeriod time.Duration = 4 * time.Second - // The amount of time that we give ourselves to calculate the RPM. - RPMCalculationTime int = 10 // The default amount of time that a test will take to calculate the RPM. DefaultTestTime int = 20 @@ -49,3 +47,15 @@ var ( // The default determination of whether to verify server certificates DefaultInsecureSkipVerify bool = true ) + +type SpecParametersCliOptions struct { + Mad int + Id int + Tmp uint + Sdt float64 + Mnp int + Mps int + Ptc float64 +} + +var SpecParameterCliOptionsDefaults = SpecParametersCliOptions{Mad: 4, Id: 1, Tmp: 5, Sdt: 5.0, Mnp: 16, Mps: 100, Ptc: 0.05} diff --git a/lgc/collection.go b/lgc/collection.go index 7560186..cd6a87d 100644 --- a/lgc/collection.go +++ b/lgc/collection.go @@ -16,6 +16,7 @@ package lgc import ( "fmt" + "math/rand" "sync" ) @@ -33,10 +34,10 @@ func (collection *LoadGeneratingConnectionCollection) Get(idx int) (*LoadGenerat collection.Lock.Unlock() return nil, fmt.Errorf("collection is unlocked") } + return collection.lockedGet(idx) +} - if idx > len(*collection.LGCs) { - return nil, fmt.Errorf("index too large") - } +func (collection *LoadGeneratingConnectionCollection) lockedGet(idx int) (*LoadGeneratingConnection, error) { return &(*collection.LGCs)[idx], nil } @@ -49,6 +50,22 @@ func (collection *LoadGeneratingConnectionCollection) Append(conn LoadGenerating return nil } -func (collection *LoadGeneratingConnectionCollection) Len() int { - return len(*collection.LGCs) +func (collection *LoadGeneratingConnectionCollection) Len() (int, error) { + if collection.Lock.TryLock() { + collection.Lock.Unlock() + return -1, fmt.Errorf("collection is unlocked") + } + return len(*collection.LGCs), nil +} + +func (collection *LoadGeneratingConnectionCollection) GetRandom() (*LoadGeneratingConnection, error) { + if collection.Lock.TryLock() { + collection.Lock.Unlock() + return nil, fmt.Errorf("collection is unlocked") + } + + idx := int(rand.Uint32()) + idx = idx % len(*collection.LGCs) + + return collection.lockedGet(idx) } diff --git a/lgc/download.go b/lgc/download.go index a73cc37..c13a1b1 100644 --- a/lgc/download.go +++ b/lgc/download.go @@ -64,6 +64,10 @@ func NewLoadGeneratingConnectionDownload(url string, keyLogger io.Writer, connec return lgd } +func (lgd *LoadGeneratingConnectionDownload) Direction() LgcDirection { + return LGC_DOWN +} + func (lgd *LoadGeneratingConnectionDownload) WaitUntilStarted(ctxt context.Context) bool { conditional := func() bool { return lgd.status != LGC_STATUS_NOT_STARTED } go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgd.statusWaiter) @@ -31,6 +31,23 @@ type LoadGeneratingConnection interface { ClientId() uint64 Stats() *stats.TraceStats WaitUntilStarted(context.Context) bool + Direction() LgcDirection +} + +type LgcDirection int + +const ( + LGC_DOWN LgcDirection = iota + LGC_UP +) + +func (direction LgcDirection) String() string { + if direction == LGC_DOWN { + return "Download" + } else if direction == LGC_UP { + return "Upload" + } + return "Invalid load-generating connection direction" } type LgcStatus int diff --git a/lgc/upload.go b/lgc/upload.go index 5175fe0..e4518b8 100644 --- a/lgc/upload.go +++ b/lgc/upload.go @@ -90,6 +90,10 @@ func (lgu *LoadGeneratingConnectionUpload) Status() LgcStatus { return lgu.status } +func (lgd *LoadGeneratingConnectionUpload) Direction() LgcDirection { + return LGC_UP +} + type syntheticCountingReader struct { n *uint64 ctx context.Context @@ -33,7 +33,7 @@ type MathematicalSeries[T constraints.Float | constraints.Integer] interface { Len() int Values() []T Percentile(int) T - DoubleSidedTrim(uint32) MathematicalSeries[T] + DoubleSidedTrim(uint) MathematicalSeries[T] Less(int, int) bool Swap(int, int) } @@ -77,7 +77,7 @@ func (ims *InfiniteMathematicalSeries[T]) Less(i, j int) bool { return ims.elements[i] < ims.elements[j] } -func (ims *InfiniteMathematicalSeries[T]) DoubleSidedTrim(percent uint32) MathematicalSeries[T] { +func (ims *InfiniteMathematicalSeries[T]) DoubleSidedTrim(percent uint) MathematicalSeries[T] { if percent >= 100 { panic( fmt.Sprintf("Cannot perform double-sided trim for an invalid percentage: %d", percent), @@ -137,7 +137,6 @@ func (ims *InfiniteMathematicalSeries[T]) AllSequentialIncreasesLessThan( * N.B.: Overflow is possible -- use at your discretion! */ func (ims *InfiniteMathematicalSeries[T]) StandardDeviation() (bool, T) { - // From https://www.mathsisfun.com/data/standard-deviation-calculator.html // Yes, for real! @@ -165,7 +164,7 @@ func (ims *InfiniteMathematicalSeries[T]) StandardDeviation() (bool, T) { // Finally, the standard deviation is the square root // of the variance. sd := T(math.Sqrt(variance)) - //sd := T(variance) + // sd := T(variance) return true, sd } @@ -187,14 +186,14 @@ func (ims *InfiniteMathematicalSeries[T]) Percentile(p int) T { } type CappedMathematicalSeries[T constraints.Float | constraints.Integer] struct { - elements_count uint64 + elements_count uint elements []T - index uint64 - divisor *saturating.Saturating[uint64] + index uint + divisor *saturating.Saturating[uint] } func NewCappedMathematicalSeries[T constraints.Float | constraints.Integer]( - instants_count uint64, + instants_count uint, ) MathematicalSeries[T] { return &CappedMathematicalSeries[T]{ elements: make([]T, instants_count), @@ -221,7 +220,6 @@ func (ma *CappedMathematicalSeries[T]) CalculateAverage() float64 { func (ma *CappedMathematicalSeries[T]) AllSequentialIncreasesLessThan( limit float64, ) (_ bool, maximumSequentialIncrease float64) { - // If we have not yet accumulated a complete set of intervals, // this is false. if ma.divisor.Value() != ma.elements_count { @@ -233,7 +231,7 @@ func (ma *CappedMathematicalSeries[T]) AllSequentialIncreasesLessThan( oldestIndex := ma.index previous := ma.elements[oldestIndex] maximumSequentialIncrease = 0 - for i := uint64(1); i < ma.elements_count; i++ { + for i := uint(1); i < ma.elements_count; i++ { currentIndex := (oldestIndex + i) % ma.elements_count current := ma.elements[currentIndex] percentChange := utilities.SignedPercentDifference(current, previous) @@ -249,7 +247,6 @@ func (ma *CappedMathematicalSeries[T]) AllSequentialIncreasesLessThan( * N.B.: Overflow is possible -- use at your discretion! */ func (ma *CappedMathematicalSeries[T]) StandardDeviation() (bool, T) { - // If we have not yet accumulated a complete set of intervals, // we are always false. if ma.divisor.Value() != ma.elements_count { @@ -283,7 +280,7 @@ func (ma *CappedMathematicalSeries[T]) StandardDeviation() (bool, T) { // Finally, the standard deviation is the square root // of the variance. sd := T(math.Sqrt(variance)) - //sd := T(variance) + // sd := T(variance) return true, sd } @@ -312,7 +309,7 @@ func (ma *CappedMathematicalSeries[T]) Values() []T { } func (ma *CappedMathematicalSeries[T]) Len() int { - if uint64(len(ma.elements)) != ma.elements_count { + if uint(len(ma.elements)) != ma.elements_count { panic( fmt.Sprintf( "Error: A capped mathematical series' metadata is invalid: the length of its element array/slice does not match element_count! (%v vs %v)", @@ -346,19 +343,19 @@ func (ims *CappedMathematicalSeries[T]) Less(i, j int) bool { return ims.elements[i] < ims.elements[j] } -func (ims *CappedMathematicalSeries[T]) DoubleSidedTrim(percent uint32) MathematicalSeries[T] { +func (ims *CappedMathematicalSeries[T]) DoubleSidedTrim(percent uint) MathematicalSeries[T] { if percent >= 100 { panic( fmt.Sprintf("Cannot perform double-sided trim for an invalid percentage: %d", percent), ) } - trimmed := &CappedMathematicalSeries[T]{elements_count: uint64(ims.Len())} + trimmed := &CappedMathematicalSeries[T]{elements_count: uint(ims.Len())} trimmed.elements = make([]T, ims.Len()) copy(trimmed.elements, ims.elements) sort.Sort(trimmed) - elementsToTrim := uint64(float32(ims.Len()) * ((float32(percent)) / float32(100.0))) + elementsToTrim := uint(float32(ims.Len()) * ((float32(percent)) / float32(100.0))) trimmed.elements = trimmed.elements[elementsToTrim : len(trimmed.elements)-int(elementsToTrim)] trimmed.elements_count -= (elementsToTrim * 2) diff --git a/ms/ms_test.go b/ms/ms_test.go index 533cc7e..34817d0 100644 --- a/ms/ms_test.go +++ b/ms/ms_test.go @@ -11,7 +11,7 @@ func Test_InfiniteValues(test *testing.T) { series := NewInfiniteMathematicalSeries[float64]() shouldMatch := make([]float64, 0) previous := float64(1.0) - for _ = range utilities.Iota(1, 80) { + for range utilities.Iota(1, 80) { previous *= 1.059 series.AddElement(float64(previous)) shouldMatch = append(shouldMatch, previous) @@ -21,10 +21,11 @@ func Test_InfiniteValues(test *testing.T) { test.Fatalf("Values() on infinite mathematical series does not work.") } } + func Test_InfiniteSequentialIncreasesAlwaysLessThan(test *testing.T) { series := NewInfiniteMathematicalSeries[float64]() previous := float64(1.0) - for _ = range utilities.Iota(1, 80) { + for range utilities.Iota(1, 80) { previous *= 1.059 series.AddElement(float64(previous)) } @@ -35,6 +36,7 @@ func Test_InfiniteSequentialIncreasesAlwaysLessThan(test *testing.T) { ) } } + func Test_CappedTooFewInstantsSequentialIncreasesLessThanAlwaysFalse(test *testing.T) { series := NewCappedMathematicalSeries[float64](500) series.AddElement(0.0) @@ -51,14 +53,17 @@ func Test_Infinite_degenerate_percentile_too_high(test *testing.T) { test.Fatalf("(infinite) Series percentile of 101 failed.") } } + func Test_Infinite_degenerate_percentile_too_low(test *testing.T) { series := NewInfiniteMathematicalSeries[int]() if series.Percentile(-1) != 0 { test.Fatalf("(infinite) Series percentile of -1 failed.") } } + func Test_Infinite90_percentile(test *testing.T) { - series := NewInfiniteMathematicalSeries[int]() + var expected int64 = 10 + series := NewInfiniteMathematicalSeries[int64]() series.AddElement(10) series.AddElement(9) series.AddElement(8) @@ -70,15 +75,16 @@ func Test_Infinite90_percentile(test *testing.T) { series.AddElement(2) series.AddElement(1) - if series.Percentile(90) != 10 { + if series.Percentile(90) != expected { test.Fatalf( - "(infinite) Series 90th percentile of 0 ... 10 failed: Expected 10 got %v.", + "(infinite) Series 90th percentile of 0 ... 10 failed: Expected: %v; Actual: %v.", expected, series.Percentile(90), ) } } func Test_Infinite90_percentile_reversed(test *testing.T) { + var expected int64 = 10 series := NewInfiniteMathematicalSeries[int64]() series.AddElement(1) series.AddElement(2) @@ -91,15 +97,16 @@ func Test_Infinite90_percentile_reversed(test *testing.T) { series.AddElement(9) series.AddElement(10) - if series.Percentile(90) != 10 { + if series.Percentile(90) != expected { test.Fatalf( - "(infinite) Series 90th percentile of 0 ... 10 failed: Expected 10 got %v.", + "(infinite) Series 90th percentile of 0 ... 10 failed: Expected %v; Actual: %v.", expected, series.Percentile(90), ) } } func Test_Infinite50_percentile_jumbled(test *testing.T) { + var expected int64 = 15 series := NewInfiniteMathematicalSeries[int64]() series.AddElement(7) series.AddElement(2) @@ -112,15 +119,16 @@ func Test_Infinite50_percentile_jumbled(test *testing.T) { series.AddElement(11) series.AddElement(12) - if series.Percentile(50) != 15 { + if series.Percentile(50) != expected { test.Fatalf( - "(infinite) Series 50 percentile of a jumble of numbers failed: Expected 15 got %v.", + "(infinite) Series 50 percentile of a jumble of numbers failed: Expected %v; Actual: %v.", expected, series.Percentile(50), ) } } func Test_InfiniteDoubleSidedTrimmedMean_jumbled(test *testing.T) { + expected := 16 series := NewInfiniteMathematicalSeries[int64]() series.AddElement(7) series.AddElement(2) @@ -145,10 +153,10 @@ func Test_InfiniteDoubleSidedTrimmedMean_jumbled(test *testing.T) { trimmed := series.DoubleSidedTrim(10) - if trimmed.Len() != 16 { + if trimmed.Len() != expected { test.Fatalf( "Capped series is not of the proper size. Expected %v and got %v", - 16, + expected, trimmed.Len(), ) } @@ -165,7 +173,7 @@ func Test_InfiniteDoubleSidedTrimmedMean_jumbled(test *testing.T) { func Test_CappedSequentialIncreasesAlwaysLessThan(test *testing.T) { series := NewCappedMathematicalSeries[float64](40) previous := float64(1.0) - for _ = range utilities.Iota(1, 80) { + for range utilities.Iota(1, 80) { previous *= 1.059 series.AddElement(float64(previous)) } @@ -221,16 +229,41 @@ func Test_CappedSequentialIncreasesAlwaysLessThanWithWraparoundInverse(test *tes } func Test_CappedStandardDeviationCalculation(test *testing.T) { + expected := 2.93 series := NewCappedMathematicalSeries[float64](5) // 5.7, 1.0, 8.6, 7.4, 2.2 series.AddElement(5.7) + series.AddElement(5.7) + series.AddElement(5.7) + series.AddElement(5.7) + series.AddElement(5.7) + series.AddElement(5.7) + series.AddElement(5.7) + series.AddElement(5.7) + series.AddElement(5.7) series.AddElement(1.0) series.AddElement(8.6) series.AddElement(7.4) series.AddElement(2.2) - if _, sd := series.StandardDeviation(); !utilities.ApproximatelyEqual(2.93, sd, 0.01) { - test.Fatalf("Standard deviation max calculation failed: %v.", sd) + if _, sd := series.StandardDeviation(); !utilities.ApproximatelyEqual(sd, expected, 0.01) { + test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd) + } else { + test.Logf("Standard deviation calculation result: %v", sd) + } +} + +func Test_CappedStandardDeviationCalculation2(test *testing.T) { + expected := 1.41 + series := NewCappedMathematicalSeries[float64](5) + series.AddElement(8) + series.AddElement(9) + series.AddElement(10) + series.AddElement(11) + series.AddElement(12) + + if _, sd := series.StandardDeviation(); !utilities.ApproximatelyEqual(sd, expected, 0.01) { + test.Fatalf("Standard deviation max calculation failed: Expected: %v; Actual: %v.", expected, sd) } else { test.Logf("Standard deviation calculation result: %v", sd) } @@ -252,6 +285,7 @@ func Test_CappedRotatingValues(test *testing.T) { test.Fatalf("Adding values does not properly erase earlier values.") } } + func Test_CappedLen(test *testing.T) { series := NewCappedMathematicalSeries[int](5) @@ -275,13 +309,16 @@ func Test_Capped_degenerate_percentile_too_high(test *testing.T) { test.Fatalf("Series percentile of 101 failed.") } } + func Test_Capped_degenerate_percentile_too_low(test *testing.T) { series := NewCappedMathematicalSeries[int](21) if series.Percentile(-1) != 0 { test.Fatalf("Series percentile of -1 failed.") } } + func Test_Capped90_percentile(test *testing.T) { + var expected int = 10 series := NewCappedMathematicalSeries[int](10) series.AddElement(10) series.AddElement(9) @@ -294,9 +331,9 @@ func Test_Capped90_percentile(test *testing.T) { series.AddElement(2) series.AddElement(1) - if series.Percentile(90) != 10 { + if series.Percentile(90) != expected { test.Fatalf( - "Series 90th percentile of 0 ... 10 failed: Expected 10 got %v.", + "Series 90th percentile of 0 ... 10 failed: Expected %v got %v.", expected, series.Percentile(90), ) } @@ -324,6 +361,7 @@ func Test_Capped90_percentile_reversed(test *testing.T) { } func Test_Capped50_percentile_jumbled(test *testing.T) { + var expected int64 = 15 series := NewCappedMathematicalSeries[int64](10) series.AddElement(7) series.AddElement(2) @@ -336,15 +374,16 @@ func Test_Capped50_percentile_jumbled(test *testing.T) { series.AddElement(11) series.AddElement(12) - if series.Percentile(50) != 15 { + if series.Percentile(50) != expected { test.Fatalf( - "Series 50 percentile of a jumble of numbers failed: Expected 15 got %v.", + "Series 50 percentile of a jumble of numbers failed: Expected %v got %v.", expected, series.Percentile(50), ) } } func Test_CappedDoubleSidedTrimmedMean_jumbled(test *testing.T) { + expected := 8 series := NewCappedMathematicalSeries[int64](10) series.AddElement(7) series.AddElement(2) @@ -360,10 +399,10 @@ func Test_CappedDoubleSidedTrimmedMean_jumbled(test *testing.T) { trimmed := series.DoubleSidedTrim(10) - if trimmed.Len() != 8 { + if trimmed.Len() != expected { test.Fatalf( "Capped series is not of the proper size. Expected %v and got %v", - 8, + expected, trimmed.Len(), ) } @@ -376,3 +415,17 @@ func Test_CappedDoubleSidedTrimmedMean_jumbled(test *testing.T) { prev = v } } + +func Test_CappedAverage(test *testing.T) { + expected := 1.0082230220488836e+08 + series := NewCappedMathematicalSeries[float64](4) + series.AddElement(9.94747772516195e+07) + series.AddElement(9.991286984703423e+07) + series.AddElement(1.0285437111086299e+08) + series.AddElement(1.0104719061003672e+08) + if average := series.CalculateAverage(); !utilities.ApproximatelyEqual(average, 0.01, expected) { + test.Fatalf( + "Expected: %v; Actual: %v.", average, expected, + ) + } +} diff --git a/networkQuality.go b/networkQuality.go index f97b27c..776c0e7 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -22,6 +22,7 @@ import ( "net/url" "os" "runtime/pprof" + "strings" "time" "github.com/network-quality/goresponsiveness/ccw" @@ -29,6 +30,7 @@ import ( "github.com/network-quality/goresponsiveness/constants" "github.com/network-quality/goresponsiveness/datalogger" "github.com/network-quality/goresponsiveness/debug" + "github.com/network-quality/goresponsiveness/direction" "github.com/network-quality/goresponsiveness/extendedstats" "github.com/network-quality/goresponsiveness/lgc" "github.com/network-quality/goresponsiveness/ms" @@ -68,10 +70,46 @@ var ( "Enable debugging.", ) rpmtimeout = flag.Int( - "rpmtimeout", - constants.RPMCalculationTime, - "Maximum time to spend calculating RPM (i.e., total test time.).", + "rpm.timeout", + constants.DefaultTestTime, + "Maximum time (in seconds) to spend calculating RPM (i.e., total test time.).", ) + rpmmad = flag.Int( + "rpm.mad", + constants.SpecParameterCliOptionsDefaults.Mad, + "Moving average distance -- number of intervals considered during stability calculations.", + ) + rpmid = flag.Int( + "rpm.id", + constants.SpecParameterCliOptionsDefaults.Id, + "Duration of the interval between re-evaluating the network conditions (in seconds).", + ) + rpmtmp = flag.Uint( + "rpm.tmp", + constants.SpecParameterCliOptionsDefaults.Tmp, + "Percent of measurements to trim when calculating statistics about network conditions (between 0 and 100).", + ) + rpmsdt = flag.Float64( + "rpm.sdt", + constants.SpecParameterCliOptionsDefaults.Sdt, + "Cutoff in the standard deviation of measured values about network conditions between unstable and stable.", + ) + rpmmnp = flag.Int( + "rpm.mnp", + constants.SpecParameterCliOptionsDefaults.Mnp, + "Maximimum number of parallel connections to establish when attempting to reach working conditions.", + ) + rpmmps = flag.Int( + "rpm.mps", + constants.SpecParameterCliOptionsDefaults.Mps, + "Maximimum number of probes to send per second.", + ) + rpmptc = flag.Float64( + "rpm.ptc", + constants.SpecParameterCliOptionsDefaults.Ptc, + "Percentage of the (discovered) total network capacity that probes are allowed to consume.", + ) + sslKeyFileName = flag.String( "ssl-key-file", "", @@ -97,11 +135,6 @@ var ( "", "Store granular information about tests results in files with this basename. Time and information type will be appended (before the first .) to create separate log files. Disabled by default.", ) - probeIntervalTime = flag.Uint( - "probe-interval-time", - 100, - "Time (in ms) between probes (foreign and self).", - ) connectToAddr = flag.String( "connect-to", "", @@ -132,8 +165,26 @@ func main() { os.Exit(0) } - timeoutDuration := time.Second * time.Duration(*rpmtimeout) - timeoutAbsoluteTime := time.Now().Add(timeoutDuration) + var debugLevel debug.DebugLevel = debug.Error + + if *debugCliFlag { + debugLevel = debug.Debug + } + + specParameters, err := rpm.SpecParametersFromArguments(*rpmtimeout, *rpmmad, *rpmid, + *rpmtmp, *rpmsdt, *rpmmnp, *rpmmps, *rpmptc) + if err != nil { + fmt.Fprintf( + os.Stderr, + "Error: There was an error configuring the test with user-supplied parameters: %v\n", + err, + ) + os.Exit(1) + } + + if debug.IsDebug(debugLevel) { + fmt.Printf("Running the test according to the following spec parameters:\n%v\n", specParameters.ToString()) + } var configHostPort string @@ -158,30 +209,14 @@ func main() { // the others. operatingCtx, operatingCtxCancel := context.WithCancel(context.Background()) - // The operator contexts. These contexts control the processes that manage - // network activity but do not control network activity. - - uploadLoadGeneratorOperatorCtx, uploadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx) - downloadLoadGeneratorOperatorCtx, downloadLoadGeneratorOperatorCtxCancel := context.WithCancel(operatingCtx) - proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx) - - // This context is used to control the network activity (i.e., it controls all - // the connections that are open to do load generation and probing). Cancelling this context will close - // all the network connections that are responsible for generating the load. - networkActivityCtx, networkActivityCtxCancel := context.WithCancel(operatingCtx) - config := &config.Config{ ConnectToAddr: *connectToAddr, } - var debugLevel debug.DebugLevel = debug.Error - - if *debugCliFlag { - debugLevel = debug.Debug - } if *calculateExtendedStats && !extendedstats.ExtendedStatsAvailable() { *calculateExtendedStats = false - fmt.Printf( + fmt.Fprintf( + os.Stderr, "Warning: Calculation of extended statistics was requested but is not supported on this platform.\n", ) } @@ -223,53 +258,14 @@ func main() { fmt.Printf("Configuration: %s\n", config) } - timeoutChannel := timeoutat.TimeoutAt( - operatingCtx, - timeoutAbsoluteTime, - debugLevel, - ) - if debug.IsDebug(debugLevel) { - fmt.Printf("Test will end no later than %v\n", timeoutAbsoluteTime) - } - - // print the banner - dt := time.Now().UTC() - fmt.Printf( - "%s UTC Go Responsiveness to %s...\n", - dt.Format("01-02-2006 15:04:05"), - configHostPort, - ) - - if len(*profile) != 0 { - f, err := os.Create(*profile) - if err != nil { - fmt.Fprintf( - os.Stderr, - "Error: Profiling requested but could not open the log file ( %s ) for writing: %v\n", - *profile, - err, - ) - os.Exit(1) - } - pprof.StartCPUProfile(f) - defer pprof.StopCPUProfile() - } - var selfProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] = nil - var foreignProbeDataLogger datalogger.DataLogger[probe.ProbeDataPoint] = nil - var downloadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil - var uploadThroughputDataLogger datalogger.DataLogger[rpm.ThroughputDataPoint] = nil - var granularThroughputDataLogger datalogger.DataLogger[rpm.GranularThroughputDataPoint] = nil + downloadDirection := direction.Direction{} + uploadDirection := direction.Direction{} // User wants to log data if *dataLoggerBaseFileName != "" { var err error = nil unique := time.Now().UTC().Format("01-02-2006-15-04-05") - dataLoggerSelfFilename := utilities.FilenameAppend(*dataLoggerBaseFileName, "-self-"+unique) - dataLoggerForeignFilename := utilities.FilenameAppend( - *dataLoggerBaseFileName, - "-foreign-"+unique, - ) dataLoggerDownloadThroughputFilename := utilities.FilenameAppend( *dataLoggerBaseFileName, "-throughput-download-"+unique, @@ -278,100 +274,158 @@ func main() { *dataLoggerBaseFileName, "-throughput-upload-"+unique, ) - dataLoggerGranularThroughputFilename := utilities.FilenameAppend( + + dataLoggerDownloadGranularThroughputFilename := utilities.FilenameAppend( *dataLoggerBaseFileName, - "-throughput-granular-"+unique, + "-throughput-download-granular-"+unique, ) - selfProbeDataLogger, err = datalogger.CreateCSVDataLogger[probe.ProbeDataPoint]( + dataLoggerUploadGranularThroughputFilename := utilities.FilenameAppend( + *dataLoggerBaseFileName, + "-throughput-upload-granular-"+unique, + ) + + dataLoggerSelfFilename := utilities.FilenameAppend(*dataLoggerBaseFileName, "-self-"+unique) + dataLoggerForeignFilename := utilities.FilenameAppend( + *dataLoggerBaseFileName, + "-foreign-"+unique, + ) + + selfProbeDataLogger, err := datalogger.CreateCSVDataLogger[probe.ProbeDataPoint]( dataLoggerSelfFilename, ) if err != nil { - fmt.Printf( + fmt.Fprintf( + os.Stderr, "Warning: Could not create the file for storing self probe results (%s). Disabling functionality.\n", dataLoggerSelfFilename, ) selfProbeDataLogger = nil } + uploadDirection.SelfProbeDataLogger = selfProbeDataLogger + downloadDirection.SelfProbeDataLogger = selfProbeDataLogger - foreignProbeDataLogger, err = datalogger.CreateCSVDataLogger[probe.ProbeDataPoint]( + foreignProbeDataLogger, err := datalogger.CreateCSVDataLogger[probe.ProbeDataPoint]( dataLoggerForeignFilename, ) if err != nil { - fmt.Printf( + fmt.Fprintf( + os.Stderr, "Warning: Could not create the file for storing foreign probe results (%s). Disabling functionality.\n", dataLoggerForeignFilename, ) foreignProbeDataLogger = nil } + uploadDirection.ForeignProbeDataLogger = selfProbeDataLogger + downloadDirection.ForeignProbeDataLogger = foreignProbeDataLogger - downloadThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint]( + downloadDirection.ThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint]( dataLoggerDownloadThroughputFilename, ) if err != nil { - fmt.Printf( + fmt.Fprintf( + os.Stderr, "Warning: Could not create the file for storing download throughput results (%s). Disabling functionality.\n", dataLoggerDownloadThroughputFilename, ) - downloadThroughputDataLogger = nil + downloadDirection.ThroughputDataLogger = nil } - - uploadThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint]( + uploadDirection.ThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.ThroughputDataPoint]( dataLoggerUploadThroughputFilename, ) if err != nil { - fmt.Printf( + fmt.Fprintf( + os.Stderr, "Warning: Could not create the file for storing upload throughput results (%s). Disabling functionality.\n", dataLoggerUploadThroughputFilename, ) - uploadThroughputDataLogger = nil + uploadDirection.ThroughputDataLogger = nil } - granularThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.GranularThroughputDataPoint]( - dataLoggerGranularThroughputFilename, + downloadDirection.GranularThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.GranularThroughputDataPoint]( + dataLoggerDownloadGranularThroughputFilename, ) if err != nil { - fmt.Printf( - "Warning: Could not create the file for storing granular throughput results (%s). Disabling functionality.\n", - dataLoggerGranularThroughputFilename, + fmt.Fprintf( + os.Stderr, + "Warning: Could not create the file for storing download granular throughput results (%s). Disabling functionality.\n", + dataLoggerDownloadGranularThroughputFilename, + ) + downloadDirection.GranularThroughputDataLogger = nil + } + uploadDirection.GranularThroughputDataLogger, err = datalogger.CreateCSVDataLogger[rpm.GranularThroughputDataPoint]( + dataLoggerUploadGranularThroughputFilename, + ) + if err != nil { + fmt.Fprintf( + os.Stderr, + "Warning: Could not create the file for storing upload granular throughput results (%s). Disabling functionality.\n", + dataLoggerUploadGranularThroughputFilename, ) - granularThroughputDataLogger = nil + uploadDirection.GranularThroughputDataLogger = nil } + } // If, for some reason, the data loggers are nil, make them Null Data Loggers so that we don't have conditional // code later. - if selfProbeDataLogger == nil { - selfProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]() + if downloadDirection.SelfProbeDataLogger == nil { + downloadDirection.SelfProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]() + } + if uploadDirection.SelfProbeDataLogger == nil { + uploadDirection.SelfProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]() } - if foreignProbeDataLogger == nil { - foreignProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]() + + if downloadDirection.ForeignProbeDataLogger == nil { + downloadDirection.ForeignProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]() } - if downloadThroughputDataLogger == nil { - downloadThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]() + if uploadDirection.ForeignProbeDataLogger == nil { + uploadDirection.ForeignProbeDataLogger = datalogger.CreateNullDataLogger[probe.ProbeDataPoint]() } - if uploadThroughputDataLogger == nil { - uploadThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]() + + if downloadDirection.ThroughputDataLogger == nil { + downloadDirection.ThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]() + } + if uploadDirection.ThroughputDataLogger == nil { + uploadDirection.ThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.ThroughputDataPoint]() + } + + if downloadDirection.GranularThroughputDataLogger == nil { + downloadDirection.GranularThroughputDataLogger = + datalogger.CreateNullDataLogger[rpm.GranularThroughputDataPoint]() } - if granularThroughputDataLogger == nil { - granularThroughputDataLogger = datalogger.CreateNullDataLogger[rpm.GranularThroughputDataPoint]() + if uploadDirection.GranularThroughputDataLogger == nil { + uploadDirection.GranularThroughputDataLogger = + datalogger.CreateNullDataLogger[rpm.GranularThroughputDataPoint]() } /* * Create (and then, ironically, name) two anonymous functions that, when invoked, * will create load-generating connections for upload/download */ - generateLgdc := func() lgc.LoadGeneratingConnection { + downloadDirection.CreateLgdc = func() lgc.LoadGeneratingConnection { lgd := lgc.NewLoadGeneratingConnectionDownload(config.Urls.LargeUrl, sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify) return &lgd } - - generateLguc := func() lgc.LoadGeneratingConnection { + uploadDirection.CreateLgdc = func() lgc.LoadGeneratingConnection { lgu := lgc.NewLoadGeneratingConnectionUpload(config.Urls.UploadUrl, sslKeyFileConcurrentWriter, config.ConnectToAddr, *insecureSkipVerify) return &lgu } + downloadDirection.DirectionDebugging = debug.NewDebugWithPrefix(debugLevel, "download") + downloadDirection.ProbeDebugging = debug.NewDebugWithPrefix(debugLevel, "download probe") + + uploadDirection.DirectionDebugging = debug.NewDebugWithPrefix(debugLevel, "upload") + uploadDirection.ProbeDebugging = debug.NewDebugWithPrefix(debugLevel, "upload probe") + + downloadDirection.Lgcc = lgc.NewLoadGeneratingConnectionCollection() + uploadDirection.Lgcc = lgc.NewLoadGeneratingConnectionCollection() + + // We do not do tracing on upload connections so there are no extended stats for those connections! + uploadDirection.ExtendedStatsEligible = false + downloadDirection.ExtendedStatsEligible = true + generateSelfProbeConfiguration := func() probe.ProbeConfiguration { return probe.ProbeConfiguration{ URL: config.Urls.SmallUrl, @@ -388,421 +442,391 @@ func main() { } } - var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download") - var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload") - var combinedProbeDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "combined probe") - - downloadLoadGeneratingConnectionCollection := lgc.NewLoadGeneratingConnectionCollection() - uploadLoadGeneratingConnectionCollection := lgc.NewLoadGeneratingConnectionCollection() + downloadDirection.DirectionLabel = "Download" + uploadDirection.DirectionLabel = "Upload" - // TODO: Separate contexts for load generation and data collection. If we do that, if either of the two - // data collection go routines stops well before the other, they will continue to send probes and we can - // generate additional information! + directions := []*direction.Direction{&downloadDirection, &uploadDirection} - selfDownProbeConnectionCommunicationChannel, downloadThroughputChannel := rpm.LoadGenerator( - networkActivityCtx, - downloadLoadGeneratorOperatorCtx, - time.Second, - generateLgdc, - &downloadLoadGeneratingConnectionCollection, - *calculateExtendedStats, - downloadDebugging, - ) - selfUpProbeConnectionCommunicationChannel, uploadThroughputChannel := rpm.LoadGenerator( - networkActivityCtx, - uploadLoadGeneratorOperatorCtx, - time.Second, - generateLguc, - &uploadLoadGeneratingConnectionCollection, - *calculateExtendedStats, - uploadDebugging, + // print the banner + dt := time.Now().UTC() + fmt.Printf( + "%s UTC Go Responsiveness to %s...\n", + dt.Format("01-02-2006 15:04:05"), + configHostPort, ) - // Handles for the first connection that the load-generating go routines (both up and - // download) open are passed back on the self[Down|Up]ProbeConnectionCommunicationChannel - // so that we can then start probes on those connections. - selfDownProbeConnection := <-selfDownProbeConnectionCommunicationChannel - selfUpProbeConnection := <-selfUpProbeConnectionCommunicationChannel + if len(*profile) != 0 { + f, err := os.Create(*profile) + if err != nil { + fmt.Fprintf( + os.Stderr, + "Error: Profiling requested but could not open the log file ( %s ) for writing: %v\n", + *profile, + err, + ) + os.Exit(1) + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } + + // All tests will accumulate data to these series because it will all matter for RPM calculation! + selfRtts := ms.NewInfiniteMathematicalSeries[float64]() + foreignRtts := ms.NewInfiniteMathematicalSeries[float64]() - // The combined prober will handle launching, monitoring, etc of *both* the self and foreign - // probes. - probeDataPointsChannel := rpm.CombinedProber( - proberOperatorCtx, - networkActivityCtx, - generateForeignProbeConfiguration, - generateSelfProbeConfiguration, - selfDownProbeConnection, - selfUpProbeConnection, - time.Millisecond*(time.Duration(*probeIntervalTime)), - sslKeyFileConcurrentWriter, - *calculateExtendedStats, - combinedProbeDebugging, - ) + var selfRttsQualityAttenuation *qualityattenuation.SimpleQualityAttenuation = nil + if *printQualityAttenuation { + selfRttsQualityAttenuation = qualityattenuation.NewSimpleQualityAttenuation() + } - responsivenessIsStable := false - downloadThroughputIsStable := false - uploadThroughputIsStable := false + for _, direction := range directions { - // Test parameters: - // 1. I: The number of previous instantaneous measurements to consider when generating - // the so-called instantaneous moving averages. - // 2. K: The number of instantaneous moving averages to consider when determining stability. - // 3: S: The standard deviation cutoff used to determine stability among the K preceding - // moving averages of a measurement. - // See + timeoutDuration := specParameters.TestTimeout + timeoutAbsoluteTime := time.Now().Add(timeoutDuration) - throughputI := constants.InstantaneousThroughputMeasurementCount - probeI := constants.InstantaneousProbeMeasurementCount - K := constants.InstantaneousMovingAverageStabilityCount - S := constants.StabilityStandardDeviation + timeoutChannel := timeoutat.TimeoutAt( + operatingCtx, + timeoutAbsoluteTime, + debugLevel, + ) + if debug.IsDebug(debugLevel) { + fmt.Printf("%s Test will end no later than %v\n", direction.DirectionLabel, timeoutAbsoluteTime) + } - downloadThroughputStabilizerDebugConfig := - debug.NewDebugWithPrefix(debug.Debug, "Download Throughput Stabilizer") - downloadThroughputStabilizerDebugLevel := debug.Error - if *debugCliFlag { - downloadThroughputStabilizerDebugLevel = debug.Debug - } - downloadThroughputStabilizer := stabilizer.NewThroughputStabilizer(throughputI, K, S, - downloadThroughputStabilizerDebugLevel, downloadThroughputStabilizerDebugConfig) + throughputCtx, throughputCtxCancel := context.WithCancel(operatingCtx) + proberOperatorCtx, proberOperatorCtxCancel := context.WithCancel(operatingCtx) - uploadThroughputStabilizerDebugConfig := - debug.NewDebugWithPrefix(debug.Debug, "Upload Throughput Stabilizer") - uploadThroughputStabilizerDebugLevel := debug.Error - if *debugCliFlag { - uploadThroughputStabilizerDebugLevel = debug.Debug - } - uploadThroughputStabilizer := stabilizer.NewThroughputStabilizer(throughputI, K, S, - uploadThroughputStabilizerDebugLevel, uploadThroughputStabilizerDebugConfig) + // This context is used to control the network activity (i.e., it controls all + // the connections that are open to do load generation and probing). Cancelling this context will close + // all the network connections that are responsible for generating the load. + networkActivityCtx, networkActivityCtxCancel := context.WithCancel(operatingCtx) - probeStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, "Probe Stabilizer") - probeStabilizerDebugLevel := debug.Error - if *debugCliFlag { - probeStabilizerDebugLevel = debug.Debug - } - probeStabilizer := stabilizer.NewProbeStabilizer(probeI, K, S, probeStabilizerDebugLevel, probeStabilizerDebugConfig) + throughputGeneratorCtx, throughputGeneratorCtxCancel := context.WithCancel(throughputCtx) - selfRtts := ms.NewInfiniteMathematicalSeries[float64]() - selfRttsQualityAttenuation := qualityattenuation.NewSimpleQualityAttenuation() - foreignRtts := ms.NewInfiniteMathematicalSeries[float64]() + lgStabilizationCommunicationChannel := rpm.LoadGenerator( + throughputCtx, + networkActivityCtx, + throughputGeneratorCtx, + specParameters.EvalInterval, + direction.CreateLgdc, + &direction.Lgcc, + specParameters.MaxParallelConns, + *calculateExtendedStats, + direction.DirectionDebugging, + ) - // For later debugging output, record the last throughputs on load-generating connectings - // and the number of open connections. - lastUploadThroughputRate := float64(0) - lastUploadThroughputOpenConnectionCount := int(0) - lastDownloadThroughputRate := float64(0) - lastDownloadThroughputOpenConnectionCount := int(0) + throughputStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, + fmt.Sprintf("%v Throughput Stabilizer", direction.DirectionLabel)) + downloadThroughputStabilizerDebugLevel := debug.Error + if *debugCliFlag { + downloadThroughputStabilizerDebugLevel = debug.Debug + } + throughputStabilizer := stabilizer.NewStabilizer[float64]( + uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, 0, "bytes", + downloadThroughputStabilizerDebugLevel, throughputStabilizerDebugConfig) - // Every time that there is a new measurement, the possibility exists that the measurements become unstable. - // This allows us to continue pushing until *everything* is stable at the same time. -timeout: - for !(responsivenessIsStable && downloadThroughputIsStable && uploadThroughputIsStable) { - select { + responsivenessStabilizerDebugConfig := debug.NewDebugWithPrefix(debug.Debug, + fmt.Sprintf("%v Responsiveness Stabilizer", direction.DirectionLabel)) + responsivenessStabilizerDebugLevel := debug.Error + if *debugCliFlag { + responsivenessStabilizerDebugLevel = debug.Debug + } + responsivenessStabilizer := stabilizer.NewStabilizer[int64]( + uint(specParameters.MovingAvgDist), specParameters.StdDevTolerance, + specParameters.TrimmedMeanPct, "milliseconds", + responsivenessStabilizerDebugLevel, responsivenessStabilizerDebugConfig) - case downloadThroughputMeasurement := <-downloadThroughputChannel: - { - downloadThroughputStabilizer.AddMeasurement(downloadThroughputMeasurement) - downloadThroughputIsStable = downloadThroughputStabilizer.IsStable() - if *debugCliFlag { - fmt.Printf( - "################# Download is instantaneously %s.\n", - utilities.Conditional(downloadThroughputIsStable, "stable", "unstable")) - } - downloadThroughputDataLogger.LogRecord(downloadThroughputMeasurement) - for i := range downloadThroughputMeasurement.GranularThroughputDataPoints { - datapoint := downloadThroughputMeasurement.GranularThroughputDataPoints[i] - datapoint.Direction = "Download" - granularThroughputDataLogger.LogRecord(datapoint) - } + // For later debugging output, record the last throughputs on load-generating connectings + // and the number of open connections. + lastThroughputRate := float64(0) + lastThroughputOpenConnectionCount := int(0) - lastDownloadThroughputRate = downloadThroughputMeasurement.Throughput - lastDownloadThroughputOpenConnectionCount = - downloadThroughputMeasurement.Connections - } + lg_timeout: + for !direction.StableThroughput { + select { + case throughputMeasurement := <-lgStabilizationCommunicationChannel: + { + throughputStabilizer.AddMeasurement( + throughputMeasurement.Throughput) + direction.StableThroughput = throughputStabilizer.IsStable() + if *debugCliFlag { + fmt.Printf( + "################# %v is instantaneously %s.\n", direction.DirectionLabel, + utilities.Conditional(direction.StableThroughput, "stable", "unstable")) + } + direction.ThroughputDataLogger.LogRecord(throughputMeasurement) + for i := range throughputMeasurement.GranularThroughputDataPoints { + datapoint := throughputMeasurement.GranularThroughputDataPoints[i] + datapoint.Direction = "Download" + direction.GranularThroughputDataLogger.LogRecord(datapoint) + } + + lastThroughputRate = throughputMeasurement.Throughput + lastThroughputOpenConnectionCount = throughputMeasurement.Connections - case uploadThroughputMeasurement := <-uploadThroughputChannel: - { - uploadThroughputStabilizer.AddMeasurement(uploadThroughputMeasurement) - uploadThroughputIsStable = uploadThroughputStabilizer.IsStable() - if *debugCliFlag { - fmt.Printf( - "################# Upload is instantaneously %s.\n", - utilities.Conditional(uploadThroughputIsStable, "stable", "unstable")) + if direction.StableThroughput { + throughputGeneratorCtxCancel() + } } - uploadThroughputDataLogger.LogRecord(uploadThroughputMeasurement) - for i := range uploadThroughputMeasurement.GranularThroughputDataPoints { - datapoint := uploadThroughputMeasurement.GranularThroughputDataPoints[i] - datapoint.Direction = "Upload" - granularThroughputDataLogger.LogRecord(datapoint) + case <-timeoutChannel: + { + break lg_timeout } + } + } - lastUploadThroughputRate = uploadThroughputMeasurement.Throughput - lastUploadThroughputOpenConnectionCount = uploadThroughputMeasurement.Connections + if direction.StableThroughput { + if *debugCliFlag { + fmt.Printf("################# Throughput is stable; beginning responsiveness testing.\n") } - case probeMeasurement := <-probeDataPointsChannel: - { - probeStabilizer.AddMeasurement(probeMeasurement) + } else { + fmt.Fprintf(os.Stderr, "Warning: Throughput stability could not be reached. Adding 15 seconds to calculate speculative RPM results.\n") + speculativeTimeoutDuration := time.Second * 15 + speculativeAbsoluteTimeoutTime := time.Now().Add(speculativeTimeoutDuration) + timeoutChannel = timeoutat.TimeoutAt( + operatingCtx, + speculativeAbsoluteTimeoutTime, + debugLevel, + ) + } - // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy - // is *actually* important, but it can't hurt? - responsivenessIsStable = probeStabilizer.IsStable() + perDirectionSelfRtts := ms.NewInfiniteMathematicalSeries[float64]() + perDirectionForeignRtts := ms.NewInfiniteMathematicalSeries[float64]() - if *debugCliFlag { - fmt.Printf( - "################# Responsiveness is instantaneously %s.\n", - utilities.Conditional(responsivenessIsStable, "stable", "unstable")) - } - if probeMeasurement.Type == probe.Foreign { + responsivenessStabilizationCommunicationChannel := rpm.ResponsivenessProber( + proberOperatorCtx, + networkActivityCtx, + generateForeignProbeConfiguration, + generateSelfProbeConfiguration, + &direction.Lgcc, + direction.CreateLgdc().Direction(), // TODO: This could be better! + specParameters.ProbeInterval, + sslKeyFileConcurrentWriter, + *calculateExtendedStats, + direction.ProbeDebugging, + ) + + responsiveness_timeout: + for !direction.StableResponsiveness { + select { + case probeMeasurement := <-responsivenessStabilizationCommunicationChannel: + { + foreignDataPoint := probeMeasurement.First + selfDataPoint := probeMeasurement.Second + + responsivenessStabilizer.AddMeasurement( + (foreignDataPoint.Duration + selfDataPoint.Duration).Milliseconds()) + + // Check stabilization immediately -- this could change if we wait. Not sure if the immediacy + // is *actually* important, but it can't hurt? + direction.StableResponsiveness = responsivenessStabilizer.IsStable() + + if *debugCliFlag { + fmt.Printf( + "################# Responsiveness is instantaneously %s.\n", + utilities.Conditional(direction.StableResponsiveness, "stable", "unstable")) + } // There may be more than one round trip accumulated together. If that is the case, // we will blow them apart in to three separate measurements and each one will just // be 1 / measurement.RoundTripCount of the total length. - for range utilities.Iota(0, int(probeMeasurement.RoundTripCount)) { - foreignRtts.AddElement(probeMeasurement.Duration.Seconds() / - float64(probeMeasurement.RoundTripCount)) + for range utilities.Iota(0, int(foreignDataPoint.RoundTripCount)) { + foreignRtts.AddElement(foreignDataPoint.Duration.Seconds() / + float64(foreignDataPoint.RoundTripCount)) + perDirectionForeignRtts.AddElement(foreignDataPoint.Duration.Seconds() / + float64(foreignDataPoint.RoundTripCount)) } - } else if probeMeasurement.Type == probe.SelfDown || probeMeasurement.Type == probe.SelfUp { - selfRtts.AddElement(probeMeasurement.Duration.Seconds()) - if *printQualityAttenuation { - selfRttsQualityAttenuation.AddSample(probeMeasurement.Duration.Seconds()) + selfRtts.AddElement(selfDataPoint.Duration.Seconds()) + perDirectionSelfRtts.AddElement(selfDataPoint.Duration.Seconds()) + + if selfRttsQualityAttenuation != nil { + selfRttsQualityAttenuation.AddSample(selfDataPoint.Duration.Seconds()) } + + direction.ForeignProbeDataLogger.LogRecord(*foreignDataPoint) + direction.SelfProbeDataLogger.LogRecord(*selfDataPoint) } + case throughputMeasurement := <-lgStabilizationCommunicationChannel: + { + if *debugCliFlag { + fmt.Printf("Adding a throughput measurement.\n") + } + // There may be more than one round trip accumulated together. If that is the case, + direction.ThroughputDataLogger.LogRecord(throughputMeasurement) + for i := range throughputMeasurement.GranularThroughputDataPoints { + datapoint := throughputMeasurement.GranularThroughputDataPoints[i] + datapoint.Direction = direction.DirectionLabel + direction.GranularThroughputDataLogger.LogRecord(datapoint) + } + + lastThroughputRate = throughputMeasurement.Throughput + lastThroughputOpenConnectionCount = throughputMeasurement.Connections - if probeMeasurement.Type == probe.Foreign { - foreignProbeDataLogger.LogRecord(probeMeasurement) - } else if probeMeasurement.Type == probe.SelfDown || probeMeasurement.Type == probe.SelfUp { - selfProbeDataLogger.LogRecord(probeMeasurement) } - } - case <-timeoutChannel: - { - break timeout + case <-timeoutChannel: + { + break responsiveness_timeout + } } } - } - - // TODO: Reset timeout to RPM timeout stat? - // Did the test run to stability? - testRanToStability := (downloadThroughputIsStable && uploadThroughputIsStable && responsivenessIsStable) + // Did the test run to stability? + testRanToStability := direction.StableThroughput && direction.StableResponsiveness - if *debugCliFlag { - fmt.Printf("Stopping all the load generating data generators (stability: %s).\n", - utilities.Conditional(testRanToStability, "success", "failure")) - } - - /* At this point there are - 1. Load generators running - -- uploadLoadGeneratorOperatorCtx - -- downloadLoadGeneratorOperatorCtx - 2. Network connections opened by those load generators: - -- lgNetworkActivityCtx - 3. Probes - -- proberCtx - */ + if *debugCliFlag { + fmt.Printf("Stopping all the load generating data generators (stability: %s).\n", + utilities.Conditional(testRanToStability, "success", "failure")) + } - // First, stop the load generator and the probe operators (but *not* the network activity) - proberOperatorCtxCancel() - downloadLoadGeneratorOperatorCtxCancel() - uploadLoadGeneratorOperatorCtxCancel() + /* At this point there are + 1. Load generators running + -- uploadLoadGeneratorOperatorCtx + -- downloadLoadGeneratorOperatorCtx + 2. Network connections opened by those load generators: + -- lgNetworkActivityCtx + 3. Probes + -- proberCtx + */ - // Second, calculate the extended stats (if the user requested) + // First, stop the load generator and the probe operators (but *not* the network activity) + proberOperatorCtxCancel() + throughputCtxCancel() - extendedStats := extendedstats.AggregateExtendedStats{} - if *calculateExtendedStats { - if extendedstats.ExtendedStatsAvailable() { - func() { - // Put inside an IIFE so that we can use a defer! - downloadLoadGeneratingConnectionCollection.Lock.Lock() - defer downloadLoadGeneratingConnectionCollection.Lock.Unlock() + // Second, calculate the extended stats (if the user requested and they are available for the direction) + extendedStats := extendedstats.AggregateExtendedStats{} + if *calculateExtendedStats && direction.ExtendedStatsEligible { + if extendedstats.ExtendedStatsAvailable() { + func() { + // Put inside an IIFE so that we can use a defer! + direction.Lgcc.Lock.Lock() + defer direction.Lgcc.Lock.Unlock() - // Note: We do not trace upload connections! - for i := 0; i < downloadLoadGeneratingConnectionCollection.Len(); i++ { - // Assume that extended statistics are available -- the check was done explicitly at - // program startup if the calculateExtendedStats flag was set by the user on the command line. - currentLgc, _ := downloadLoadGeneratingConnectionCollection.Get(i) - if err := extendedStats.IncorporateConnectionStats( - (*currentLgc).Stats().ConnInfo.Conn); err != nil { + // Note: We do not trace upload connections! + downloadLgcCount, err := direction.Lgcc.Len() + if err != nil { fmt.Fprintf( os.Stderr, - "Warning: Could not add extended stats for the connection: %v\n", - err, + "Warning: Could not calculate the number of download load-generating connections; aborting extended stats preparation.\n", ) + return } - } - }() - } else { - // TODO: Should we just log here? - panic("Extended stats are not available but the user requested their calculation.") + for i := 0; i < downloadLgcCount; i++ { + // Assume that extended statistics are available -- the check was done explicitly at + // program startup if the calculateExtendedStats flag was set by the user on the command line. + currentLgc, _ := direction.Lgcc.Get(i) + if err := extendedStats.IncorporateConnectionStats( + (*currentLgc).Stats().ConnInfo.Conn); err != nil { + fmt.Fprintf( + os.Stderr, + "Warning: Could not add extended stats for the connection: %v\n", + err, + ) + } + } + }() + } else { + // TODO: Should we just log here? + panic("Extended stats are not available but the user requested their calculation.") + } } - } - // Third, stop the network connections opened by the load generators and probers. - networkActivityCtxCancel() + // Third, stop the network connections opened by the load generators and probers. + networkActivityCtxCancel() - // Finally, stop the world. - operatingCtxCancel() - - // Calculate the RPM - - // First, let's do a double-sided trim of the top/bottom 10% of our measurements. - selfRttsTotalCount := selfRtts.Len() - foreignRttsTotalCount := foreignRtts.Len() - - selfRttsTrimmed := selfRtts.DoubleSidedTrim(10) - foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(10) - - selfRttsTrimmedCount := selfRttsTrimmed.Len() - foreignRttsTrimmedCount := foreignRttsTrimmed.Len() - - // Then, let's take the mean of those ... - selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage() - foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage() + fmt.Printf( + "%v: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", + direction.DirectionLabel, + utilities.ToMbps(lastThroughputRate), + utilities.ToMBps(lastThroughputRate), + lastThroughputOpenConnectionCount, + ) - // Second, let's do the P90 calculations. - selfProbeRoundTripTimeP90 := selfRtts.Percentile(90) - foreignProbeRoundTripTimeP90 := foreignRtts.Percentile(90) + if *calculateExtendedStats { + fmt.Println(extendedStats.Repr()) + } + directionResult := rpm.CalculateRpm(perDirectionSelfRtts, perDirectionForeignRtts, specParameters.TrimmedMeanPct, 90) + if *debugCliFlag { + fmt.Printf("(%s RPM Calculation stats): %v\n", direction.DirectionLabel, directionResult.ToString()) + } - // Note: The specification indicates that we want to calculate the foreign probes as such: - // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign - // where tcp_foreign, tls_foreign, http_foreign are the P90 RTTs for the connection - // of the tcp, tls and http connections, respectively. However, we cannot break out - // the individual RTTs so we assume that they are roughly equal. + if !testRanToStability { + fmt.Printf("Test did not run to stability, these results are estimates:\n") + } - // This is 60 because we measure in seconds not ms - p90Rpm := 60.0 / (float64(selfProbeRoundTripTimeP90+foreignProbeRoundTripTimeP90) / 2.0) - meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0) + fmt.Printf("%s RPM: %5.0f (P%d)\n", direction.DirectionLabel, directionResult.PNRpm, 90) + fmt.Printf("%s RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", direction.DirectionLabel, + directionResult.MeanRpm, specParameters.TrimmedMeanPct) - if *debugCliFlag { - fmt.Printf( - `Total Self Probes: %d -Total Foreign Probes: %d -Trimmed Self Probes Count: %d -Trimmed Foreign Probes Count: %d -P90 Self RTT: %f -P90 Foreign RTT: %f -Trimmed Mean Self RTT: %f -Trimmed Mean Foreign RTT: %f -`, - selfRttsTotalCount, - foreignRttsTotalCount, - selfRttsTrimmedCount, - foreignRttsTrimmedCount, - selfProbeRoundTripTimeP90, - foreignProbeRoundTripTimeP90, - selfProbeRoundTripTimeMean, - foreignProbeRoundTripTimeMean, - ) - } + if len(*prometheusStatsFilename) > 0 { + var testStable int + if testRanToStability { + testStable = 1 + } + var buffer bytes.Buffer + buffer.WriteString(fmt.Sprintf("networkquality_%v_test_stable %d\n", + strings.ToLower(direction.DirectionLabel), testStable)) + buffer.WriteString(fmt.Sprintf("networkquality_%v_p90_rpm_value %d\n", + strings.ToLower(direction.DirectionLabel), int64(directionResult.PNRpm))) + buffer.WriteString(fmt.Sprintf("networkquality_%v_trimmed_rpm_value %d\n", + strings.ToLower(direction.DirectionLabel), + int64(directionResult.MeanRpm))) - if *printQualityAttenuation { - fmt.Println("Quality Attenuation Statistics:") - fmt.Printf( - `Number of losses: %d -Number of samples: %d -Loss: %f -Min: %.6f -Max: %.6f -Mean: %.6f -Variance: %.6f -Standard Deviation: %.6f -PDV(90): %.6f -PDV(99): %.6f -P(90): %.6f -P(99): %.6f -`, selfRttsQualityAttenuation.GetNumberOfLosses(), - selfRttsQualityAttenuation.GetNumberOfSamples(), - selfRttsQualityAttenuation.GetLossPercentage(), - selfRttsQualityAttenuation.GetMinimum(), - selfRttsQualityAttenuation.GetMaximum(), - selfRttsQualityAttenuation.GetAverage(), - selfRttsQualityAttenuation.GetVariance(), - selfRttsQualityAttenuation.GetStandardDeviation(), - selfRttsQualityAttenuation.GetPDV(90), - selfRttsQualityAttenuation.GetPDV(99), - selfRttsQualityAttenuation.GetPercentile(90), - selfRttsQualityAttenuation.GetPercentile(99)) - } + buffer.WriteString(fmt.Sprintf("networkquality_%v_bits_per_second %d\n", + strings.ToLower(direction.DirectionLabel), int64(lastThroughputRate))) + buffer.WriteString(fmt.Sprintf("networkquality_%v_connections %d\n", + strings.ToLower(direction.DirectionLabel), + int64(lastThroughputOpenConnectionCount))) - if !testRanToStability { - fmt.Printf("Test did not run to stability, these results are estimates:\n") - } + if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil { + fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err) + os.Exit(1) + } + } - fmt.Printf("RPM: %5.0f (P90)\n", p90Rpm) - fmt.Printf("RPM: %5.0f (Double-Sided 10%% Trimmed Mean)\n", meanRpm) + direction.ThroughputDataLogger.Export() + if *debugCliFlag { + fmt.Printf("Closing the %v throughput data logger.\n", direction.DirectionLabel) + } + direction.ThroughputDataLogger.Close() - fmt.Printf( - "Download: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", - utilities.ToMbps(lastDownloadThroughputRate), - utilities.ToMBps(lastDownloadThroughputRate), - lastDownloadThroughputOpenConnectionCount, - ) - fmt.Printf( - "Upload: %7.3f Mbps (%7.3f MBps), using %d parallel connections.\n", - utilities.ToMbps(lastUploadThroughputRate), - utilities.ToMBps(lastUploadThroughputRate), - lastUploadThroughputOpenConnectionCount, - ) + direction.GranularThroughputDataLogger.Export() + if *debugCliFlag { + fmt.Printf("Closing the %v granular throughput data logger.\n", direction.DirectionLabel) + } + direction.GranularThroughputDataLogger.Close() - if *calculateExtendedStats { - fmt.Println(extendedStats.Repr()) + if *debugCliFlag { + fmt.Printf("In debugging mode, we will cool down between tests.\n") + time.Sleep(constants.CooldownPeriod) + fmt.Printf("Done cooling down.\n") + } } - selfProbeDataLogger.Export() - if *debugCliFlag { - fmt.Printf("Closing the self data logger.\n") - } - selfProbeDataLogger.Close() + result := rpm.CalculateRpm(selfRtts, foreignRtts, specParameters.TrimmedMeanPct, 90) - foreignProbeDataLogger.Export() if *debugCliFlag { - fmt.Printf("Closing the foreign data logger.\n") + fmt.Printf("(Final RPM Calculation stats): %v\n", result.ToString()) } - foreignProbeDataLogger.Close() - downloadThroughputDataLogger.Export() - if *debugCliFlag { - fmt.Printf("Closing the download throughput data logger.\n") - } - downloadThroughputDataLogger.Close() + fmt.Printf("Final RPM: %5.0f (P%d)\n", result.PNRpm, 90) + fmt.Printf("Final RPM: %5.0f (Double-Sided %v%% Trimmed Mean)\n", + result.MeanRpm, specParameters.TrimmedMeanPct) - uploadThroughputDataLogger.Export() - if *debugCliFlag { - fmt.Printf("Closing the upload throughput data logger.\n") - } - uploadThroughputDataLogger.Close() + // Stop the world. + operatingCtxCancel() - granularThroughputDataLogger.Export() + // Note: We do *not* have to export/close the upload *and* download + // sides of the self/foreign probe data loggers because they both + // refer to the same logger. Closing/exporting one will close/export + // the other. + uploadDirection.SelfProbeDataLogger.Export() if *debugCliFlag { - fmt.Printf("Closing the granular throughput data logger.\n") + fmt.Printf("Closing the self data loggers.\n") } - granularThroughputDataLogger.Close() + uploadDirection.SelfProbeDataLogger.Close() + uploadDirection.ForeignProbeDataLogger.Export() if *debugCliFlag { - fmt.Printf("In debugging mode, we will cool down.\n") - time.Sleep(constants.CooldownPeriod) - fmt.Printf("Done cooling down.\n") - } - - if len(*prometheusStatsFilename) > 0 { - var testStable int - if testRanToStability { - testStable = 1 - } - var buffer bytes.Buffer - buffer.WriteString(fmt.Sprintf("networkquality_test_stable %d\n", testStable)) - buffer.WriteString(fmt.Sprintf("networkquality_rpm_value %d\n", int64(p90Rpm))) - buffer.WriteString(fmt.Sprintf("networkquality_trimmed_rpm_value %d\n", - int64(meanRpm))) // utilities.ToMbps(lastDownloadThroughputRate), - - buffer.WriteString(fmt.Sprintf("networkquality_download_bits_per_second %d\n", int64(lastDownloadThroughputRate))) - buffer.WriteString(fmt.Sprintf("networkquality_download_connections %d\n", - int64(lastDownloadThroughputOpenConnectionCount))) - buffer.WriteString(fmt.Sprintf("networkquality_upload_bits_per_second %d\n", int64(lastUploadThroughputRate))) - buffer.WriteString(fmt.Sprintf("networkquality_upload_connections %d\n", - lastUploadThroughputOpenConnectionCount)) - - if err := os.WriteFile(*prometheusStatsFilename, buffer.Bytes(), 0o644); err != nil { - fmt.Printf("could not write %s: %s", *prometheusStatsFilename, err) - os.Exit(1) - } + fmt.Printf("Closing the foreign data loggers.\n") } + uploadDirection.SelfProbeDataLogger.Close() } diff --git a/probe/probe.go b/probe/probe.go index 3a38e3f..fa19411 100644 --- a/probe/probe.go +++ b/probe/probe.go @@ -21,7 +21,6 @@ import ( "net/http" "net/http/httptrace" "os" - "sync" "time" "github.com/network-quality/goresponsiveness/debug" @@ -72,28 +71,25 @@ func (pt ProbeType) Value() string { return "Foreign" } +func (pt ProbeType) IsSelf() bool { + return pt == SelfUp || pt == SelfDown +} + func Probe( managingCtx context.Context, - waitGroup *sync.WaitGroup, client *http.Client, - lgc lgc.LoadGeneratingConnection, probeUrl string, probeHost string, // optional: for use with a test_endpoint + probeDirection lgc.LgcDirection, probeType ProbeType, - result *chan ProbeDataPoint, + probeId uint, captureExtendedStats bool, debugging *debug.DebugWithPrefix, -) error { - if waitGroup != nil { - waitGroup.Add(1) - defer waitGroup.Done() - } - +) (*ProbeDataPoint, error) { if client == nil { - return fmt.Errorf("cannot start a probe with a nil client") + return nil, fmt.Errorf("cannot start a probe with a nil client") } - probeId := utilities.GenerateUniqueId() probeTracer := NewProbeTracer(client, probeType, probeId, debugging) time_before_probe := time.Now() probe_req, err := http.NewRequestWithContext( @@ -103,7 +99,7 @@ func Probe( nil, ) if err != nil { - return err + return nil, err } // Used to disable compression @@ -112,18 +108,18 @@ func Probe( probe_resp, err := client.Do(probe_req) if err != nil { - return err + return nil, err } // Header.Get returns "" when not set if probe_resp.Header.Get("Content-Encoding") != "" { - return fmt.Errorf("Content-Encoding header was set (compression not allowed)") + return nil, fmt.Errorf("Content-Encoding header was set (compression not allowed)") } // TODO: Make this interruptable somehow by using _ctx_. _, err = io.ReadAll(probe_resp.Body) if err != nil { - return err + return nil, err } time_after_probe := time.Now() @@ -144,16 +140,13 @@ func Probe( ) + probeTracer.GetTCPDelta() // We must have reused the connection if we are a self probe! - if (probeType == SelfUp || probeType == SelfDown) && !probeTracer.stats.ConnectionReused { - if !utilities.IsInterfaceNil(lgc) { - fmt.Fprintf(os.Stderr, - "(%s) (%s Probe %v) Probe should have reused a connection, but it didn't (connection status: %v)!\n", - debugging.Prefix, - probeType.Value(), - probeId, - lgc.Status(), - ) - } + if probeType.IsSelf() && !probeTracer.stats.ConnectionReused { + fmt.Fprintf(os.Stderr, + "(%s) (%s Probe %v) Probe should have reused a connection, but it didn't!\n", + debugging.Prefix, + probeType.Value(), + probeId, + ) panic(!probeTracer.stats.ConnectionReused) } @@ -199,14 +192,12 @@ func Probe( fmt.Printf("Warning: Could not fetch the extended stats for a probe: %v\n", err) } } - dataPoint := ProbeDataPoint{ + return &ProbeDataPoint{ Time: time_before_probe, RoundTripCount: uint64(roundTripCount), Duration: totalDelay, TCPRtt: tcpRtt, TCPCwnd: tcpCwnd, Type: probeType, - } - *result <- dataPoint - return nil + }, nil } diff --git a/probe/tracer.go b/probe/tracer.go index bea1334..e59e1aa 100644 --- a/probe/tracer.go +++ b/probe/tracer.go @@ -33,7 +33,7 @@ type ProbeTracer struct { stats *stats.TraceStats trace *httptrace.ClientTrace debug debug.DebugLevel - probeid uint64 + probeid uint probeType ProbeType } @@ -41,7 +41,7 @@ func (p *ProbeTracer) String() string { return fmt.Sprintf("(Probe %v): stats: %v\n", p.probeid, p.stats) } -func (p *ProbeTracer) ProbeId() uint64 { +func (p *ProbeTracer) ProbeId() uint { return p.probeid } @@ -293,7 +293,7 @@ func (probe *ProbeTracer) SetHttpResponseReadyTime( func NewProbeTracer( client *http.Client, probeType ProbeType, - probeId uint64, + probeId uint, debugging *debug.DebugWithPrefix, ) *ProbeTracer { probe := &ProbeTracer{ diff --git a/rpm/calculations.go b/rpm/calculations.go new file mode 100644 index 0000000..5387aa7 --- /dev/null +++ b/rpm/calculations.go @@ -0,0 +1,94 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package rpm + +import ( + "fmt" + + "github.com/network-quality/goresponsiveness/ms" +) + +type Rpm struct { + SelfRttsTotal int + ForeignRttsTotal int + SelfRttsTrimmed int + ForeignRttsTrimmed int + SelfProbeRttPN float64 + ForeignProbeRttPN float64 + SelfProbeRttMean float64 + ForeignProbeRttMean float64 + PNRpm float64 + MeanRpm float64 +} + +func CalculateRpm(selfRtts ms.MathematicalSeries[float64], foreignRtts ms.MathematicalSeries[float64], trimming uint, percentile int) Rpm { + // First, let's do a double-sided trim of the top/bottom 10% of our measurements. + selfRttsTotalCount := selfRtts.Len() + foreignRttsTotalCount := foreignRtts.Len() + + selfRttsTrimmed := selfRtts.DoubleSidedTrim(trimming) + foreignRttsTrimmed := foreignRtts.DoubleSidedTrim(trimming) + + selfRttsTrimmedCount := selfRttsTrimmed.Len() + foreignRttsTrimmedCount := foreignRttsTrimmed.Len() + + // Then, let's take the mean of those ... + selfProbeRoundTripTimeMean := selfRttsTrimmed.CalculateAverage() + foreignProbeRoundTripTimeMean := foreignRttsTrimmed.CalculateAverage() + + // Second, let's do the P90 calculations. + selfProbeRoundTripTimePN := selfRtts.Percentile(percentile) + foreignProbeRoundTripTimePN := foreignRtts.Percentile(percentile) + + // Note: The specification indicates that we want to calculate the foreign probes as such: + // 1/3*tcp_foreign + 1/3*tls_foreign + 1/3*http_foreign + // where tcp_foreign, tls_foreign, http_foreign are the P90 RTTs for the connection + // of the tcp, tls and http connections, respectively. However, we cannot break out + // the individual RTTs so we assume that they are roughly equal. + + // This is 60 because we measure in seconds not ms + pnRpm := 60.0 / (float64(selfProbeRoundTripTimePN+foreignProbeRoundTripTimePN) / 2.0) + meanRpm := 60.0 / (float64(selfProbeRoundTripTimeMean+foreignProbeRoundTripTimeMean) / 2.0) + + return Rpm{ + SelfRttsTotal: selfRttsTotalCount, ForeignRttsTotal: foreignRttsTotalCount, + SelfRttsTrimmed: selfRttsTrimmedCount, ForeignRttsTrimmed: foreignRttsTrimmedCount, + SelfProbeRttPN: selfProbeRoundTripTimePN, ForeignProbeRttPN: foreignProbeRoundTripTimePN, + SelfProbeRttMean: selfProbeRoundTripTimeMean, ForeignProbeRttMean: foreignProbeRoundTripTimeMean, + PNRpm: pnRpm, MeanRpm: meanRpm, + } +} + +func (rpm *Rpm) ToString() string { + return fmt.Sprintf( + `Total Self Probes: %d +Total Foreign Probes: %d +Trimmed Self Probes Count: %d +Trimmed Foreign Probes Count: %d +P90 Self RTT: %f +P90 Foreign RTT: %f +Trimmed Mean Self RTT: %f +Trimmed Mean Foreign RTT: %f +`, + rpm.SelfRttsTotal, + rpm.ForeignRttsTotal, + rpm.SelfRttsTrimmed, + rpm.ForeignRttsTrimmed, + rpm.SelfProbeRttPN, + rpm.ForeignProbeRttPN, + rpm.SelfProbeRttMean, + rpm.ForeignProbeRttMean, + ) +} diff --git a/rpm/parameters.go b/rpm/parameters.go new file mode 100644 index 0000000..aff8639 --- /dev/null +++ b/rpm/parameters.go @@ -0,0 +1,85 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package rpm + +import ( + "fmt" + "time" + + "github.com/network-quality/goresponsiveness/utilities" +) + +type SpecParameters struct { + TestTimeout time.Duration // Total test time. + MovingAvgDist int + EvalInterval time.Duration // How often to reevaluate network conditions. + TrimmedMeanPct uint + StdDevTolerance float64 + MaxParallelConns int + ProbeInterval time.Duration + ProbeCapacityPct float64 +} + +func SpecParametersFromArguments(timeout int, mad int, id int, tmp uint, sdt float64, mnp int, mps int, ptc float64) (*SpecParameters, error) { + if timeout <= 0 { + return nil, fmt.Errorf("cannot specify a 0 or negative timeout for the test") + } + if mad <= 0 { + return nil, fmt.Errorf("cannot specify a 0 or negative moving-average distance for the test") + } + if id <= 0 { + return nil, fmt.Errorf("cannot specify a 0 or negative reevaluation interval for the test") + } + if tmp < 0 { + return nil, fmt.Errorf("cannot specify a negative trimming percentage for the test") + } + if sdt < 0 { + return nil, fmt.Errorf("cannot specify a negative standard-deviation tolerance for the test") + } + if mnp <= 0 { + return nil, fmt.Errorf("cannot specify a 0 or negative maximum number of parallel connections for the test") + } + if mps <= 0 { + return nil, fmt.Errorf("cannot specify a 0 or negative probing interval for the test") + } + if ptc <= 0 { + return nil, fmt.Errorf("cannot specify a 0 or negative probe capacity for the test") + } + testTimeout := time.Second * time.Duration(timeout) + evalInterval := time.Second * time.Duration(id) + probeInterval := utilities.PerSecondToInterval(int64(mps)) + + params := SpecParameters{ + TestTimeout: testTimeout, MovingAvgDist: mad, + EvalInterval: evalInterval, TrimmedMeanPct: tmp, StdDevTolerance: sdt, + MaxParallelConns: mnp, ProbeInterval: probeInterval, ProbeCapacityPct: ptc, + } + return ¶ms, nil +} + +func (parameters *SpecParameters) ToString() string { + return fmt.Sprintf( + `Timeout: %v, +Moving-Average Distance: %v, +Interval Duration: %v, +Trimmed-Mean Percentage: %v, +Standard-Deviation Tolerance: %v, +Maximum number of parallel connections: %v, +Probe Interval: %v (derived from given maximum-probes-per-second parameter), +Maximum Percentage Of Throughput For Probes: %v`, + parameters.TestTimeout, parameters.MovingAvgDist, parameters.EvalInterval, parameters.TrimmedMeanPct, + parameters.StdDevTolerance, parameters.MaxParallelConns, parameters.ProbeInterval, parameters.ProbeCapacityPct, + ) +} diff --git a/rpm/parameters_test.go b/rpm/parameters_test.go new file mode 100644 index 0000000..4a955c5 --- /dev/null +++ b/rpm/parameters_test.go @@ -0,0 +1,93 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package rpm + +import ( + "strings" + "testing" +) + +func TestSpecParametersFromArgumentsBadTimeout(t *testing.T) { + _, err := SpecParametersFromArguments(0, 0, 0, 0, 0, 0, 0, 0) + if err == nil || !strings.Contains(err.Error(), "timeout") { + t.Fatalf("0 timeout improperly allowed.") + } + _, err = SpecParametersFromArguments(-1, 0, 0, 0, 0, 0, 0, 0) + if err == nil || !strings.Contains(err.Error(), "timeout") { + t.Fatalf("negative timeout improperly allowed.") + } +} + +func TestSpecParametersFromArgumentsBadMad(t *testing.T) { + _, err := SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0) + if err == nil || !strings.Contains(err.Error(), "moving-average") { + t.Fatalf("0 mad improperly allowed.") + } + _, err = SpecParametersFromArguments(1, 0, 0, 0, 0, 0, 0, 0) + if err == nil || !strings.Contains(err.Error(), "moving-average") { + t.Fatalf("negative mad improperly allowed.") + } +} + +func TestSpecParametersFromArgumentsBadId(t *testing.T) { + _, err := SpecParametersFromArguments(1, 1, 0, 0, 0, 0, 0, 0) + if err == nil || !strings.Contains(err.Error(), "reevaluation") { + t.Fatalf("0 id improperly allowed.") + } + _, err = SpecParametersFromArguments(1, 1, -1, 0, 0, 0, 0, 0) + if err == nil || !strings.Contains(err.Error(), "reevaluation") { + t.Fatalf("negative id improperly allowed.") + } +} + +func TestSpecParametersFromArgumentsBadSdt(t *testing.T) { + _, err := SpecParametersFromArguments(1, 1, 1, 1, -1, 0, 0, 0) + if err == nil || !strings.Contains(err.Error(), "deviation") { + t.Fatalf("0 sdt improperly allowed.") + } +} + +func TestSpecParametersFromArgumentsBadMnp(t *testing.T) { + _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 0, 0, 0) + if err == nil || !strings.Contains(err.Error(), "parallel") { + t.Fatalf("0 mnp improperly allowed.") + } + _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, -1, 0, 0) + if err == nil || !strings.Contains(err.Error(), "parallel") { + t.Fatalf("negative mnp improperly allowed.") + } +} + +func TestSpecParametersFromArgumentsBadMps(t *testing.T) { + _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 0, 0) + if err == nil || !strings.Contains(err.Error(), "probing interval") { + t.Fatalf("0 mps improperly allowed.") + } + _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, -1, 0) + if err == nil || !strings.Contains(err.Error(), "probing interval") { + t.Fatalf("negative mps improperly allowed.") + } +} + +func TestSpecParametersFromArgumentsBadPtc(t *testing.T) { + _, err := SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, 0) + if err == nil || !strings.Contains(err.Error(), "capacity") { + t.Fatalf("0 ptc improperly allowed.") + } + _, err = SpecParametersFromArguments(1, 1, 1, 1, 1, 1, 1, -1) + if err == nil || !strings.Contains(err.Error(), "capacity") { + t.Fatalf("negative ptc improperly allowed.") + } +} @@ -37,19 +37,22 @@ func addFlows( toAdd uint64, lgcc *lgc.LoadGeneratingConnectionCollection, lgcGenerator func() lgc.LoadGeneratingConnection, - debug debug.DebugLevel, + debugging debug.DebugLevel, ) uint64 { lgcc.Lock.Lock() defer lgcc.Lock.Unlock() for i := uint64(0); i < toAdd; i++ { // First, generate the connection. - newGenerator := lgcGenerator() - lgcc.Append(newGenerator) + newConnection := lgcGenerator() + lgcc.Append(newConnection) + if debug.IsDebug(debugging) { + fmt.Printf("Added a new %s load-generating connection.\n", newConnection.Direction()) + } // Second, try to start the connection. - if !newGenerator.Start(ctx, debug) { + if !newConnection.Start(ctx, debugging) { // If there was an error, we'll make sure that the caller knows it. fmt.Printf( - "Error starting lgc with id %d!\n", newGenerator.ClientId(), + "Error starting lgc with id %d!\n", newConnection.ClientId(), ) return i } @@ -81,49 +84,60 @@ type SelfDataCollectionResult struct { LoggingContinuation func() } -func CombinedProber( +func ResponsivenessProber( proberCtx context.Context, networkActivityCtx context.Context, foreignProbeConfigurationGenerator func() probe.ProbeConfiguration, selfProbeConfigurationGenerator func() probe.ProbeConfiguration, - selfDownProbeConnection lgc.LoadGeneratingConnection, - selfUpProbeConnection lgc.LoadGeneratingConnection, + selfProbeConnectionCollection *lgc.LoadGeneratingConnectionCollection, + probeDirection lgc.LgcDirection, probeInterval time.Duration, keyLogger io.Writer, captureExtendedStats bool, debugging *debug.DebugWithPrefix, -) (dataPoints chan probe.ProbeDataPoint) { +) (dataPoints chan utilities.Pair[*probe.ProbeDataPoint]) { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) Starting to collect responsiveness information at an interval of %v!\n", + debugging.Prefix, + probeInterval, + ) + } + // Make a channel to send back all the generated data points // when we are probing. - dataPoints = make(chan probe.ProbeDataPoint) + dataPoints = make(chan utilities.Pair[*probe.ProbeDataPoint]) go func() { wg := sync.WaitGroup{} - probeCount := 0 + probeCount := uint(0) + + dataPointsLock := sync.Mutex{} // As long as our context says that we can continue to probe! for proberCtx.Err() == nil { - time.Sleep(probeInterval) - foreignProbeConfiguration := foreignProbeConfigurationGenerator() - selfProbeConfiguration := selfProbeConfigurationGenerator() - - if debug.IsDebug(debugging.Level) { - fmt.Printf( - "(%s) About to send round %d of probes!\n", - debugging.Prefix, - probeCount+1, - ) + // We may have slept for a very long time. So, let's check to see if we are + // still active, just for fun! + if proberCtx.Err() != nil { + break } - transport := &http.Transport{} - transport.TLSClientConfig = &tls.Config{} - transport.Proxy = http.ProxyFromEnvironment - if !utilities.IsInterfaceNil(keyLogger) { + wg.Add(1) + go func() { + defer wg.Done() + probeCount++ + probeCount := probeCount + + foreignProbeConfiguration := foreignProbeConfigurationGenerator() + selfProbeConfiguration := selfProbeConfigurationGenerator() + if debug.IsDebug(debugging.Level) { fmt.Printf( - "Using an SSL Key Logger for this foreign probe.\n", + "(%s) About to send round %d of probes!\n", + debugging.Prefix, + probeCount, ) } @@ -134,112 +148,160 @@ func CombinedProber( // depend on whether the url contains // https:// or http://: // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74 - transport.TLSClientConfig.KeyLogWriter = keyLogger - } + transport := &http.Transport{} + transport.TLSClientConfig = &tls.Config{} + transport.Proxy = http.ProxyFromEnvironment - transport.TLSClientConfig.InsecureSkipVerify = - foreignProbeConfiguration.InsecureSkipVerify + if !utilities.IsInterfaceNil(keyLogger) { + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "Using an SSL Key Logger for a foreign probe.\n", + ) + } - utilities.OverrideHostTransport(transport, - foreignProbeConfiguration.ConnectToAddr) + transport.TLSClientConfig.KeyLogWriter = keyLogger + } - foreignProbeClient := &http.Client{Transport: transport} + transport.TLSClientConfig.InsecureSkipVerify = + foreignProbeConfiguration.InsecureSkipVerify - // Start Foreign Connection Prober - probeCount++ - go probe.Probe( - networkActivityCtx, - &wg, - foreignProbeClient, - nil, - foreignProbeConfiguration.URL, - foreignProbeConfiguration.Host, - probe.Foreign, - &dataPoints, - captureExtendedStats, - debugging, - ) + utilities.OverrideHostTransport(transport, + foreignProbeConfiguration.ConnectToAddr) - // Start Self Download Connection Prober + foreignProbeClient := &http.Client{Transport: transport} - // TODO: Make the following sanity check more than just a check. - // We only want to start a SelfDown probe on a connection that is - // in the RUNNING state. - if selfDownProbeConnection.Status() == lgc.LGC_STATUS_RUNNING { - go probe.Probe( + // Start Foreign Connection Prober + foreignProbeDataPoint, err := probe.Probe( networkActivityCtx, - &wg, - selfDownProbeConnection.Client(), - selfDownProbeConnection, - selfProbeConfiguration.URL, - selfProbeConfiguration.Host, - probe.SelfDown, - &dataPoints, + foreignProbeClient, + foreignProbeConfiguration.URL, + foreignProbeConfiguration.Host, + probeDirection, + probe.Foreign, + probeCount, captureExtendedStats, debugging, ) - } else { - panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n", - debugging.Prefix, selfDownProbeConnection.Status())) - } + if err != nil { + return + } + + var selfProbeConnection *lgc.LoadGeneratingConnection = nil + func() { + selfProbeConnectionCollection.Lock.Lock() + defer selfProbeConnectionCollection.Lock.Unlock() + selfProbeConnection, err = selfProbeConnectionCollection.GetRandom() + if err != nil { + if debug.IsWarn(debugging.Level) { + fmt.Printf( + "(%s) Failed to get a random %s load-generating connection on which to send a probe: %v.\n", + debugging.Prefix, + utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"), + err, + ) + } + return + } + }() + if selfProbeConnection == nil { + return + } - // Start Self Upload Connection Prober + // TODO: Make the following sanity check more than just a check. + // We only want to start a SelfUp probe on a connection that is + // in the RUNNING state. + if (*selfProbeConnection).Status() != lgc.LGC_STATUS_RUNNING { + if debug.IsWarn(debugging.Level) { + fmt.Printf( + "(%s) The selected random %s load-generating connection on which to send a probe was not running.\n", + debugging.Prefix, + utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"), + ) + } + return + } - // TODO: Make the following sanity check more than just a check. - // We only want to start a SelfDown probe on a connection that is - // in the RUNNING state. - if selfUpProbeConnection.Status() == lgc.LGC_STATUS_RUNNING { - go probe.Probe( + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) Selected %s load-generating connection with ID %d to send a self probe with Id %d.\n", + debugging.Prefix, + utilities.Conditional(probeDirection == lgc.LGC_DOWN, "download", "upload"), + (*selfProbeConnection).ClientId(), + probeCount, + ) + } + selfProbeDataPoint, err := probe.Probe( proberCtx, - &wg, - selfUpProbeConnection.Client(), - nil, + (*selfProbeConnection).Client(), selfProbeConfiguration.URL, selfProbeConfiguration.Host, - probe.SelfUp, - &dataPoints, + probeDirection, + utilities.Conditional(probeDirection == lgc.LGC_DOWN, probe.SelfDown, probe.SelfUp), + probeCount, captureExtendedStats, debugging, ) - } else { - panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n", - debugging.Prefix, selfUpProbeConnection.Status())) - } + if err != nil { + fmt.Printf( + "(%s) There was an error sending a self probe with Id %d: %v\n", + debugging.Prefix, + probeCount, + err, + ) + return + } + + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "(%s) About to report results for round %d of probes!\n", + debugging.Prefix, + probeCount, + ) + } + + dataPointsLock.Lock() + // Now we have our four data points (three in the foreign probe data point and one in the self probe data point) + if dataPoints != nil { + dataPoints <- utilities.Pair[*probe.ProbeDataPoint]{ + First: foreignProbeDataPoint, Second: selfProbeDataPoint, + } + } + dataPointsLock.Unlock() + }() } if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) Combined probe driver is going to start waiting for its probes to finish.\n", + "(%s) Probe driver is going to start waiting for its probes to finish.\n", debugging.Prefix, ) } utilities.OrTimeout(func() { wg.Wait() }, 2*time.Second) if debug.IsDebug(debugging.Level) { fmt.Printf( - "(%s) Combined probe driver is done waiting for its probes to finish.\n", + "(%s) Probe driver is done waiting for its probes to finish.\n", debugging.Prefix, ) } + dataPointsLock.Lock() close(dataPoints) + dataPoints = nil + dataPointsLock.Unlock() }() return } func LoadGenerator( + throughputCtx context.Context, // Stop our activity when we no longer need any throughput networkActivityCtx context.Context, // Create all network connections in this context. - loadGeneratorCtx context.Context, // Stop our activity when we no longer need to generate load. + generateLoadCtx context.Context, // Stop adding additional throughput when we are stable. rampupInterval time.Duration, lgcGenerator func() lgc.LoadGeneratingConnection, // Use this to generate a new load-generating connection. loadGeneratingConnectionsCollection *lgc.LoadGeneratingConnectionCollection, + mnp int, captureExtendedStats bool, // do we want to attempt to gather TCP information on these connections? debugging *debug.DebugWithPrefix, // How can we forget debugging? -) (probeConnectionCommunicationChannel chan lgc.LoadGeneratingConnection, // Send back a channel to communicate the connection to be used for self probes. - throughputCalculations chan ThroughputDataPoint, // Send back all the instantaneous throughputs that we generate. -) { - throughputCalculations = make(chan ThroughputDataPoint) - // The channel that we are going to use to send back the connection to use for probing may not immediately - // be read by the caller. We don't want to wait around until they are ready before we start doing our work. - // So, we'll make it buffered. - probeConnectionCommunicationChannel = make(chan lgc.LoadGeneratingConnection, 1) +) (stabilizerCommunicationChannel chan ThroughputDataPoint) { // Send back all the instantaneous throughputs that we generate. + stabilizerCommunicationChannel = make(chan ThroughputDataPoint) go func() { flowsCreated := uint64(0) @@ -252,32 +314,12 @@ func LoadGenerator( debugging.Level, ) - // We have at least a single load-generating channel. This channel will be the one that - // the self probes use. - go func() { - loadGeneratingConnectionsCollection.Lock.Lock() - zerothConnection, err := loadGeneratingConnectionsCollection.Get(0) - loadGeneratingConnectionsCollection.Lock.Unlock() - if err != nil { - panic("Could not get the zeroth connection!\n") - } - // We are going to wait until it is started. - if !(*zerothConnection).WaitUntilStarted(loadGeneratorCtx) { - fmt.Fprintf(os.Stderr, "Could not wait until the zeroth load-generating connection was started!\n") - return - } - // Now that it is started, we will send it back to the caller so that - // they can pass it on to the CombinedProber which will use it for the - // self probes. - probeConnectionCommunicationChannel <- *zerothConnection - }() - nextSampleStartTime := time.Now().Add(rampupInterval) for currentInterval := uint64(0); true; currentInterval++ { - // If the loadGeneratorCtx is canceled, then that means our work here is done ... - if loadGeneratorCtx.Err() != nil { + // If the throughputCtx is canceled, then that means our work here is done ... + if throughputCtx.Err() != nil { break } @@ -297,6 +339,12 @@ func LoadGenerator( } nextSampleStartTime = time.Now().Add(time.Second) + // Waiting is the hardest part -- that was a long time asleep + // and we may have been cancelled during that time! + if throughputCtx.Err() != nil { + break + } + // Compute "instantaneous aggregate" goodput which is the number of // bytes transferred within the last second. var instantaneousThroughputTotal float64 = 0 @@ -406,16 +454,50 @@ func LoadGenerator( len(*loadGeneratingConnectionsCollection.LGCs), granularThroughputDatapoints, } - throughputCalculations <- throughputDataPoint + stabilizerCommunicationChannel <- throughputDataPoint - // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now! - flowsCreated += addFlows( - networkActivityCtx, - constants.AdditiveNumberOfLoadGeneratingConnections, - loadGeneratingConnectionsCollection, - lgcGenerator, - debugging.Level, - ) + if generateLoadCtx.Err() != nil { + // No need to add additional data points because the controller told us + // that we were stable. But, we want to continue taking measurements! + if debug.IsDebug(debugging.Level) { + fmt.Printf( + "%v: Throughput is stable; not adding any additional load-generating connections.\n", + debugging, + ) + } + continue + } + + loadGeneratingConnectionsCollection.Lock.Lock() + currentParallelConnectionCount, err := + loadGeneratingConnectionsCollection.Len() + loadGeneratingConnectionsCollection.Lock.Unlock() + + if err != nil { + if debug.IsWarn(debugging.Level) { + fmt.Printf( + "%v: Failed to get a count of the number of parallel load-generating connections: %v.\n", + debugging, + err, + ) + } + } + if currentParallelConnectionCount < mnp { + // Just add another constants.AdditiveNumberOfLoadGeneratingConnections flows -- that's our only job now! + flowsCreated += addFlows( + networkActivityCtx, + constants.AdditiveNumberOfLoadGeneratingConnections, + loadGeneratingConnectionsCollection, + lgcGenerator, + debugging.Level, + ) + } else if debug.IsWarn(debugging.Level) { + fmt.Printf( + "%v: Maximum number of parallel transport-layer connections reached (%d). Not adding another.\n", + debugging, + mnp, + ) + } } if debug.IsDebug(debugging.Level) { diff --git a/stabilizer/algorithm.go b/stabilizer/algorithm.go new file mode 100644 index 0000000..45c34d9 --- /dev/null +++ b/stabilizer/algorithm.go @@ -0,0 +1,130 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package stabilizer + +import ( + "fmt" + "sync" + + "github.com/network-quality/goresponsiveness/debug" + "github.com/network-quality/goresponsiveness/ms" + "golang.org/x/exp/constraints" +) + +type MeasurementStablizer[T constraints.Float | constraints.Integer] struct { + instantaneousses ms.MathematicalSeries[T] + aggregates ms.MathematicalSeries[float64] + stabilityStandardDeviation float64 + trimmingLevel uint + m sync.Mutex + dbgLevel debug.DebugLevel + dbgConfig *debug.DebugWithPrefix + units string +} + +// Stabilizer parameters: +// 1. MAD: An all-purpose value that determines the hysteresis of various calculations +// that will affect saturation (of either throughput or responsiveness). +// 2: SDT: The standard deviation cutoff used to determine stability among the K preceding +// moving averages of a measurement. +// 3: TMP: The percentage by which to trim the values before calculating the standard deviation +// to determine whether the value is within acceptable range for stability (SDT). + +// Stabilizer Algorithm: +// Throughput stabilization is achieved when the standard deviation of the MAD number of the most +// recent moving averages of instantaneous measurements is within an upper bound. +// +// Yes, that *is* a little confusing: +// The user will deliver us a steady diet of measurements of the number of bytes transmitted during the immediately +// previous interval. We will keep the MAD most recent of those measurements. Every time that we get a new +// measurement, we will recalculate the moving average of the MAD most instantaneous measurements. We will call that +// the moving average aggregate throughput at interval p. We keep the MAD most recent of those values. +// If the calculated standard deviation of *those* values is less than SDT, we declare +// stability. + +func NewStabilizer[T constraints.Float | constraints.Integer]( + mad uint, + sdt float64, + trimmingLevel uint, + units string, + debugLevel debug.DebugLevel, + debug *debug.DebugWithPrefix, +) MeasurementStablizer[T] { + return MeasurementStablizer[T]{ + instantaneousses: ms.NewCappedMathematicalSeries[T](mad), + aggregates: ms.NewCappedMathematicalSeries[float64](mad), + stabilityStandardDeviation: sdt, + trimmingLevel: trimmingLevel, + units: units, + dbgConfig: debug, + dbgLevel: debugLevel, + } +} + +func (r3 *MeasurementStablizer[T]) AddMeasurement(measurement T) { + r3.m.Lock() + defer r3.m.Unlock() + // Add this instantaneous measurement to the mix of the MAD previous instantaneous measurements. + r3.instantaneousses.AddElement(measurement) + // Calculate the moving average of the MAD previous instantaneous measurements (what the + // algorithm calls moving average aggregate throughput at interval p) and add it to + // the mix of MAD previous moving averages. + r3.aggregates.AddElement(r3.instantaneousses.CalculateAverage()) + + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf( + "%s: MA: %f Mbps (previous %d intervals).\n", + r3.dbgConfig.String(), + r3.aggregates.CalculateAverage(), + r3.aggregates.Len(), + ) + } +} + +func (r3 *MeasurementStablizer[T]) IsStable() bool { + // There are MAD number of measurements of the _moving average aggregate throughput + // at interval p_ in movingAverages. + isvalid, stddev := r3.aggregates.StandardDeviation() + + if !isvalid { + // If there are not enough values in the series to be able to calculate a + // standard deviation, then we know that we are not yet stable. Vamoose. + return false + } + + // Stability is determined by whether or not the standard deviation of the values + // is within some percentage of the average. + stabilityCutoff := r3.aggregates.CalculateAverage() * (r3.stabilityStandardDeviation / 100.0) + isStable := stddev <= stabilityCutoff + + if debug.IsDebug(r3.dbgLevel) { + fmt.Printf( + "%s: Is Stable? %v; Standard Deviation: %f %s; Is Normally Distributed? %v; Standard Deviation Cutoff: %v %s).\n", + r3.dbgConfig.String(), + isStable, + stddev, + r3.units, + r3.aggregates.IsNormallyDistributed(), + stabilityCutoff, + r3.units, + ) + fmt.Printf("%s: Values: ", r3.dbgConfig.String()) + for _, v := range r3.aggregates.Values() { + fmt.Printf("%v, ", v) + } + fmt.Printf("\n") + } + return isStable +} diff --git a/stabilizer/rev3.go b/stabilizer/rev3.go deleted file mode 100644 index 4c24f54..0000000 --- a/stabilizer/rev3.go +++ /dev/null @@ -1,199 +0,0 @@ -/* - * This file is part of Go Responsiveness. - * - * Go Responsiveness is free software: you can redistribute it and/or modify it under - * the terms of the GNU General Public License as published by the Free Software Foundation, - * either version 2 of the License, or (at your option) any later version. - * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY - * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A - * PARTICULAR PURPOSE. See the GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. - */ - -package stabilizer - -import ( - "fmt" - "sync" - - "github.com/network-quality/goresponsiveness/debug" - "github.com/network-quality/goresponsiveness/ms" - "github.com/network-quality/goresponsiveness/probe" - "github.com/network-quality/goresponsiveness/rpm" - "github.com/network-quality/goresponsiveness/utilities" -) - -type DataPointStabilizer struct { - instantaneousMeasurements ms.MathematicalSeries[float64] - movingAverages ms.MathematicalSeries[float64] - stabilityStandardDeviation float64 - m sync.Mutex - dbgLevel debug.DebugLevel - dbgConfig *debug.DebugWithPrefix -} - -type ProbeStabilizer DataPointStabilizer -type ThroughputStabilizer DataPointStabilizer - -// Stabilizer parameters: -// 1. I: The number of previous instantaneous measurements to consider when generating -// the so-called instantaneous moving averages. -// 2. K: The number of instantaneous moving averages to consider when determining stability. -// 3: S: The standard deviation cutoff used to determine stability among the K preceding -// moving averages of a measurement. - -// Rev3 Stabilizer Algorithm: -// Stabilization is achieved when the standard deviation of a given number of the most recent moving averages of -// instantaneous measurements is within an upper bound. -// -// Yes, that *is* a little confusing: -// The user will deliver us a steady diet of so-called instantaneous measurements. We will keep the I most recent -// of those measurements. Every time that we get a new instantaneous measurement, we will recalculate the moving -// average of the I most instantaneous measurements. We will call that an instantaneous moving average. We keep the K -// most recent instantaneous moving averages. Every time that we calculate a new instantaneous moving average, we will -// calculate the standard deviation of those values. If the calculated standard deviation is less than S, we declare -// stability. - -func NewProbeStabilizer( - i uint64, - k uint64, - s float64, - debugLevel debug.DebugLevel, - debug *debug.DebugWithPrefix, -) ProbeStabilizer { - return ProbeStabilizer{instantaneousMeasurements: ms.NewCappedMathematicalSeries[float64](i), - movingAverages: ms.NewCappedMathematicalSeries[float64](k), - stabilityStandardDeviation: s, - dbgConfig: debug, - dbgLevel: debugLevel} -} - -func (r3 *ProbeStabilizer) AddMeasurement(measurement probe.ProbeDataPoint) { - r3.m.Lock() - defer r3.m.Unlock() - - // There may be more than one round trip accumulated together. If that is the case, - // we will blow them apart in to three separate measurements and each one will just - // be 1 / measurement.RoundTripCount of the total length. - for range utilities.Iota(0, int(measurement.RoundTripCount)) { - // Add this instantaneous measurement to the mix of the I previous instantaneous measurements. - r3.instantaneousMeasurements.AddElement( - measurement.Duration.Seconds() / float64(measurement.RoundTripCount), - ) - } - // Calculate the moving average of the I previous instantaneous measurements and add it to - // the mix of K previous moving averages. - r3.movingAverages.AddElement(r3.instantaneousMeasurements.CalculateAverage()) - - if debug.IsDebug(r3.dbgLevel) { - fmt.Printf( - "%s: MA: %f ns (previous %d intervals).\n", - r3.dbgConfig.String(), - r3.movingAverages.CalculateAverage(), - r3.movingAverages.Len(), - ) - } -} - -func (r3 *ProbeStabilizer) IsStable() bool { - // calculate whether the standard deviation of the K previous moving averages falls below S. - isvalid, stddev := r3.movingAverages.StandardDeviation() - - if !isvalid { - // If there are not enough values in the series to be able to calculate a - // standard deviation, then we know that we are not yet stable. Vamoose. - return false - } - - // Stability is determined by whether or not the standard deviation of the values - // is within some percentage of the average. - stabilityCutoff := r3.movingAverages.CalculateAverage() * (r3.stabilityStandardDeviation / 100.0) - isStable := stddev <= stabilityCutoff - - if debug.IsDebug(r3.dbgLevel) { - fmt.Printf( - "%s: Is Stable? %v; Standard Deviation: %f s; Is Normally Distributed? %v; Standard Deviation Cutoff: %v s).\n", - r3.dbgConfig.String(), - isStable, - stddev, - r3.movingAverages.IsNormallyDistributed(), - stabilityCutoff, - ) - fmt.Printf("%s: Values: ", r3.dbgConfig.String()) - for _, v := range r3.movingAverages.Values() { - fmt.Printf("%v, ", v) - } - fmt.Printf("\n") - } - - return isStable -} - -func NewThroughputStabilizer( - i uint64, - k uint64, - s float64, - debugLevel debug.DebugLevel, - debug *debug.DebugWithPrefix, -) ThroughputStabilizer { - return ThroughputStabilizer{ - instantaneousMeasurements: ms.NewCappedMathematicalSeries[float64](i), - movingAverages: ms.NewCappedMathematicalSeries[float64](k), - stabilityStandardDeviation: s, - dbgConfig: debug, - dbgLevel: debugLevel, - } -} - -func (r3 *ThroughputStabilizer) AddMeasurement(measurement rpm.ThroughputDataPoint) { - r3.m.Lock() - defer r3.m.Unlock() - // Add this instantaneous measurement to the mix of the I previous instantaneous measurements. - r3.instantaneousMeasurements.AddElement(utilities.ToMbps(measurement.Throughput)) - // Calculate the moving average of the I previous instantaneous measurements and add it to - // the mix of K previous moving averages. - r3.movingAverages.AddElement(r3.instantaneousMeasurements.CalculateAverage()) - - if debug.IsDebug(r3.dbgLevel) { - fmt.Printf( - "%s: MA: %f Mbps (previous %d intervals).\n", - r3.dbgConfig.String(), - r3.movingAverages.CalculateAverage(), - r3.movingAverages.Len(), - ) - } -} - -func (r3 *ThroughputStabilizer) IsStable() bool { - isvalid, stddev := r3.movingAverages.StandardDeviation() - - if !isvalid { - // If there are not enough values in the series to be able to calculate a - // standard deviation, then we know that we are not yet stable. Vamoose. - return false - } - - // Stability is determined by whether or not the standard deviation of the values - // is within some percentage of the average. - stabilityCutoff := r3.movingAverages.CalculateAverage() * (r3.stabilityStandardDeviation / 100.0) - isStable := stddev <= stabilityCutoff - - if debug.IsDebug(r3.dbgLevel) { - fmt.Printf( - "%s: Is Stable? %v; Standard Deviation: %f Mbps; Is Normally Distributed? %v; Standard Deviation Cutoff: %v Mbps).\n", - r3.dbgConfig.String(), - isStable, - stddev, - r3.movingAverages.IsNormallyDistributed(), - stabilityCutoff, - ) - fmt.Printf("%s: Values: ", r3.dbgConfig.String()) - for _, v := range r3.movingAverages.Values() { - fmt.Printf("%v, ", v) - } - fmt.Printf("\n") - } - return isStable -} diff --git a/utilities/utilities.go b/utilities/utilities.go index e75d373..ff04023 100644 --- a/utilities/utilities.go +++ b/utilities/utilities.go @@ -30,13 +30,10 @@ import ( "golang.org/x/exp/constraints" ) -var ( - // GitVersion is the Git revision hash - GitVersion = "dev" -) +// GitVersion is the Git revision hash +var GitVersion = "dev" func Iota(low int, high int) (made []int) { - made = make([]int, high-low) for counter := low; counter < high; counter++ { made[counter-low] = counter @@ -67,7 +64,7 @@ func AbsPercentDifference( ) } -func Conditional(condition bool, t string, f string) string { +func Conditional[T any](condition bool, t T, f T) T { if condition { return t } @@ -229,3 +226,12 @@ func ContextSignaler(ctxt context.Context, st time.Duration, condition *func() b return } } + +type Pair[T any] struct { + First T + Second T +} + +func PerSecondToInterval(rate int64) time.Duration { + return time.Duration(time.Second.Nanoseconds() / rate) +} diff --git a/utilities/utilities_test.go b/utilities/utilities_test.go index aa66f6b..9cd4ef0 100644 --- a/utilities/utilities_test.go +++ b/utilities/utilities_test.go @@ -116,3 +116,13 @@ func TestWaitWithContext(t *testing.T) { wg.Wait() } + +func TestPerSecondToInterval(t *testing.T) { + if time.Second != PerSecondToInterval(1) { + t.Fatalf("A number of nanoseconds is not equal to a second!") + } + + if time.Second/2 != PerSecondToInterval(2) { + t.Fatalf("Something that happens twice per second should happen every 5000ns.") + } +} |
