From b74e4d4261d8618ef5f99118d2b784cde2614ca2 Mon Sep 17 00:00:00 2001 From: Will Hawkins Date: Wed, 15 Dec 2021 13:36:44 -0500 Subject: Rename MeasurableConnection interface to LoadBearingConnection. --- lbc/lbc.go | 125 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ mc/mc.go | 125 ------------------------------------------------------ networkQuality.go | 52 +++++++++++------------ 3 files changed, 151 insertions(+), 151 deletions(-) create mode 100644 lbc/lbc.go delete mode 100644 mc/mc.go diff --git a/lbc/lbc.go b/lbc/lbc.go new file mode 100644 index 0000000..82fe550 --- /dev/null +++ b/lbc/lbc.go @@ -0,0 +1,125 @@ +package lbc + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "net/http" +) + +var chunkSize int = 5000 + +type LoadBearingConnection interface { + Start(context.Context, bool) bool + Transferred() uint64 + Client() *http.Client +} + +type LoadBearingConnectionDownload struct { + Path string + downloaded uint64 + client *http.Client +} + +func (lbd *LoadBearingConnectionDownload) Transferred() uint64 { + return lbd.downloaded +} + +func (lbd *LoadBearingConnectionDownload) Client() *http.Client { + return lbd.client +} + +func (lbd *LoadBearingConnectionDownload) Start(ctx context.Context, debug bool) bool { + lbd.downloaded = 0 + lbd.client = &http.Client{} + + // At some point this might be useful: It is a snippet of code that will enable + // logging of per-session TLS key material in order to make debugging easier in + // Wireshark. + /* + lbd.client = &http.Client{ + Transport: &http2.Transport{ + TLSClientConfig: &tls.Config{ + KeyLogWriter: w, + + Rand: utilities.RandZeroSource{}, // for reproducible output; don't do this. + InsecureSkipVerify: true, // test server certificate is not trusted. + }, + }, + } + */ + + if debug { + fmt.Printf("Started a load bearing download.\n") + } + go doDownload(ctx, lbd.client, lbd.Path, &lbd.downloaded, debug) + return true +} + +func doDownload(ctx context.Context, client *http.Client, path string, count *uint64, debug bool) { + get, err := client.Get(path) + if err != nil { + return + } + for ctx.Err() == nil { + n, err := io.CopyN(ioutil.Discard, get.Body, int64(chunkSize)) + if err != nil { + break + } + *count += uint64(n) + } + get.Body.Close() + if debug { + fmt.Printf("Ending a load-bearing download.\n") + } +} + +type LoadBearingConnectionUpload struct { + Path string + uploaded uint64 + client *http.Client +} + +func (lbu *LoadBearingConnectionUpload) Transferred() uint64 { + return lbu.uploaded +} + +func (lbd *LoadBearingConnectionUpload) Client() *http.Client { + return lbd.client +} + +type syntheticCountingReader struct { + n *uint64 + ctx context.Context +} + +func (s *syntheticCountingReader) Read(p []byte) (n int, err error) { + if s.ctx.Err() != nil { + return 0, io.EOF + } + err = nil + n = len(p) + n = chunkSize + *s.n += uint64(n) + return +} + +func doUpload(ctx context.Context, client *http.Client, path string, count *uint64, debug bool) bool { + *count = 0 + s := &syntheticCountingReader{n: count, ctx: ctx} + resp, _ := client.Post(path, "application/octet-stream", s) + resp.Body.Close() + if debug { + fmt.Printf("Ending a load-bearing upload.\n") + } + return true +} + +func (lbu *LoadBearingConnectionUpload) Start(ctx context.Context, debug bool) bool { + lbu.uploaded = 0 + lbu.client = &http.Client{} + fmt.Printf("Started a load bearing upload.\n") + go doUpload(ctx, lbu.client, lbu.Path, &lbu.uploaded, debug) + return true +} diff --git a/mc/mc.go b/mc/mc.go deleted file mode 100644 index 710e203..0000000 --- a/mc/mc.go +++ /dev/null @@ -1,125 +0,0 @@ -package mc - -import ( - "context" - "fmt" - "io" - "io/ioutil" - "net/http" -) - -var chunkSize int = 5000 - -type MeasurableConnection interface { - Start(context.Context, bool) bool - Transferred() uint64 - Client() *http.Client -} - -type LoadBearingDownload struct { - Path string - downloaded uint64 - client *http.Client -} - -func (lbd *LoadBearingDownload) Transferred() uint64 { - return lbd.downloaded -} - -func (lbd *LoadBearingDownload) Client() *http.Client { - return lbd.client -} - -func (lbd *LoadBearingDownload) Start(ctx context.Context, debug bool) bool { - lbd.downloaded = 0 - lbd.client = &http.Client{} - - // At some point this might be useful: It is a snippet of code that will enable - // logging of per-session TLS key material in order to make debugging easier in - // Wireshark. - /* - lbd.client = &http.Client{ - Transport: &http2.Transport{ - TLSClientConfig: &tls.Config{ - KeyLogWriter: w, - - Rand: utilities.RandZeroSource{}, // for reproducible output; don't do this. - InsecureSkipVerify: true, // test server certificate is not trusted. - }, - }, - } - */ - - if debug { - fmt.Printf("Started a load bearing download.\n") - } - go doDownload(ctx, lbd.client, lbd.Path, &lbd.downloaded, debug) - return true -} - -func doDownload(ctx context.Context, client *http.Client, path string, count *uint64, debug bool) { - get, err := client.Get(path) - if err != nil { - return - } - for ctx.Err() == nil { - n, err := io.CopyN(ioutil.Discard, get.Body, int64(chunkSize)) - if err != nil { - break - } - *count += uint64(n) - } - get.Body.Close() - if debug { - fmt.Printf("Ending a load-bearing download.\n") - } -} - -type LoadBearingUpload struct { - Path string - uploaded uint64 - client *http.Client -} - -func (lbu *LoadBearingUpload) Transferred() uint64 { - return lbu.uploaded -} - -func (lbd *LoadBearingUpload) Client() *http.Client { - return lbd.client -} - -type syntheticCountingReader struct { - n *uint64 - ctx context.Context -} - -func (s *syntheticCountingReader) Read(p []byte) (n int, err error) { - if s.ctx.Err() != nil { - return 0, io.EOF - } - err = nil - n = len(p) - n = chunkSize - *s.n += uint64(n) - return -} - -func doUpload(ctx context.Context, client *http.Client, path string, count *uint64, debug bool) bool { - *count = 0 - s := &syntheticCountingReader{n: count, ctx: ctx} - resp, _ := client.Post(path, "application/octet-stream", s) - resp.Body.Close() - if debug { - fmt.Printf("Ending a load-bearing upload.\n") - } - return true -} - -func (lbu *LoadBearingUpload) Start(ctx context.Context, debug bool) bool { - lbu.uploaded = 0 - lbu.client = &http.Client{} - fmt.Printf("Started a load bearing upload.\n") - go doUpload(ctx, lbu.client, lbu.Path, &lbu.uploaded, debug) - return true -} diff --git a/networkQuality.go b/networkQuality.go index 9231970..44d64b1 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -15,8 +15,8 @@ import ( "strings" "time" + "github.com/hawkinsw/goresponsiveness/lbc" "github.com/hawkinsw/goresponsiveness/ma" - "github.com/hawkinsw/goresponsiveness/mc" "github.com/hawkinsw/goresponsiveness/timeoutat" "github.com/hawkinsw/goresponsiveness/utilities" ) @@ -90,13 +90,13 @@ var ( storeSslKeys = flag.Bool("store-ssl-keys", false, "Store SSL keys from connections for debugging. (currently unused)") ) -func addFlows(ctx context.Context, toAdd uint64, mcs *[]mc.MeasurableConnection, mcsPreviousTransferred *[]uint64, lbcGenerator func() mc.MeasurableConnection, debug bool) { +func addFlows(ctx context.Context, toAdd uint64, lbcs *[]lbc.LoadBearingConnection, lbcsPreviousTransferred *[]uint64, lbcGenerator func() lbc.LoadBearingConnection, debug bool) { for i := uint64(0); i < toAdd; i++ { //mcs[i] = &mc.LoadBearingUpload{Path: config.Urls.UploadUrl} - *mcs = append(*mcs, lbcGenerator()) - *mcsPreviousTransferred = append(*mcsPreviousTransferred, 0) - if !(*mcs)[len(*mcs)-1].Start(ctx, debug) { - fmt.Printf("Error starting %dth MC!\n", i) + *lbcs = append(*lbcs, lbcGenerator()) + *lbcsPreviousTransferred = append(*lbcsPreviousTransferred, 0) + if !(*lbcs)[len(*lbcs)-1].Start(ctx, debug) { + fmt.Printf("Error starting %dth LBC!\n", i) return } } @@ -104,18 +104,18 @@ func addFlows(ctx context.Context, toAdd uint64, mcs *[]mc.MeasurableConnection, type SaturationResult struct { RateBps float64 - Mcs []mc.MeasurableConnection + Lbcs []lbc.LoadBearingConnection } -func saturate(ctx context.Context, lbcGenerator func() mc.MeasurableConnection, debug bool) (saturated chan SaturationResult) { +func saturate(ctx context.Context, lbcGenerator func() lbc.LoadBearingConnection, debug bool) (saturated chan SaturationResult) { saturated = make(chan SaturationResult) go func() { - mcs := make([]mc.MeasurableConnection, 0) - mcsPreviousTransferred := make([]uint64, 0) + lbcs := make([]lbc.LoadBearingConnection, 0) + lbcsPreviousTransferred := make([]uint64, 0) // Create 4 load bearing connections - addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug) + addFlows(ctx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug) previousFlowIncreaseIteration := uint64(0) previousMovingAverage := float64(0) @@ -145,11 +145,11 @@ func saturate(ctx context.Context, lbcGenerator func() mc.MeasurableConnection, // Compute "instantaneous aggregate" goodput which is the number of bytes transferred within the last second. totalTransfer := uint64(0) - for i := range mcs { - previousTransferred := mcsPreviousTransferred[i] - currentTransferred := mcs[i].Transferred() + for i := range lbcs { + previousTransferred := lbcsPreviousTransferred[i] + currentTransferred := lbcs[i].Transferred() totalTransfer += (currentTransferred - previousTransferred) - mcsPreviousTransferred[i] = currentTransferred + lbcsPreviousTransferred[i] = currentTransferred } // Compute a moving average of the last 4 "instantaneous aggregate goodput" measurements @@ -172,7 +172,7 @@ func saturate(ctx context.Context, lbcGenerator func() mc.MeasurableConnection, if debug { fmt.Printf("Adding flows because we are unsaturated and waited a while.\n") } - addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug) + addFlows(ctx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug) previousFlowIncreaseIteration = currentIteration } else { if debug { @@ -191,13 +191,13 @@ func saturate(ctx context.Context, lbcGenerator func() mc.MeasurableConnection, if debug { fmt.Printf("New flows to add to try to increase our saturation!\n") } - addFlows(ctx, 4, &mcs, &mcsPreviousTransferred, lbcGenerator, debug) + addFlows(ctx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug) previousFlowIncreaseIteration = currentIteration } } } - saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), Mcs: mcs} + saturated <- SaturationResult{RateBps: movingAverage.CalculateAverage(), Lbcs: lbcs} }() return } @@ -224,11 +224,11 @@ func main() { timeoutChannel := timeoutat.TimeoutAt(operatingCtx, time.Now().Add(timeoutDuration), *debug) - generate_lbd := func() mc.MeasurableConnection { - return &mc.LoadBearingDownload{Path: config.Urls.LargeUrl} + generate_lbd := func() lbc.LoadBearingConnection { + return &lbc.LoadBearingConnectionDownload{Path: config.Urls.LargeUrl} } - generate_lbu := func() mc.MeasurableConnection { - return &mc.LoadBearingUpload{Path: config.Urls.UploadUrl} + generate_lbu := func() lbc.LoadBearingConnection { + return &lbc.LoadBearingConnectionUpload{Path: config.Urls.UploadUrl} } downloadSaturationChannel := saturate(operatingCtx, generate_lbd, *debug) uploadSaturationChannel := saturate(operatingCtx, generate_lbu, *debug) @@ -245,14 +245,14 @@ func main() { { download_saturated = true if *debug { - fmt.Printf("################# download is saturated (%fMBps, %d flows)!\n", toMBs(downloadSaturation.RateBps), len(downloadSaturation.Mcs)) + fmt.Printf("################# download is saturated (%fMBps, %d flows)!\n", toMBs(downloadSaturation.RateBps), len(downloadSaturation.Lbcs)) } } case uploadSaturation = <-uploadSaturationChannel: { upload_saturated = true if *debug { - fmt.Printf("################# upload is saturated (%fMBps, %d flows)!\n", toMBs(uploadSaturation.RateBps), len(uploadSaturation.Mcs)) + fmt.Printf("################# upload is saturated (%fMBps, %d flows)!\n", toMBs(uploadSaturation.RateBps), len(uploadSaturation.Lbcs)) } } case <-timeoutChannel: @@ -276,13 +276,13 @@ func main() { totalRTTTime := float64(0) for i := 0; i < robustnessProbeIterationCount && !test_timeout; i++ { - randomMcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Mcs) + randomLbcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Lbcs) select { case <-timeoutChannel: { test_timeout = true } - case fiveRTTsTime := <-utilities.TimedSequentialRTTs(operatingCtx, downloadSaturation.Mcs[randomMcsIndex].Client(), &http.Client{}, config.Urls.SmallUrl): + case fiveRTTsTime := <-utilities.TimedSequentialRTTs(operatingCtx, downloadSaturation.Lbcs[randomLbcsIndex].Client(), &http.Client{}, config.Urls.SmallUrl): { actualRTTCount += 5 totalRTTTime += fiveRTTsTime.Delay.Seconds() -- cgit v1.2.3