diff options
| -rw-r--r-- | lgc/download.go | 18 | ||||
| -rw-r--r-- | lgc/upload.go | 2 | ||||
| -rw-r--r-- | rpm/rpm.go | 60 |
3 files changed, 50 insertions, 30 deletions
diff --git a/lgc/download.go b/lgc/download.go index 71ed647..a73cc37 100644 --- a/lgc/download.go +++ b/lgc/download.go @@ -138,10 +138,6 @@ func (lgd *LoadGeneratingConnectionDownload) SetGetConnTime(now time.Time) { lgd.stats.GetConnectionStartTime, ) } - lgd.statusLock.Lock() - lgd.status = LGC_STATUS_RUNNING - lgd.statusWaiter.Broadcast() - lgd.statusLock.Unlock() } func (lgd *LoadGeneratingConnectionDownload) SetGotConnTimeInfo( @@ -241,17 +237,25 @@ func (lgd *LoadGeneratingConnectionDownload) Client() *http.Client { return lgd.client } -type countingReader struct { +type loadGeneratingConnectionDownloadReader struct { n *uint64 ctx context.Context readable io.Reader + lgd *LoadGeneratingConnectionDownload } -func (cr *countingReader) Read(p []byte) (n int, err error) { +func (cr *loadGeneratingConnectionDownloadReader) Read(p []byte) (n int, err error) { if cr.ctx.Err() != nil { return 0, io.EOF } + if *cr.n == 0 { + cr.lgd.statusLock.Lock() + cr.lgd.status = LGC_STATUS_RUNNING + cr.lgd.statusWaiter.Broadcast() + cr.lgd.statusLock.Unlock() + } + n, err = cr.readable.Read(p) atomic.AddUint64(cr.n, uint64(n)) return @@ -356,7 +360,7 @@ func (lgd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) err fmt.Printf("Content-Encoding header was set (compression not allowed)") return fmt.Errorf("Content-Encoding header was set (compression not allowed)") } - cr := &countingReader{n: &lgd.downloaded, ctx: ctx, readable: get.Body} + cr := &loadGeneratingConnectionDownloadReader{n: &lgd.downloaded, ctx: ctx, lgd: lgd, readable: get.Body} _, _ = io.Copy(io.Discard, cr) lgd.statusLock.Lock() diff --git a/lgc/upload.go b/lgc/upload.go index f0c772e..5175fe0 100644 --- a/lgc/upload.go +++ b/lgc/upload.go @@ -100,7 +100,7 @@ func (s *syntheticCountingReader) Read(p []byte) (n int, err error) { if s.ctx.Err() != nil { return 0, io.EOF } - if n == 0 { + if *s.n == 0 { s.lgu.statusLock.Lock() s.lgu.status = LGC_STATUS_RUNNING s.lgu.statusWaiter.Broadcast() @@ -158,30 +158,46 @@ func CombinedProber( ) // Start Self Download Connection Prober - go probe.Probe( - networkActivityCtx, - &wg, - selfDownProbeConnection.Client(), - selfProbeConfiguration.URL, - selfProbeConfiguration.Host, - probe.SelfDown, - &dataPoints, - captureExtendedStats, - debugging, - ) + + // TODO: Make the following sanity check more than just a check. + // We only want to start a SelfDown probe on a connection that is + // in the RUNNING state. + if selfDownProbeConnection.Status() == lgc.LGC_STATUS_RUNNING { + go probe.Probe( + networkActivityCtx, + &wg, + selfDownProbeConnection.Client(), + selfProbeConfiguration.URL, + selfProbeConfiguration.Host, + probe.SelfDown, + &dataPoints, + captureExtendedStats, + debugging, + ) + } else { + panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n", debugging.Prefix, selfDownProbeConnection.Status())) + } // Start Self Upload Connection Prober - go probe.Probe( - proberCtx, - &wg, - selfUpProbeConnection.Client(), - selfProbeConfiguration.URL, - selfProbeConfiguration.Host, - probe.SelfUp, - &dataPoints, - captureExtendedStats, - debugging, - ) + + // TODO: Make the following sanity check more than just a check. + // We only want to start a SelfDown probe on a connection that is + // in the RUNNING state. + if selfUpProbeConnection.Status() == lgc.LGC_STATUS_RUNNING { + go probe.Probe( + proberCtx, + &wg, + selfUpProbeConnection.Client(), + selfProbeConfiguration.URL, + selfProbeConfiguration.Host, + probe.SelfUp, + &dataPoints, + captureExtendedStats, + debugging, + ) + } else { + panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n", debugging.Prefix, selfUpProbeConnection.Status())) + } } if debug.IsDebug(debugging.Level) { fmt.Printf( |
