diff options
| -rw-r--r-- | lbc/lbc.go | 10 | ||||
| -rw-r--r-- | networkQuality.go | 34 |
2 files changed, 29 insertions, 15 deletions
@@ -105,7 +105,6 @@ func (lbd *LoadBearingConnectionDownload) doDownload(ctx context.Context) { } cr := &countingReader{n: &lbd.downloaded, ctx: ctx, readable: get.Body} _, _ = io.Copy(ioutil.Discard, cr) - lbd.valid = false get.Body.Close() if lbd.debug { fmt.Printf("Ending a load-bearing download.\n") @@ -156,10 +155,13 @@ func (s *syntheticCountingReader) Read(p []byte) (n int, err error) { func (lbu *LoadBearingConnectionUpload) doUpload(ctx context.Context) bool { lbu.uploaded = 0 s := &syntheticCountingReader{n: &lbu.uploaded, ctx: ctx} - if resp, err := lbu.client.Post(lbu.Path, "application/octet-stream", s); err == nil { - resp.Body.Close() + var resp *http.Response = nil + var err error + + if resp, err = lbu.client.Post(lbu.Path, "application/octet-stream", s); err != nil { + lbu.valid = false } - lbu.valid = false + resp.Body.Close() if lbu.debug { fmt.Printf("Ending a load-bearing upload.\n") } diff --git a/networkQuality.go b/networkQuality.go index f390990..50a0ff0 100644 --- a/networkQuality.go +++ b/networkQuality.go @@ -172,7 +172,7 @@ func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGe lbcsPreviousTransferred := make([]uint64, 0) // Create 4 load bearing connections - addFlows(operatingCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil) + addFlows(saturationCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil) previousFlowIncreaseIteration := uint64(0) previousMovingAverage := float64(0) @@ -184,7 +184,7 @@ func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGe for currentIteration := uint64(0); true; currentIteration++ { // When the program stops operating, then stop. - if operatingCtx.Err() != nil { + if saturationCtx.Err() != nil { return } @@ -258,7 +258,7 @@ func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGe if debug != nil { fmt.Printf("%v: Adding flows because we are unsaturated and waited a while.\n", debug) } - addFlows(operatingCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil) + addFlows(saturationCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil) previousFlowIncreaseIteration = currentIteration } else { if debug != nil { @@ -280,7 +280,7 @@ func saturate(saturationCtx context.Context, operatingCtx context.Context, lbcGe if debug != nil { fmt.Printf("%v: New flows to add to try to increase our saturation!\n", debug) } - addFlows(operatingCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil) + addFlows(saturationCtx, 4, &lbcs, &lbcsPreviousTransferred, lbcGenerator, debug != nil) previousFlowIncreaseIteration = currentIteration } } @@ -400,9 +400,12 @@ func main() { return } saturationTimeout = true - timeoutAbsoluteTime = time.Now().Add(RPMCalculationTime * time.Second) - timeoutChannel = timeoutat.TimeoutAt(operatingCtx, timeoutAbsoluteTime, *debug) + + // We timed out attempting to saturate the link. So, we will shut down all the saturation xfers cancelSaturationCtx() + // and then we will give ourselves some additional time in order to calculate a provisional saturation. + timeoutAbsoluteTime = time.Now().Add(RPMCalculationTime) + timeoutChannel = timeoutat.TimeoutAt(operatingCtx, timeoutAbsoluteTime, *debug) if *debug { fmt.Printf("################# timeout reaching saturation!\n") } @@ -410,11 +413,18 @@ func main() { } } - // If there was a timeout achieving saturation then we already added another 5 seconds - // to the available time for testing. However, if saturation was achieved before the timeout - // then we want to give ourselves another 5 seconds to calculate the RPM. + // TODO: Confirm the following ... + // We are now going to do some RTT measurement so that we can calculate the RPM. The next operation + // might be controversial: + cancelSaturationCtx() + // We are closing down all the transfers that we used to saturate the link. If we do *not* do this + // cancellation then the "calculation" small transfers below will get starved out. + + // Give ourselves no more than 15 seconds to complete the RPM calculation. + // This is conditional because (above) we may have already added the time. We did it up there so that + // we could also limit the amount of time waiting for a conditional saturation calculation. if !saturationTimeout { - timeoutAbsoluteTime = time.Now().Add(RPMCalculationTime * time.Second) + timeoutAbsoluteTime = time.Now().Add(RPMCalculationTime) timeoutChannel = timeoutat.TimeoutAt(operatingCtx, timeoutAbsoluteTime, *debug) } @@ -428,7 +438,7 @@ func main() { } randomLbcsIndex := rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int() % len(downloadSaturation.Lbcs) if !downloadSaturation.Lbcs[randomLbcsIndex].IsValid() { - if debug != nil { + if *debug { fmt.Printf("%v: The randomly selected download LBC (at index %d) was invalid. Skipping.\n", debug, randomLbcsIndex) } @@ -476,6 +486,8 @@ func main() { cancelOperatingCtx() if *debug { + fmt.Printf("In debugging mode, we will cool down.\n") time.Sleep(cooldownPeriod) + fmt.Printf("Done cooling down.\n") } } |
