diff options
Diffstat (limited to 'lgc/upload.go')
| -rw-r--r-- | lgc/upload.go | 206 |
1 files changed, 206 insertions, 0 deletions
diff --git a/lgc/upload.go b/lgc/upload.go new file mode 100644 index 0000000..f0c772e --- /dev/null +++ b/lgc/upload.go @@ -0,0 +1,206 @@ +/* + * This file is part of Go Responsiveness. + * + * Go Responsiveness is free software: you can redistribute it and/or modify it under + * the terms of the GNU General Public License as published by the Free Software Foundation, + * either version 2 of the License, or (at your option) any later version. + * Go Responsiveness is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with Go Responsiveness. If not, see <https://www.gnu.org/licenses/>. + */ + +package lgc + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "net/http" + "sync" + "sync/atomic" + "time" + + "github.com/network-quality/goresponsiveness/debug" + "github.com/network-quality/goresponsiveness/stats" + "github.com/network-quality/goresponsiveness/utilities" +) + +// TODO: All 64-bit fields that are accessed atomically must +// appear at the top of this struct. +type LoadGeneratingConnectionUpload struct { + uploaded uint64 + lastIntervalEnd int64 + URL string + ConnectToAddr string + uploadStartTime time.Time + lastUploaded uint64 + client *http.Client + debug debug.DebugLevel + InsecureSkipVerify bool + KeyLogger io.Writer + clientId uint64 + status LgcStatus + statusLock *sync.Mutex + statusWaiter *sync.Cond +} + +func NewLoadGeneratingConnectionUpload(url string, keyLogger io.Writer, connectToAddr string, insecureSkipVerify bool) LoadGeneratingConnectionUpload { + lgu := LoadGeneratingConnectionUpload{ + URL: url, + KeyLogger: keyLogger, + ConnectToAddr: connectToAddr, + InsecureSkipVerify: insecureSkipVerify, + statusLock: &sync.Mutex{}, + } + lgu.status = LGC_STATUS_NOT_STARTED + lgu.statusWaiter = sync.NewCond(lgu.statusLock) + return lgu +} + +func (lgu *LoadGeneratingConnectionUpload) WaitUntilStarted(ctxt context.Context) bool { + conditional := func() bool { return lgu.status != LGC_STATUS_NOT_STARTED } + go utilities.ContextSignaler(ctxt, 500*time.Millisecond, &conditional, lgu.statusWaiter) + return utilities.WaitWithContext(ctxt, &conditional, lgu.statusLock, lgu.statusWaiter) +} + +func (lgu *LoadGeneratingConnectionUpload) ClientId() uint64 { + return lgu.clientId +} + +func (lgu *LoadGeneratingConnectionUpload) TransferredInInterval() (uint64, time.Duration) { + transferred := atomic.SwapUint64(&lgu.uploaded, 0) + newIntervalEnd := (time.Now().Sub(lgu.uploadStartTime)).Nanoseconds() + previousIntervalEnd := atomic.SwapInt64(&lgu.lastIntervalEnd, newIntervalEnd) + intervalLength := time.Duration(newIntervalEnd - previousIntervalEnd) + if debug.IsDebug(lgu.debug) { + fmt.Printf("upload: Transferred: %v bytes in %v.\n", transferred, intervalLength) + } + return transferred, intervalLength +} + +func (lgu *LoadGeneratingConnectionUpload) Client() *http.Client { + return lgu.client +} + +func (lgu *LoadGeneratingConnectionUpload) Status() LgcStatus { + return lgu.status +} + +type syntheticCountingReader struct { + n *uint64 + ctx context.Context + lgu *LoadGeneratingConnectionUpload +} + +func (s *syntheticCountingReader) Read(p []byte) (n int, err error) { + if s.ctx.Err() != nil { + return 0, io.EOF + } + if n == 0 { + s.lgu.statusLock.Lock() + s.lgu.status = LGC_STATUS_RUNNING + s.lgu.statusWaiter.Broadcast() + s.lgu.statusLock.Unlock() + } + err = nil + n = len(p) + + atomic.AddUint64(s.n, uint64(n)) + return +} + +func (lgu *LoadGeneratingConnectionUpload) doUpload(ctx context.Context) error { + lgu.uploaded = 0 + s := &syntheticCountingReader{n: &lgu.uploaded, ctx: ctx, lgu: lgu} + var resp *http.Response = nil + var request *http.Request = nil + var err error + + if request, err = http.NewRequest( + "POST", + lgu.URL, + s, + ); err != nil { + lgu.statusLock.Lock() + lgu.status = LGC_STATUS_ERROR + lgu.statusWaiter.Broadcast() + lgu.statusLock.Unlock() + return err + } + + // Used to disable compression + request.Header.Set("Accept-Encoding", "identity") + request.Header.Set("User-Agent", utilities.UserAgent()) + + lgu.uploadStartTime = time.Now() + lgu.lastIntervalEnd = 0 + + lgu.statusLock.Lock() + lgu.status = LGC_STATUS_RUNNING + lgu.statusWaiter.Broadcast() + lgu.statusLock.Unlock() + + if resp, err = lgu.client.Do(request); err != nil { + lgu.statusLock.Lock() + lgu.status = LGC_STATUS_ERROR + lgu.statusWaiter.Broadcast() + lgu.statusLock.Unlock() + return err + } + + lgu.statusLock.Lock() + lgu.status = LGC_STATUS_DONE + lgu.statusWaiter.Broadcast() + lgu.statusLock.Unlock() + + resp.Body.Close() + if debug.IsDebug(lgu.debug) { + fmt.Printf("Ending a load-generating upload.\n") + } + return nil +} + +func (lgu *LoadGeneratingConnectionUpload) Start( + parentCtx context.Context, + debugLevel debug.DebugLevel, +) bool { + lgu.uploaded = 0 + lgu.clientId = utilities.GenerateUniqueId() + lgu.debug = debugLevel + + transport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: lgu.InsecureSkipVerify, + }, + } + + if !utilities.IsInterfaceNil(lgu.KeyLogger) { + if debug.IsDebug(lgu.debug) { + fmt.Printf( + "Using an SSL Key Logger for this load-generating upload.\n", + ) + } + transport.TLSClientConfig.KeyLogWriter = lgu.KeyLogger + } + + utilities.OverrideHostTransport(transport, lgu.ConnectToAddr) + + lgu.client = &http.Client{Transport: transport} + + if debug.IsDebug(lgu.debug) { + fmt.Printf("Started a load-generating upload (id: %v).\n", lgu.clientId) + } + + go lgu.doUpload(parentCtx) + return true +} + +func (lgu *LoadGeneratingConnectionUpload) Stats() *stats.TraceStats { + // Get all your stats from the download side of the LGC. + return nil +} |
