-rw-r--r--  lgc/lgc.go                    4
-rw-r--r--  networkQuality.go            71
-rw-r--r--  rpm/rpm.go                  290
-rw-r--r--  utilities/utilities.go        2
-rw-r--r--  utilities/utilities_test.go  39
5 files changed, 350 insertions, 56 deletions
diff --git a/lgc/lgc.go b/lgc/lgc.go
index b0694db..50b15c6 100644
--- a/lgc/lgc.go
+++ b/lgc/lgc.go
@@ -245,7 +245,7 @@ func (lgd *LoadGeneratingConnectionDownload) Start(
debugLevel debug.DebugLevel,
) bool {
lgd.downloaded = 0
- lgd.clientId = utilities.GenerateConnectionId()
+ lgd.clientId = utilities.GenerateUniqueId()
transport := http2.Transport{}
transport.TLSClientConfig = &tls.Config{}
@@ -400,7 +400,7 @@ func (lgu *LoadGeneratingConnectionUpload) Start(
debugLevel debug.DebugLevel,
) bool {
lgu.uploaded = 0
- lgu.clientId = utilities.GenerateConnectionId()
+ lgu.clientId = utilities.GenerateUniqueId()
lgu.debug = debugLevel
// See above for the rationale of doing http2.Transport{} here
diff --git a/networkQuality.go b/networkQuality.go
index d5c3523..ca51e51 100644
--- a/networkQuality.go
+++ b/networkQuality.go
@@ -98,6 +98,9 @@ func main() {
lgDataCollectionCtx, cancelLGDataCollectionCtx := context.WithCancel(
context.Background(),
)
+ newConnectionProberCtx, newConnectionProberCtxCancel := context.WithCancel(
+ context.Background(),
+ )
config := &config.Config{}
var debugLevel debug.DebugLevel = debug.Error
@@ -198,11 +201,20 @@ func main() {
}
generate_lg_probe_configuration := func() rpm.ProbeConfiguration {
- return rpm.ProbeConfiguration{URL: config.Urls.SmallUrl}
+ return rpm.ProbeConfiguration{URL: config.Urls.SmallUrl, Interval: 100 * time.Millisecond}
+ }
+
+ generate_nc_probe_configuration := func() rpm.ProbeConfiguration {
+ return rpm.ProbeConfiguration{URL: config.Urls.SmallUrl, Interval: 100 * time.Millisecond}
}
var downloadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "download")
var uploadDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "upload")
+ var newConnectionDebugging *debug.DebugWithPrefix = debug.NewDebugWithPrefix(debugLevel, "new connection probe")
+
+ // TODO: Use separate contexts for load generation and data collection. With that split, if
+ // either of the two data-collection goroutines stops well before the other, its probes can
+ // continue to be sent and we can generate additional information!
downloadDataCollectionChannel := rpm.LGCollectData(
lgDataCollectionCtx,
@@ -219,6 +231,13 @@ func main() {
uploadDebugging,
)
+ newConnectionProbeDataPoints := rpm.Prober(
+ newConnectionProberCtx,
+ generate_nc_probe_configuration,
+ sslKeyFileConcurrentWriter,
+ newConnectionDebugging,
+ )
+
dataCollectionTimeout := false
uploadDataCollectionComplete := false
downloadDataCollectionComplete := false
@@ -298,6 +317,9 @@ func main() {
}
}
+ // Shutdown the new-connection prober!
+ newConnectionProberCtxCancel()
+
// In the new version we are no longer going to wait to send probes until after
// saturation. When we get here we are now only going to compute the results
// and/or extended statistics!
@@ -333,6 +355,53 @@ func main() {
len(uploadDataCollectionResult.LGCs),
)
+ totalNewConnectionRoundTripTime := float64(0)
+ totalNewConnectionRoundTrips := uint64(0)
+ for ncDp := range newConnectionProbeDataPoints {
+ totalNewConnectionRoundTripTime += ncDp.Duration.Seconds()
+ totalNewConnectionRoundTrips += uint64(ncDp.RoundTripCount)
+ }
+ averageNewConnectionRoundTripTime := totalNewConnectionRoundTripTime / float64(
+ totalNewConnectionRoundTrips,
+ )
+ newConnectionRpm := (1.0 / averageNewConnectionRoundTripTime) * 60.0
+ if *debugCliFlag {
+ fmt.Printf(
+ "Total New-Connection Round Trips: %d, Total New-Connection Round Trip Time: %f, Average New-Connection Round Trip Time (in seconds): %f\n",
+ totalNewConnectionRoundTrips,
+ totalNewConnectionRoundTripTime,
+ averageNewConnectionRoundTripTime,
+ )
+ fmt.Printf("(New-Connection) RPM: %f\n", newConnectionRpm)
+ }
+
+ totalLoadGeneratingRoundTripTime := float64(0)
+ totalLoadGeneratingRoundTrips := uint64(0)
+ for _, dp := range downloadDataCollectionResult.DataPoints {
+ totalLoadGeneratingRoundTripTime += dp.Duration.Seconds()
+ totalLoadGeneratingRoundTrips += uint64(dp.RoundTripCount)
+ }
+ for _, dp := range uploadDataCollectionResult.DataPoints {
+ totalLoadGeneratingRoundTripTime += dp.Duration.Seconds()
+ totalLoadGeneratingRoundTrips += uint64(dp.RoundTripCount)
+ }
+ averageLoadGeneratingRoundTripTime := totalLoadGeneratingRoundTripTime / float64(
+ totalLoadGeneratingRoundTrips,
+ )
+ loadGeneratingRPM := (1.0 / averageLoadGeneratingRoundTripTime) * 60.0
+ if *debugCliFlag {
+ fmt.Printf(
+ "Total Load-Generating Round Trips: %d, Total New-Connection Round Trip Time: %f, Average New-Connection Round Trip Time (in seconds): %f\n",
+ totalLoadGeneratingRoundTrips,
+ totalLoadGeneratingRoundTripTime,
+ averageLoadGeneratingRoundTripTime,
+ )
+ fmt.Printf("(Load-Generating) RPM: %f\n", loadGeneratingRPM)
+ }
+
+ rpm := (newConnectionRpm + loadGeneratingRPM) / 2.0
+ fmt.Printf("RPM: %5.0f\n", rpm)
+
if *calculateExtendedStats {
fmt.Println(extendedStats.Repr())
}
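
Note: the reporting code above sums probe durations and round-trip counts and then converts the average round-trip time into RPM (round trips per minute, i.e. 60 divided by the average round-trip time in seconds). A minimal, standalone sketch of that arithmetic follows; computeRpm is a hypothetical helper, and the zero-round-trip guard is our addition (the commit's averaging would divide by zero if every probe failed).

    package main

    import "fmt"

    // computeRpm converts an accumulated round-trip time (in seconds) and a
    // round-trip count into round trips per minute. It returns 0 when no
    // round trips completed, avoiding a division by zero.
    func computeRpm(totalRttSeconds float64, roundTrips uint64) float64 {
    	if roundTrips == 0 {
    		return 0
    	}
    	return 60.0 / (totalRttSeconds / float64(roundTrips))
    }

    func main() {
    	// 12 round trips taking 0.6 seconds in total: 0.05s average RTT, 1200 RPM.
    	fmt.Printf("RPM: %.0f\n", computeRpm(0.6, 12))
    }
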
diff --git a/rpm/rpm.go b/rpm/rpm.go
index cee308f..6aa68e8 100644
--- a/rpm/rpm.go
+++ b/rpm/rpm.go
@@ -17,6 +17,7 @@ import (
"github.com/network-quality/goresponsiveness/stats"
"github.com/network-quality/goresponsiveness/traceable"
"github.com/network-quality/goresponsiveness/utilities"
+ "golang.org/x/net/http2"
)
func addFlows(
@@ -39,7 +40,8 @@ func addFlows(
}
type ProbeConfiguration struct {
- URL string
+ URL string
+ Interval time.Duration
}
type DataPoint struct {
@@ -53,26 +55,38 @@ type LGDataCollectionResult struct {
DataPoints []DataPoint
}
-func LGProbe(
+func Probe(
parentProbeCtx context.Context,
- connection lgc.LoadGeneratingConnection,
- lgProbeUrl string,
+ client *http.Client,
+ probeUrl string,
+ isLGProbe bool,
result *chan DataPoint,
debugging *debug.DebugWithPrefix,
) error {
- probeTracer := NewProbeTracer(connection.Client(), true, debugging)
+
+ probeTypeLabel := "New-Connection"
+ if isLGProbe {
+ probeTypeLabel = "Load-Generating"
+ }
+
+ if client == nil {
+ return fmt.Errorf("Cannot start a probe with a nil client")
+ }
+
+ probeId := utilities.GenerateUniqueId()
+ probeTracer := NewProbeTracer(client, isLGProbe, probeId, debugging)
time_before_probe := time.Now()
probe_req, err := http.NewRequestWithContext(
httptrace.WithClientTrace(parentProbeCtx, probeTracer.trace),
"GET",
- lgProbeUrl,
+ probeUrl,
nil,
)
if err != nil {
return err
}
- probe_resp, err := connection.Client().Do(probe_req)
+ probe_resp, err := client.Do(probe_req)
if err != nil {
return err
}
@@ -90,52 +104,161 @@ func LGProbe(
sanity := time_after_probe.Sub(time_before_probe)
- tlsAndHttpHeaderDelta := probeTracer.GetTLSAndHttpHeaderDelta()
- httpDownloadDelta := probeTracer.GetHttpDownloadDelta(
+ // When the probe is run on a load-generating connection there should
+ // only be a single round trip to measure. We accumulate all of these
+ // values just to be sure, though. Because of how this traced connection was launched, most
+ // of the values will be 0 (or very small, where the time Go takes to deliver callbacks
+ // and perform context switches pokes through). When it is !isLGProbe, the values will all
+ // be significant and we want to add them regardless!
+ totalDelay := probeTracer.GetTLSAndHttpHeaderDelta() + probeTracer.GetHttpDownloadDelta(
time_after_probe,
- ) // Combined with above, constitutes 2 time measurements, per the Spec.
- tcpDelta := probeTracer.GetTCPDelta() // Constitutes 1 time measurement, per the Spec.
- totalDelay := tlsAndHttpHeaderDelta + httpDownloadDelta + tcpDelta
+ ) + probeTracer.GetTCPDelta()
- // We must have reused the connection!
- if !probeTracer.stats.ConnectionReused {
+ // We must have reused the connection if we are a load-generating probe!
+ if isLGProbe && !probeTracer.stats.ConnectionReused {
panic(!probeTracer.stats.ConnectionReused)
}
if debug.IsDebug(debugging.Level) {
fmt.Printf(
- "(%s) (Probe %v) sanity vs total: %v vs %v\n",
+ "(%s) (%s Probe %v) sanity vs total: %v vs %v\n",
debugging.Prefix,
- probeTracer.ProbeId(),
+ probeTypeLabel,
+ probeId,
sanity,
totalDelay,
)
}
- *result <- DataPoint{RoundTripCount: 1, Duration: totalDelay}
+ roundTripCount := uint64(1)
+ if !isLGProbe {
+ roundTripCount = 3
+ }
+ // TODO: Careful!!! It's possible that this channel has been closed because the Prober that
+ // started it has been stopped. Writing to a closed channel will cause a panic, so the
+ // deferred recover below converts that panic into (at most) a debug message. This is safe
+ // because the goroutine that panicked executes only this function.
+ defer func() {
+ if r := recover(); r != nil && debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) (%s Probe %v) Probe attempted to write to the result channel after its invoker ended.\n",
+ debugging.Prefix,
+ probeTypeLabel,
+ probeId,
+ )
+ }
+ }()
+ *result <- DataPoint{RoundTripCount: roundTripCount, Duration: totalDelay}
return nil
}
+func Prober(
+ proberCtx context.Context,
+ ncProbeConfigurationGenerator func() ProbeConfiguration,
+ keyLogger io.Writer,
+ debugging *debug.DebugWithPrefix,
+) (points chan DataPoint) {
+ points = make(chan DataPoint)
+
+ ncProbeConfiguration := ncProbeConfigurationGenerator()
+
+ go func() {
+ probeCount := 0
+
+ for proberCtx.Err() == nil {
+ time.Sleep(ncProbeConfiguration.Interval)
+
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) About to start new-connection probe number %d!\n",
+ debugging.Prefix,
+ probeCount,
+ )
+ }
+ transport := http2.Transport{}
+ transport.TLSClientConfig = &tls.Config{}
+
+ if !utilities.IsInterfaceNil(keyLogger) {
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "Using an SSL Key Logger for this new-connection probe.\n",
+ )
+ }
+
+ // The presence of a custom TLSClientConfig in a *generic* `transport`
+ // means that go will default to HTTP/1.1 and cowardly avoid HTTP/2:
+ // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L278
+ // Also, it would appear that the API's choice of HTTP vs HTTP2 can
+ // depend on whether the url contains
+ // https:// or http://:
+ // https://github.com/golang/go/blob/7ca6902c171b336d98adbb103d701a013229c806/src/net/http/transport.go#L74
+ transport.TLSClientConfig.KeyLogWriter = keyLogger
+ }
+ transport.TLSClientConfig.InsecureSkipVerify = true
+
+ client := &http.Client{Transport: &transport}
+
+ probeCount++
+ go Probe(proberCtx, client, ncProbeConfiguration.URL, false, &points, debugging)
+ }
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) New-connection probing driver is stopping after sending %d probes.\n",
+ debugging.Prefix,
+ probeCount,
+ )
+ }
+ close(points)
+ }()
+ return
+}
+
func LGProber(
proberCtx context.Context,
defaultConnection lgc.LoadGeneratingConnection,
altConnections *[]lgc.LoadGeneratingConnection,
- url string,
- interval time.Duration,
+ lgProbeConfiguration ProbeConfiguration,
debugging *debug.DebugWithPrefix,
) (points chan DataPoint) {
points = make(chan DataPoint)
+ debugging = debug.NewDebugWithPrefix(debugging.Level, debugging.Prefix+" load-generating probe")
+
go func() {
+ probeCount := 0
for proberCtx.Err() == nil {
- time.Sleep(interval)
+ time.Sleep(lgProbeConfiguration.Interval)
if debug.IsDebug(debugging.Level) {
- fmt.Printf("(%s) About to probe!\n", debugging.Prefix)
+ fmt.Printf(
+ "(%s) About to start load-generating probe number %d!\n",
+ debugging.Prefix,
+ probeCount,
+ )
}
- go LGProbe(proberCtx, defaultConnection, url, &points, debugging)
+ probeCount++
+ // TODO: We do not yet take into account that the load-generating connection that we were given
+ // on which to perform measurements might go away during testing. We have access to all the open
+ // load-generating connections (altConnections) to handle this case, but we just aren't using them
+ // yet.
+ go Probe(
+ proberCtx,
+ defaultConnection.Client(),
+ lgProbeConfiguration.URL,
+ true,
+ &points,
+ debugging,
+ )
+ }
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) Load-generating probing driver is stopping after sending %d probes.\n",
+ debugging.Prefix,
+ probeCount,
+ )
}
+ close(points)
}()
-
return
}
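
Note: for context, here is one way the new Prober might be driven end to end. This is a sketch under the assumption of this repository's package layout; the URL and debug prefix are placeholders, not values from the commit. The pattern is: start the prober with a cancellable context, cancel it, then drain the channel (which the prober closes on exit) so no data points are lost.

    package main

    import (
    	"context"
    	"fmt"
    	"time"

    	"github.com/network-quality/goresponsiveness/debug"
    	"github.com/network-quality/goresponsiveness/rpm"
    )

    func main() {
    	proberCtx, cancel := context.WithCancel(context.Background())
    	points := rpm.Prober(
    		proberCtx,
    		func() rpm.ProbeConfiguration {
    			// Placeholder URL; networkQuality.go uses config.Urls.SmallUrl.
    			return rpm.ProbeConfiguration{
    				URL:      "https://example.com/small",
    				Interval: 100 * time.Millisecond,
    			}
    		},
    		nil, // no SSL key log writer
    		debug.NewDebugWithPrefix(debug.Error, "example"),
    	)

    	time.Sleep(2 * time.Second)
    	cancel() // the prober closes points once it notices the cancellation

    	total := uint64(0)
    	for dp := range points { // drain until close
    		total += dp.RoundTripCount
    	}
    	fmt.Printf("collected %d round trips\n", total)
    }
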
@@ -159,14 +282,11 @@ func LGCollectData(
debugging.Level,
)
- lgProbeConfiguration := lgProbeConfigurationGenerator()
-
- LGProber(
- lgDataCollectionCtx,
+ lgProbeCtx, lgProbeCtxCancel := context.WithCancel(lgDataCollectionCtx)
+ probeDataPointsChannel := LGProber(lgProbeCtx,
lgcs[0],
&lgcs,
- lgProbeConfiguration.URL,
- time.Duration(100*time.Millisecond),
+ lgProbeConfigurationGenerator(),
debugging,
)
@@ -191,17 +311,13 @@ func LGCollectData(
for currentInterval := uint64(0); true; currentInterval++ {
- // When the program stops operating, then stop.
- if lgDataCollectionCtx.Err() != nil {
+ // When the program stops operating, then stop. When our invoker tells
+ // us to stop, then stop.
+ if operatingCtx.Err() != nil || lgDataCollectionCtx.Err() != nil {
+ lgProbeCtxCancel()
return
}
- // We may be asked to stop trying to saturate the
- // network and return our current status.
- if lgDataCollectionCtx.Err() != nil {
- //break
- }
-
now := time.Now()
// At each 1-second interval
if nextSampleStartTime.Sub(now) > 0 {
@@ -346,7 +462,19 @@ func LGCollectData(
}
}
- resulted <- LGDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs}
+ lgProbeCtxCancel()
+ probeDataPoints := make([]DataPoint, 0)
+ for dataPoint := range probeDataPointsChannel {
+ probeDataPoints = append(probeDataPoints, dataPoint)
+ }
+ if debug.IsDebug(debugging.Level) {
+ fmt.Printf(
+ "(%s) Collected %d load-generating probe data points\n",
+ debugging.Prefix,
+ len(probeDataPoints),
+ )
+ }
+ resulted <- LGDataCollectionResult{RateBps: movingAverage.CalculateAverage(), LGCs: lgcs, DataPoints: probeDataPoints}
}()
return
}
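
Note: the cancel-then-drain sequence above pairs with the recover guard added to Probe: a send on a channel that has already been closed panics, and the guard turns that panic into a no-op for the single goroutine running the probe. A standalone illustration of that Go behavior follows; trySend is a hypothetical helper, not code from this commit. The new utilities_test.go later in this commit pins down the companion guarantee: values already buffered in a channel remain readable after close(), which is why LGCollectData can cancel the prober and still drain every data point.

    package main

    import "fmt"

    // trySend writes v to ch and reports whether the send succeeded,
    // converting the "send on closed channel" panic into a false return.
    // It mirrors, in spirit, the defer/recover guard around Probe's
    // result-channel write.
    func trySend(ch chan int, v int) (ok bool) {
    	defer func() {
    		if recover() != nil {
    			ok = false
    		}
    	}()
    	ch <- v
    	return true
    }

    func main() {
    	ch := make(chan int, 1)
    	fmt.Println(trySend(ch, 1)) // true
    	close(ch)
    	fmt.Println(trySend(ch, 2)) // false: the send panicked and was recovered
    }
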
@@ -446,13 +574,18 @@ func (p *ProbeTracer) GetHttpDownloadDelta(httpDoneTime time.Time) time.Duration
return delta
}
-func NewProbeTracer(client *http.Client, isLG bool, debugging *debug.DebugWithPrefix) *ProbeTracer {
+func NewProbeTracer(
+ client *http.Client,
+ isLG bool,
+ probeId uint64,
+ debugging *debug.DebugWithPrefix,
+) *ProbeTracer {
probe := &ProbeTracer{
client: client,
stats: &stats.TraceStats{},
trace: nil,
debug: debugging.Level,
- probeid: utilities.GenerateConnectionId(),
+ probeid: probeId,
isLG: isLG,
}
trace := traceable.GenerateHttpTimingTracer(probe, debugging.Level)
@@ -468,8 +601,13 @@ func (probe *ProbeTracer) SetDnsStartTimeInfo(
probe.stats.DnsStartTime = now
probe.stats.DnsStart = dnsStartInfo
if debug.IsDebug(probe.debug) {
+ probeTypeLabel := "New-Connection"
+ if probe.isLG {
+ probeTypeLabel = "Load-Generating"
+ }
fmt.Printf(
- "(Probe) DNS Start for %v: %v\n",
+ "(%s Probe) DNS Start for Probe %v: %v\n",
+ probeTypeLabel,
probe.ProbeId(),
dnsStartInfo,
)
@@ -483,8 +621,13 @@ func (probe *ProbeTracer) SetDnsDoneTimeInfo(
probe.stats.DnsDoneTime = now
probe.stats.DnsDone = dnsDoneInfo
if debug.IsDebug(probe.debug) {
+ probeTypeLabel := "New-Connection"
+ if probe.isLG {
+ probeTypeLabel = "Load-Generating"
+ }
fmt.Printf(
- "(Probe) DNS Done for %v: %v\n",
+ "(%s Probe) DNS Done for Probe %v: %v\n",
+ probeTypeLabel,
probe.ProbeId(),
probe.stats.DnsDone,
)
@@ -496,8 +639,13 @@ func (probe *ProbeTracer) SetConnectStartTime(
) {
probe.stats.ConnectStartTime = now
if debug.IsDebug(probe.debug) {
+ probeTypeLabel := "New-Connection"
+ if probe.isLG {
+ probeTypeLabel = "Load-Generating"
+ }
fmt.Printf(
- "(Probe) TCP Start for %v at %v\n",
+ "(%s Probe) TCP Start for Probe %v at %v\n",
+ probeTypeLabel,
probe.ProbeId(),
probe.stats.ConnectStartTime,
)
@@ -511,8 +659,13 @@ func (probe *ProbeTracer) SetConnectDoneTimeError(
probe.stats.ConnectDoneTime = now
probe.stats.ConnectDoneError = err
if debug.IsDebug(probe.debug) {
+ probeTypeLabel := "New-Connection"
+ if probe.isLG {
+ probeTypeLabel = "Load-Generating"
+ }
fmt.Printf(
- "(Probe) TCP Done for %v (with error %v) @ %v\n",
+ "(%s Probe) TCP Done for Probe %v (with error %v) @ %v\n",
+ probeTypeLabel,
probe.ProbeId(),
probe.stats.ConnectDoneError,
probe.stats.ConnectDoneTime,
@@ -523,8 +676,13 @@ func (probe *ProbeTracer) SetConnectDoneTimeError(
func (probe *ProbeTracer) SetGetConnTime(now time.Time) {
probe.stats.GetConnectionStartTime = now
if debug.IsDebug(probe.debug) {
+ probeTypeLabel := "New-Connection"
+ if probe.isLG {
+ probeTypeLabel = "Load-Generating"
+ }
fmt.Printf(
- "(Probe) Started getting connection for %v @ %v\n",
+ "(%s Probe) Started getting connection for Probe %v @ %v\n",
+ probeTypeLabel,
probe.ProbeId(),
probe.stats.GetConnectionStartTime,
)
@@ -539,13 +697,21 @@ func (probe *ProbeTracer) SetGotConnTimeInfo(
probe.stats.ConnInfo = gotConnInfo
probe.stats.ConnectionReused = gotConnInfo.Reused
if probe.isLG && !gotConnInfo.Reused {
- fmt.Fprintf(os.Stderr, "A probe sent on an LG Connection used a new connection!\n")
+ fmt.Fprintf(
+ os.Stderr,
+ "A probe sent on an load-generating connection used a new connection!\n",
+ )
} else if debug.IsDebug(probe.debug) {
- fmt.Printf("Properly reused a connection when probing on an LG Connection!\n")
+ fmt.Printf("Properly reused a connection when probing on a load-generating connection!\n")
}
if debug.IsDebug(probe.debug) {
+ probeTypeLabel := "New-Connection"
+ if probe.isLG {
+ probeTypeLabel = "Load-Generating"
+ }
fmt.Printf(
- "(Probe) Got a reused connection for %v at %v with info %v\n",
+ "(%s Probe) Got a reused connection for Probe %v at %v with info %v\n",
+ probeTypeLabel,
probe.ProbeId(),
probe.stats.GetConnectionDoneTime,
probe.stats.ConnInfo,
@@ -558,8 +724,13 @@ func (probe *ProbeTracer) SetTLSHandshakeStartTime(
) {
probe.stats.TLSStartTime = utilities.Some(now)
if debug.IsDebug(probe.debug) {
+ probeTypeLabel := "New-Connection"
+ if probe.isLG {
+ probeTypeLabel = "Load-Generating"
+ }
fmt.Printf(
- "(Probe) Started TLS Handshake for %v @ %v\n",
+ "(%s Probe) Started TLS Handshake for Probe %v @ %v\n",
+ probeTypeLabel,
probe.ProbeId(),
probe.stats.TLSStartTime,
)
@@ -573,8 +744,13 @@ func (probe *ProbeTracer) SetTLSHandshakeDoneTimeState(
probe.stats.TLSDoneTime = utilities.Some(now)
probe.stats.TLSConnInfo = connectionState
if debug.IsDebug(probe.debug) {
+ probeTypeLabel := "New-Connection"
+ if probe.isLG {
+ probeTypeLabel = "Load-Generating"
+ }
fmt.Printf(
- "(Probe) Completed TLS handshake for %v at %v with info %v\n",
+ "(%s Probe) Completed TLS handshake for Probe %v at %v with info %v\n",
+ probeTypeLabel,
probe.ProbeId(),
probe.stats.TLSDoneTime,
probe.stats.TLSConnInfo,
@@ -589,8 +765,13 @@ func (probe *ProbeTracer) SetHttpWroteRequestTimeInfo(
probe.stats.HttpWroteRequestTime = now
probe.stats.HttpInfo = info
if debug.IsDebug(probe.debug) {
+ probeTypeLabel := "New-Connection"
+ if probe.isLG {
+ probeTypeLabel = "Load-Generating"
+ }
fmt.Printf(
- "(Probe) Http finished writing request for %v at %v with info %v\n",
+ "(%s Probe) Http finished writing request for Probe %v at %v with info %v\n",
+ probeTypeLabel,
probe.ProbeId(),
probe.stats.HttpWroteRequestTime,
probe.stats.HttpInfo,
@@ -603,8 +784,13 @@ func (probe *ProbeTracer) SetHttpResponseReadyTime(
) {
probe.stats.HttpResponseReadyTime = now
if debug.IsDebug(probe.debug) {
+ probeTypeLabel := "New-Connection"
+ if probe.isLG {
+ probeTypeLabel = "Load-Generating"
+ }
fmt.Printf(
- "(Probe) Http response is ready for %v at %v\n",
+ "(%s Probe) Http response is ready for Probe %v at %v\n",
+ probeTypeLabel,
probe.ProbeId(),
probe.stats.HttpResponseReadyTime,
)
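
Note: each tracer callback above repeats the same four-line probeTypeLabel computation. One possible consolidation, sketched here rather than taken from the commit, is a small method on the repository's ProbeTracer type:

    // typeLabel returns the human-readable probe type used in debug output.
    // (Sketch only; the commit inlines this logic at each call site.)
    func (probe *ProbeTracer) typeLabel() string {
    	if probe.isLG {
    		return "Load-Generating"
    	}
    	return "New-Connection"
    }

Each callback could then print probe.typeLabel() directly instead of recomputing the label.
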
diff --git a/utilities/utilities.go b/utilities/utilities.go
index 76acbd2..7b96bef 100644
--- a/utilities/utilities.go
+++ b/utilities/utilities.go
@@ -76,7 +76,7 @@ func SeekForAppend(file *os.File) (err error) {
return
}
-var GenerateConnectionId func() uint64 = func() func() uint64 {
+var GenerateUniqueId func() uint64 = func() func() uint64 {
var nextConnectionId uint64 = 0
return func() uint64 {
return atomic.AddUint64(&nextConnectionId, 1)
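
Note: the renamed generator is a closure over an atomic counter, so concurrent callers (load-generating connections and probes alike) always receive distinct ids starting from 1. A small sketch demonstrating that property, assuming this repository's utilities package:

    package main

    import (
    	"fmt"
    	"sync"

    	"github.com/network-quality/goresponsiveness/utilities"
    )

    func main() {
    	var wg sync.WaitGroup
    	ids := make(chan uint64, 100)
    	for i := 0; i < 100; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			ids <- utilities.GenerateUniqueId()
    		}()
    	}
    	wg.Wait()
    	close(ids)

    	seen := make(map[uint64]bool)
    	for id := range ids {
    		if seen[id] {
    			fmt.Println("duplicate id!") // never happens: AddUint64 is atomic
    		}
    		seen[id] = true
    	}
    	fmt.Println("unique ids:", len(seen)) // 100
    }
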
diff --git a/utilities/utilities_test.go b/utilities/utilities_test.go
new file mode 100644
index 0000000..72a11fb
--- /dev/null
+++ b/utilities/utilities_test.go
@@ -0,0 +1,39 @@
+package utilities
+
+import (
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestReadAfterCloseOnBufferedChannel(t *testing.T) {
+ communication := make(chan int, 100)
+
+ maxC := 0
+
+ wg := sync.WaitGroup{}
+ wg.Add(2)
+
+ go func() {
+ counter := 0
+ for range make([]int, 50) {
+ communication <- counter
+ counter++
+ }
+ close(communication)
+ wg.Done()
+ }()
+
+ go func() {
+ time.Sleep(2 * time.Second)
+ for c := range communication {
+ maxC = c
+ }
+ wg.Done()
+ }()
+
+ wg.Wait()
+ if maxC != 49 {
+ t.Fatalf("Did not read all sent items from a buffered channel after channel.")
+ }
+}