summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lgc/download.go18
-rw-r--r--lgc/upload.go2
-rw-r--r--rpm/rpm.go60
3 files changed, 50 insertions, 30 deletions
diff --git a/lgc/download.go b/lgc/download.go
index 71ed647..a73cc37 100644
--- a/lgc/download.go
+++ b/lgc/download.go
@@ -138,10 +138,6 @@ func (lgd *LoadGeneratingConnectionDownload) SetGetConnTime(now time.Time) {
lgd.stats.GetConnectionStartTime,
)
}
- lgd.statusLock.Lock()
- lgd.status = LGC_STATUS_RUNNING
- lgd.statusWaiter.Broadcast()
- lgd.statusLock.Unlock()
}
func (lgd *LoadGeneratingConnectionDownload) SetGotConnTimeInfo(
@@ -241,17 +237,25 @@ func (lgd *LoadGeneratingConnectionDownload) Client() *http.Client {
return lgd.client
}
-type countingReader struct {
+type loadGeneratingConnectionDownloadReader struct {
n *uint64
ctx context.Context
readable io.Reader
+ lgd *LoadGeneratingConnectionDownload
}
-func (cr *countingReader) Read(p []byte) (n int, err error) {
+func (cr *loadGeneratingConnectionDownloadReader) Read(p []byte) (n int, err error) {
if cr.ctx.Err() != nil {
return 0, io.EOF
}
+ if *cr.n == 0 {
+ cr.lgd.statusLock.Lock()
+ cr.lgd.status = LGC_STATUS_RUNNING
+ cr.lgd.statusWaiter.Broadcast()
+ cr.lgd.statusLock.Unlock()
+ }
+
n, err = cr.readable.Read(p)
atomic.AddUint64(cr.n, uint64(n))
return
@@ -356,7 +360,7 @@ func (lgd *LoadGeneratingConnectionDownload) doDownload(ctx context.Context) err
fmt.Printf("Content-Encoding header was set (compression not allowed)")
return fmt.Errorf("Content-Encoding header was set (compression not allowed)")
}
- cr := &countingReader{n: &lgd.downloaded, ctx: ctx, readable: get.Body}
+ cr := &loadGeneratingConnectionDownloadReader{n: &lgd.downloaded, ctx: ctx, lgd: lgd, readable: get.Body}
_, _ = io.Copy(io.Discard, cr)
lgd.statusLock.Lock()
diff --git a/lgc/upload.go b/lgc/upload.go
index f0c772e..5175fe0 100644
--- a/lgc/upload.go
+++ b/lgc/upload.go
@@ -100,7 +100,7 @@ func (s *syntheticCountingReader) Read(p []byte) (n int, err error) {
if s.ctx.Err() != nil {
return 0, io.EOF
}
- if n == 0 {
+ if *s.n == 0 {
s.lgu.statusLock.Lock()
s.lgu.status = LGC_STATUS_RUNNING
s.lgu.statusWaiter.Broadcast()
diff --git a/rpm/rpm.go b/rpm/rpm.go
index 99a52b6..c900642 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -158,30 +158,46 @@ func CombinedProber(
)
// Start Self Download Connection Prober
- go probe.Probe(
- networkActivityCtx,
- &wg,
- selfDownProbeConnection.Client(),
- selfProbeConfiguration.URL,
- selfProbeConfiguration.Host,
- probe.SelfDown,
- &dataPoints,
- captureExtendedStats,
- debugging,
- )
+
+ // TODO: Make the following sanity check more than just a check.
+ // We only want to start a SelfDown probe on a connection that is
+ // in the RUNNING state.
+ if selfDownProbeConnection.Status() == lgc.LGC_STATUS_RUNNING {
+ go probe.Probe(
+ networkActivityCtx,
+ &wg,
+ selfDownProbeConnection.Client(),
+ selfProbeConfiguration.URL,
+ selfProbeConfiguration.Host,
+ probe.SelfDown,
+ &dataPoints,
+ captureExtendedStats,
+ debugging,
+ )
+ } else {
+ panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n", debugging.Prefix, selfDownProbeConnection.Status()))
+ }
// Start Self Upload Connection Prober
- go probe.Probe(
- proberCtx,
- &wg,
- selfUpProbeConnection.Client(),
- selfProbeConfiguration.URL,
- selfProbeConfiguration.Host,
- probe.SelfUp,
- &dataPoints,
- captureExtendedStats,
- debugging,
- )
+
+ // TODO: Make the following sanity check more than just a check.
+ // We only want to start a SelfDown probe on a connection that is
+ // in the RUNNING state.
+ if selfUpProbeConnection.Status() == lgc.LGC_STATUS_RUNNING {
+ go probe.Probe(
+ proberCtx,
+ &wg,
+ selfUpProbeConnection.Client(),
+ selfProbeConfiguration.URL,
+ selfProbeConfiguration.Host,
+ probe.SelfUp,
+ &dataPoints,
+ captureExtendedStats,
+ debugging,
+ )
+ } else {
+ panic(fmt.Sprintf("(%s) Combined probe driver evidently lost its underlying connection (Status: %v).\n", debugging.Prefix, selfUpProbeConnection.Status()))
+ }
}
if debug.IsDebug(debugging.Level) {
fmt.Printf(