summaryrefslogtreecommitdiff
path: root/xgb_test.go
diff options
context:
space:
mode:
authorjEzEk <[email protected]>2018-10-13 19:34:49 +0200
committerjEzEk <[email protected]>2018-10-25 18:33:32 +0200
commita2583299cf9dcb5b8e59d636b0aa8341fab9a3af (patch)
treeddd74a8f48e21f872ac2df402e78d63e34dab54f /xgb_test.go
parent6a67513dc990e60178c2de432a2e5a1042351b56 (diff)
test server w tests, test *Conn open/close
Diffstat (limited to 'xgb_test.go')
-rw-r--r--xgb_test.go839
1 files changed, 590 insertions, 249 deletions
diff --git a/xgb_test.go b/xgb_test.go
index 7eda5e4..5540e5c 100644
--- a/xgb_test.go
+++ b/xgb_test.go
@@ -10,7 +10,6 @@ import (
"runtime"
"strconv"
"strings"
- "sync"
"testing"
"time"
)
@@ -27,11 +26,11 @@ type errTimeout struct{ error }
func (_ errTimeout) Timeout() bool { return true }
var (
- serverErrNotImplemented = errors.New("command not implemented")
- serverErrEOF = io.EOF
- serverErrClosed = errors.New("server closed")
- serverErrWrite = errors.New("server write failed")
- serverErrRead = errors.New("server read failed")
+ dXErrNotImplemented = errors.New("command not implemented")
+ dXErrClosed = errors.New("server closed")
+ dXErrWrite = errors.New("server write failed")
+ dXErrRead = errors.New("server read failed")
+ dXErrResponse = errors.New("server response error")
)
type dXIoResult struct {
@@ -43,8 +42,30 @@ type dXIo struct {
result chan dXIoResult
}
-// dumm server implementing net.Conn interface,
-// Read blocks until Write, pipes Write to Read, than Read blocks again.
+type dXCSendEvent struct{}
+type dXCWriteLock struct{}
+type dXCWriteUnlock struct{}
+type dXCWriteError struct{}
+type dXCWriteSuccess struct{ errorResponse bool }
+type dXCReadLock struct{}
+type dXCReadUnlock struct{}
+type dXCReadError struct{}
+type dXCReadSuccess struct{}
+
+type dXEvent struct{}
+
+func (_ dXEvent) Bytes() []byte { return nil }
+func (_ dXEvent) String() string { return "dummy X server event" }
+
+type dXError struct {
+ seqId uint16
+}
+
+func (e dXError) SequenceId() uint16 { return e.seqId }
+func (_ dXError) BadId() uint32 { return 0 }
+func (_ dXError) Error() string { return "dummy X server error reply" }
+
+// dummy X server implementing net.Conn interface,
type dX struct {
addr addr
in, out chan dXIo
@@ -52,6 +73,10 @@ type dX struct {
done chan struct{}
}
+// Results running dummy X server, satisfying net.Conn interface for test purposes.
+// It is users responsibility to stop and clean up resources with (*dX).Close, if not needed anymore.
+// By default, the read and write method are unlocked and will not result in error and read response will be a non error X reply (like with (*dX).WriteSuccess(false), (*dX).ReadSuccess()).
+//TODO make (*dX).SetDeadline, (*dX).SetReadDeadline, (*dX).SetWriteDeadline work proprely.
func newDX(name string) *dX {
s := &dX{
addr{name},
@@ -72,24 +97,48 @@ func newDX(name string) *dX {
in, out := s.in, chan dXIo(nil)
buf := &bytes.Buffer{}
+ errorRead, errorWrite, errorResponse := false, false, false
+ lockRead := false
+
+ NewErrorFuncs[255] = func(buf []byte) Error {
+ return dXError{Get16(buf[2:])}
+ }
+ NewEventFuncs[128&127] = func(buf []byte) Event {
+ return dXEvent{}
+ }
go func() {
defer close(s.done)
for {
select {
case dxsio := <-in:
+ if errorWrite {
+ dxsio.result <- dXIoResult{0, dXErrWrite}
+ break
+ }
+
response := make([]byte, 32)
- response[0] = 1 // not error reply
+ if errorResponse { // response will be error
+ response[0] = 0 // error
+ response[1] = 255 // error function
+ } else { // response will by reply with no additional reply
+ response[0] = 1 // reply
+ }
Put16(response[2:], seqId) // sequence number
+ incrementSequenceId()
buf.Write(response)
- incrementSequenceId()
dxsio.result <- dXIoResult{len(dxsio.b), nil}
- if out == nil && buf.Len() > 0 {
+ if !lockRead && out == nil {
out = s.out
}
case dxsio := <-out:
+ if errorRead {
+ dxsio.result <- dXIoResult{0, dXErrRead}
+ break
+ }
+
n, err := buf.Read(dxsio.b)
dxsio.result <- dXIoResult{n, err}
@@ -100,11 +149,47 @@ func newDX(name string) *dX {
if ci == nil {
return
}
+ switch cs := ci.(type) {
+ case dXCSendEvent:
+ response := make([]byte, 32)
+ response[0] = 128
+ buf.Write(response)
+
+ if !lockRead && out == nil {
+ out = s.out
+ }
+ case dXCWriteLock:
+ in = nil
+ case dXCWriteUnlock:
+ in = s.in
+ case dXCWriteError:
+ errorWrite = true
+ case dXCWriteSuccess:
+ errorWrite = false
+ errorResponse = cs.errorResponse
+ case dXCReadLock:
+ out = nil
+ lockRead = true
+ case dXCReadUnlock:
+ lockRead = false
+ if buf.Len() > 0 && out == nil {
+ out = s.out
+ }
+ case dXCReadError:
+ errorRead = true
+ case dXCReadSuccess:
+ errorRead = false
+ default:
+ }
}
}
}()
return s
}
+
+// Shuts down dummy X server. Every blocking or future method calls will do nothing and result in error.
+// Result will be dXErrClosed if server was allready closed.
+// Server can not be unclosed.
func (s *dX) Close() error {
select {
case s.control <- nil:
@@ -112,203 +197,135 @@ func (s *dX) Close() error {
return nil
case <-s.done:
}
- return serverErrClosed
+ return dXErrClosed
}
+
+// Imitates write action to X server.
+// If not locked by (*dX).WriteLock, it results in error or success.
+//
+// If Write errors, the second result parameter will be an error {dXErrWrite|dXErrClosed}, the resulting first parameter will be 0,
+// no response will be generated and the internal sequence number will not be incremented.
+//
+// If Write succeedes, it results in (len(b), nil), the (*dX).Read will be unblocked (if not locked with (*dX).ReadLock),
+// an [32]byte response will be written to buffer from which (*dX).Read reads,
+// with sequence number and proper X response type (error or reply) and the internal sequence number will be increased.
+//
+// If server was closed previously, result will be (0, dXErrClosed).
func (s *dX) Write(b []byte) (int, error) {
resChan := make(chan dXIoResult)
- fmt.Printf("(*dX).Write: got write request: %v\n", b)
+ //fmt.Printf("(*dX).Write: got write request: %v\n", b)
select {
case s.in <- dXIo{b, resChan}:
- fmt.Printf("(*dX).Write: input channel has accepted request\n")
+ //fmt.Printf("(*dX).Write: input channel has accepted request\n")
res := <-resChan
- fmt.Printf("(*dX).Write: got result: %v\n", res)
+ //fmt.Printf("(*dX).Write: got result: %v\n", res)
return res.n, res.err
case <-s.done:
}
- fmt.Printf("(*dX).Write: server was closed\n")
- return 0, serverErrClosed
+ //fmt.Printf("(*dX).Write: server was closed\n")
+ return 0, dXErrClosed
}
+
+// Imitates read action from X server.
+// If locked by (*dX).ReadLock, read will block until unlocking with (*dX).ReadUnlock, or server closes.
+//
+// If not locked, Read will result in error, or block until internal read buffer is not empty, depending on internal state.
+// The internal state can be modified via (*dX).ReadError, or (*dX).ReadSuccess
+// ReadError makes it return (0, dXErrRead), with no changes to internal buffer or state.
+// ReadSuccess makes it block until there are some write responses. After emtying the internal read buffer, all next Read requests will block untill another successful write requests.
+//
+// The resulting read success response type can be altered with (*dX).WriteSuccess method.
+//
+// If server was closed previously, result will be (0, io.EOF).
func (s *dX) Read(b []byte) (int, error) {
resChan := make(chan dXIoResult)
- fmt.Printf("(*dX).Read: got read request of length: %v\n", len(b))
+ //fmt.Printf("(*dX).Read: got read request of length: %v\n", len(b))
select {
case s.out <- dXIo{b, resChan}:
- fmt.Printf("(*dX).Read: output channel has accepted request\n")
+ //fmt.Printf("(*dX).Read: output channel has accepted request\n")
res := <-resChan
- fmt.Printf("(*dX).Read: got result: %v\n", res)
- fmt.Printf("(*dX).Read: result bytes: %v\n", b)
+ //fmt.Printf("(*dX).Read: got result: %v\n", res)
+ //fmt.Printf("(*dX).Read: result bytes: %v\n", b)
return res.n, res.err
case <-s.done:
- fmt.Printf("(*dX).Read: server was closed\n")
+ //fmt.Printf("(*dX).Read: server was closed\n")
}
- return 0, serverErrClosed
+ return 0, io.EOF
}
func (s *dX) LocalAddr() net.Addr { return s.addr }
func (s *dX) RemoteAddr() net.Addr { return s.addr }
-func (s *dX) SetDeadline(t time.Time) error { return serverErrNotImplemented }
-func (s *dX) SetReadDeadline(t time.Time) error { return serverErrNotImplemented }
-func (s *dX) SetWriteDeadline(t time.Time) error { return serverErrNotImplemented }
-
-type serverBlocking struct {
- addr addr
- control chan interface{}
- done chan struct{}
-}
-
-func newServerBlocking(name string) *serverBlocking {
- s := &serverBlocking{
- addr{name},
- make(chan interface{}),
- make(chan struct{}),
- }
- runned := make(chan struct{})
- go func() {
- close(runned)
- defer close(s.done)
- for {
- select {
- case ci := <-s.control:
- if ci == nil {
- return
- }
- }
- }
- }()
- <-runned
- return s
-}
+func (s *dX) SetDeadline(t time.Time) error { return dXErrNotImplemented }
+func (s *dX) SetReadDeadline(t time.Time) error { return dXErrNotImplemented }
+func (s *dX) SetWriteDeadline(t time.Time) error { return dXErrNotImplemented }
-func (s *serverBlocking) Write(b []byte) (int, error) {
- select {
- case <-s.done:
- }
- return 0, serverErrClosed
-}
-func (s *serverBlocking) Read(b []byte) (int, error) {
- select {
- case <-s.done:
- }
- return 0, serverErrEOF
-}
-func (s *serverBlocking) Close() error {
+func (s *dX) Control(i interface{}) error {
select {
- case s.control <- nil:
- <-s.done
+ case s.control <- i:
return nil
case <-s.done:
- return serverErrClosed
}
+ return dXErrClosed
}
-func (s *serverBlocking) LocalAddr() net.Addr { return s.addr }
-func (s *serverBlocking) RemoteAddr() net.Addr { return s.addr }
-func (s *serverBlocking) SetDeadline(t time.Time) error { return nil }
-func (s *serverBlocking) SetReadDeadline(t time.Time) error { return nil }
-func (s *serverBlocking) SetWriteDeadline(t time.Time) error { return nil }
-type serverWriteErrorReadError struct {
- *serverBlocking
+// Adds an Event into read buffer.
+func (s *dX) SendEvent() error {
+ return s.Control(dXCSendEvent{})
}
-func newServerWriteErrorReadError(name string) *serverWriteErrorReadError {
- return &serverWriteErrorReadError{newServerBlocking(name)}
-}
-func (s *serverWriteErrorReadError) Write(b []byte) (int, error) {
- select {
- case <-s.done:
- return 0, serverErrClosed
- default:
- }
- return 0, serverErrWrite
-}
-func (s *serverWriteErrorReadError) Read(b []byte) (int, error) {
- select {
- case <-s.done:
- return 0, serverErrClosed
- default:
- }
- return 0, serverErrRead
+// Locks writing. All write requests will be blocked until write is unlocked with (*dX).WriteUnlock, or server closes.
+func (s *dX) WriteLock() error {
+ return s.Control(dXCWriteLock{})
}
-type serverWriteErrorReadBlocking struct {
- *serverBlocking
+// Unlocks writing. All blocked write requests until now will be accepted.
+func (s *dX) WriteUnlock() error {
+ return s.Control(dXCWriteUnlock{})
}
-func newServerWriteErrorReadBlocking(name string) *serverWriteErrorReadBlocking {
- return &serverWriteErrorReadBlocking{newServerBlocking(name)}
-}
-func (s *serverWriteErrorReadBlocking) Write(b []byte) (int, error) {
- select {
- case <-s.done:
- return 0, serverErrClosed
- default:
+// Unlocks writing and makes (*dX).Write to result (0, dXErrWrite).
+func (s *dX) WriteError() error {
+ if err := s.WriteUnlock(); err != nil {
+ return err
}
- return 0, serverErrWrite
+ return s.Control(dXCWriteError{})
}
-type serverWriteSuccessReadBlocking struct {
- *serverBlocking
-}
-
-func newServerWriteSuccessReadBlocking(name string) *serverWriteSuccessReadBlocking {
- return &serverWriteSuccessReadBlocking{newServerBlocking(name)}
-}
-func (s *serverWriteSuccessReadBlocking) Write(b []byte) (int, error) {
- select {
- case <-s.done:
- return 0, serverErrClosed
- default:
+// Unlocks writing and makes (*dX).Write to result (len(b), nil), with a proper X reply.
+// If errorResult is true, the response will be an X error response,
+// else an normal reply. See (*dX).Write for details.
+func (s *dX) WriteSuccess(errorResult bool) error {
+ if err := s.WriteUnlock(); err != nil {
+ return err
}
- return len(b), nil
+ return s.Control(dXCWriteSuccess{errorResult})
}
-type serverWriteSuccessReadErrorAfterWrite struct {
- *serverBlocking
- out chan struct{}
- wg *sync.WaitGroup
+// Locks reading. All read requests will be blocked until read is unlocked with (*dX).ReadUnlock, or server closes.
+// (*dX).Read wil block even after successful write.
+func (s *dX) ReadLock() error {
+ return s.Control(dXCReadLock{})
}
-func newServerWriteSuccessReadErrorAfterWrite(name string) *serverWriteSuccessReadErrorAfterWrite {
- return &serverWriteSuccessReadErrorAfterWrite{
- newServerBlocking(name),
- make(chan struct{}),
- &sync.WaitGroup{},
- }
+// Unlocks reading. If there are any unresponded requests in reading buffer, read will be unblocked.
+func (s *dX) ReadUnlock() error {
+ return s.Control(dXCReadUnlock{})
}
-func (s *serverWriteSuccessReadErrorAfterWrite) Write(b []byte) (int, error) {
- select {
- case <-s.done:
- return 0, serverErrClosed
- default:
- }
- s.wg.Add(1)
- go func() {
- select {
- case s.out <- struct{}{}:
- case <-s.done:
- }
- s.wg.Done()
- }()
- return len(b), nil
-}
-func (s *serverWriteSuccessReadErrorAfterWrite) Read(b []byte) (int, error) {
- select {
- case <-s.done:
- return 0, serverErrClosed
- default:
- }
- select {
- case <-s.out:
- return 0, serverErrRead
- case <-s.done:
+
+// Unlocks read and makes every blocked and following (*dX).Read requests fail. See (*dX).Read for details.
+func (s *dX) ReadError() error {
+ if err := s.ReadUnlock(); err != nil {
+ return err
}
- return 0, serverErrClosed
+ return s.Control(dXCReadError{})
}
-func (s *serverWriteSuccessReadErrorAfterWrite) Close() error {
- if err := s.serverBlocking.Close(); err != nil {
+
+// Unlocks read and makes every blocked and following (*dX).Read requests be handled, if there are any in read buffer.
+// See (*dX).Read for details.
+func (s *dX) ReadSuccess() error {
+ if err := s.ReadUnlock(); err != nil {
return err
}
- s.wg.Wait()
- return nil
+ return s.Control(dXCReadSuccess{})
}
type goroutine struct {
@@ -318,12 +335,16 @@ type goroutine struct {
}
type leaks struct {
+ name string
goroutines map[int]goroutine
+ report []*leaks
}
-func leaksMonitor() leaks {
- return leaks{
+func leaksMonitor(name string, monitors ...*leaks) *leaks {
+ return &leaks{
+ name,
leaks{}.collectGoroutines(),
+ monitors,
}
}
@@ -375,126 +396,446 @@ func (l leaks) collectGoroutines() map[int]goroutine {
return res
}
-func (l leaks) checkTesting(t *testing.T) {
- {
- goroutines := l.collectGoroutines()
- if len(l.goroutines) >= len(goroutines) {
- return
+func (l leaks) leakingGoroutines() []goroutine {
+ goroutines := l.collectGoroutines()
+ res := []goroutine{}
+ for id, gr := range goroutines {
+ if _, ok := l.goroutines[id]; ok {
+ continue
}
+ res = append(res, gr)
+ }
+ return res
+}
+func (l leaks) checkTesting(t *testing.T) {
+ if len(l.leakingGoroutines()) == 0 {
+ return
}
leakTimeout := time.Second
time.Sleep(leakTimeout)
//t.Logf("possible goroutine leakage, waiting %v", leakTimeout)
- goroutines := l.collectGoroutines()
- if len(l.goroutines) >= len(goroutines) {
- return
+ grs := l.leakingGoroutines()
+ for _, gr := range grs {
+ t.Errorf("%s: %s is leaking", l.name, gr.name)
+ //t.Errorf("%s: %s is leaking\n%v", l.name, gr.name, string(gr.stack))
}
- t.Errorf("%d goroutine leaks: start(%d) != end(%d)", len(goroutines)-len(l.goroutines), len(l.goroutines), len(goroutines))
- for id, gr := range goroutines {
- if _, ok := l.goroutines[id]; ok {
- continue
- }
- t.Log(gr.name, "\n", string(gr.stack))
+ for _, rl := range l.report {
+ rl.ignoreLeak(grs...)
+ }
+}
+func (l *leaks) ignoreLeak(grs ...goroutine) {
+ for _, gr := range grs {
+ l.goroutines[gr.id] = gr
}
}
-func TestDummyServersRunClose(t *testing.T) {
-
- testCases := []struct {
- name string
- serverConstuctor func(string) net.Conn
- }{
- {"write blocking,read blocking server", func(n string) net.Conn { return newServerBlocking(n) }},
- {"write error,read error server", func(n string) net.Conn { return newServerWriteErrorReadError(n) }},
- {"write error,read blocking server", func(n string) net.Conn { return newServerWriteErrorReadBlocking(n) }},
- {"write success,read blocking server", func(n string) net.Conn { return newServerWriteSuccessReadBlocking(n) }},
- {"write success,read error afer write server", func(n string) net.Conn { return newServerWriteSuccessReadErrorAfterWrite(n) }},
- {"write success,read success afer write server", func(n string) net.Conn { return newDX(n) }},
+func testDXCombinations(writeStates, readStates []string) []func() (*dX, error) {
+ writeSetters := map[string]func(*dX) error{
+ "lock": (*dX).WriteLock,
+ "error": (*dX).WriteError,
+ "successReply": func(s *dX) error { return s.WriteSuccess(false) },
+ "successError": func(s *dX) error { return s.WriteSuccess(true) },
+ }
+ readSetters := map[string]func(*dX) error{
+ "lock": (*dX).ReadLock,
+ "error": (*dX).ReadError,
+ "success": (*dX).ReadSuccess,
}
- for _, tc := range testCases {
- t.Run(tc.name, func(t *testing.T) {
- defer leaksMonitor().checkTesting(t)
- serverConn := tc.serverConstuctor(tc.name)
+ res := []func() (*dX, error){}
+ for _, writeState := range writeStates {
+ writeState, writeSetter := writeState, writeSetters[writeState]
+ if writeSetter == nil {
+ panic("unknown write state: " + writeState)
+ continue
+ }
+ for _, readState := range readStates {
+ readState, readSetter := readState, readSetters[readState]
+ if readSetter == nil {
+ panic("unknown read state: " + readState)
+ continue
+ }
+ res = append(res, func() (*dX, error) {
+ s := newDX("write=" + writeState + ",read=" + readState)
+
+ if err := readSetter(s); err != nil {
+ s.Close()
+ return nil, errors.New("set read " + readState + " error: " + err.Error())
+ }
+
+ if err := writeSetter(s); err != nil {
+ s.Close()
+ return nil, errors.New("set write " + writeState + " error: " + err.Error())
+ }
+
+ return s, nil
+ })
+ }
+ }
+ return res
+}
- {
- closeErr := make(chan error)
- go func() {
- closeErr <- serverConn.Close()
- close(closeErr)
- }()
- closeTimeout := time.Second
+func TestDummyXServer(t *testing.T) {
+ timeout := time.Millisecond
+ wantResponse := func(action func(*dX) error, want, block error) func(*dX) error {
+ return func(s *dX) error {
+ actionResult := make(chan error)
+ timedOut := make(chan struct{})
+ go func() {
+ err := action(s)
select {
- case err := <-closeErr:
- want := error(nil)
- if err != want {
- t.Errorf("(net.Conn).Close()=%v, want %v", err, want)
+ case <-timedOut:
+ if err != block {
+ t.Errorf("after unblocking, action result=%v, want %v", err, block)
}
- case <-time.After(closeTimeout):
- t.Errorf("*Conn.Close() not responded for %v", closeTimeout)
+ case actionResult <- err:
+ }
+ }()
+ select {
+ case err := <-actionResult:
+ if err != want {
+ return errors.New(fmt.Sprintf("action result=%v, want %v", err, want))
}
+ case <-time.After(timeout):
+ close(timedOut)
+ return errors.New(fmt.Sprintf("action did not respond for %v, result want %v", timeout, want))
}
- {
- closeErr := make(chan error)
- go func() {
- closeErr <- serverConn.Close()
- close(closeErr)
- }()
- closeTimeout := time.Second
+ return nil
+ }
+ }
+ wantBlock := func(action func(*dX) error, unblock error) func(*dX) error {
+ return func(s *dX) error {
+ actionResult := make(chan error)
+ timedOut := make(chan struct{})
+ go func() {
+ err := action(s)
select {
- case err := <-closeErr:
- want := serverErrClosed
- if err != want {
- t.Errorf("(net.Conn).Close()=%v, want %v", err, want)
+ case <-timedOut:
+ if err != unblock {
+ t.Errorf("after unblocking, action result=%v, want %v", err, unblock)
}
- case <-time.After(closeTimeout):
- t.Errorf("*Conn.Close() not responded for %v", closeTimeout)
+ case actionResult <- err:
}
+ }()
+ select {
+ case err := <-actionResult:
+ return errors.New(fmt.Sprintf("action result=%v, want to be blocked", err))
+ case <-time.After(timeout):
+ close(timedOut)
}
- })
+ return nil
+ }
+ }
+ write := func() func(*dX) error {
+ return func(s *dX) error {
+ _, err := s.Write([]byte{1})
+ return err
+ }
+ }
+ read := func() func(*dX) error {
+ return func(s *dX) error {
+ b := make([]byte, 32)
+ _, err := s.Read(b)
+ return err
+ }
+ }
+ readSuccess := func(seqId uint16, errorResponse bool) func(*dX) error {
+ return func(s *dX) error {
+ b := make([]byte, 32)
+ _, err := s.Read(b)
+ if err != nil {
+ return err
+ }
+ if seqId != Get16(b[2:]) {
+ return errors.New(fmt.Sprintf("got read sequence number %d, want %d", Get16(b[2:]), seqId))
+ }
+ b0, desc := 0, "error"
+ if !errorResponse {
+ b0, desc = 1, "valid"
+ }
+ if int(b[0]) != b0 {
+ return errors.New(fmt.Sprintf("response is not an %s reply: %v", desc, b))
+ }
+ return nil
+ }
}
-}
-
-func TestConnOpenClose(t *testing.T) {
testCases := []struct {
- name string
- serverConstuctor func(string) net.Conn
+ description string
+ servers []func() (*dX, error)
+ actions []func(*dX) error // actions per server
}{
- //{"blocking server", func(n string) net.Conn { return newServerBlocking(n) }}, // i'm not ready to handle this yet
- //{"write error,read error server", func(n string) net.Conn { return newServerWriteErrorReadError(n) }},
- //{"write error,read blocking server", func(n string) net.Conn { return newServerWriteErrorReadBlocking(n) }},
- //{"write success,read blocking server", func(n string) net.Conn { return newServerWriteSuccessReadBlocking(n) }},
- //{"write success,read error afer write server", func(n string) net.Conn { return newServerWriteSuccessReadErrorAfterWrite(n) }},
- {"write success,read success afer write server", func(n string) net.Conn { return newDX(n) }},
+ {"empty",
+ []func() (*dX, error){
+ func() (*dX, error) { return newDX("server"), nil },
+ },
+ []func(*dX) error{
+ func(s *dX) error { return nil },
+ },
+ },
+ {"close,close",
+ testDXCombinations(
+ []string{"lock", "error", "successError", "successReply"},
+ []string{"lock", "error", "success"},
+ ),
+ []func(*dX) error{
+ wantResponse((*dX).Close, nil, dXErrClosed),
+ wantResponse((*dX).Close, dXErrClosed, dXErrClosed),
+ },
+ },
+ {"write,close,write",
+ testDXCombinations(
+ []string{"lock"},
+ []string{"lock", "error", "success"},
+ ),
+ []func(*dX) error{
+ wantBlock(write(), dXErrClosed),
+ wantResponse((*dX).Close, nil, dXErrClosed),
+ wantResponse(write(), dXErrClosed, dXErrClosed),
+ },
+ },
+ {"write,close,write",
+ testDXCombinations(
+ []string{"error"},
+ []string{"lock", "error", "success"},
+ ),
+ []func(*dX) error{
+ wantResponse(write(), dXErrWrite, dXErrClosed),
+ wantResponse((*dX).Close, nil, dXErrClosed),
+ wantResponse(write(), dXErrClosed, dXErrClosed),
+ },
+ },
+ {"write,close,write",
+ testDXCombinations(
+ []string{"successError", "successReply"},
+ []string{"lock", "error", "success"},
+ ),
+ []func(*dX) error{
+ wantResponse(write(), nil, dXErrClosed),
+ wantResponse((*dX).Close, nil, dXErrClosed),
+ wantResponse(write(), dXErrClosed, dXErrClosed),
+ },
+ },
+ {"read,close,read",
+ testDXCombinations(
+ []string{"lock", "error", "successError", "successReply"},
+ []string{"lock", "error", "success"},
+ ),
+ []func(*dX) error{
+ wantBlock(read(), io.EOF),
+ wantResponse((*dX).Close, nil, dXErrClosed),
+ wantResponse(read(), io.EOF, io.EOF),
+ },
+ },
+ {"write,read",
+ testDXCombinations(
+ []string{"lock"},
+ []string{"lock", "error", "success"},
+ ),
+ []func(*dX) error{
+ wantBlock(write(), dXErrClosed),
+ wantBlock(read(), io.EOF),
+ },
+ },
+ {"write,read",
+ testDXCombinations(
+ []string{"error"},
+ []string{"lock", "error", "success"},
+ ),
+ []func(*dX) error{
+ wantResponse(write(), dXErrWrite, dXErrClosed),
+ wantBlock(read(), io.EOF),
+ },
+ },
+ {"write,read",
+ testDXCombinations(
+ []string{"successError"},
+ []string{"lock"},
+ ),
+ []func(*dX) error{
+ wantResponse(write(), nil, dXErrClosed),
+ wantBlock(read(), io.EOF),
+ },
+ },
+ {"write,read",
+ testDXCombinations(
+ []string{"successError"},
+ []string{"error"},
+ ),
+ []func(*dX) error{
+ wantResponse(write(), nil, dXErrClosed),
+ wantResponse(read(), dXErrRead, io.EOF),
+ },
+ },
+ {"write,read",
+ testDXCombinations(
+ []string{"successError"},
+ []string{"success"},
+ ),
+ []func(*dX) error{
+ wantResponse(write(), nil, dXErrClosed),
+ wantResponse(readSuccess(1, true), nil, io.EOF),
+ },
+ },
+ {"write,read",
+ testDXCombinations(
+ []string{"successReply"},
+ []string{"success"},
+ ),
+ []func(*dX) error{
+ wantResponse(write(), nil, dXErrClosed),
+ wantResponse(readSuccess(1, false), nil, io.EOF),
+ },
+ },
}
for _, tc := range testCases {
- t.Run(tc.name, func(t *testing.T) {
- serverConn := tc.serverConstuctor(tc.name)
- defer serverConn.Close()
+ t.Run(tc.description, func(t *testing.T) {
+ defer leaksMonitor(tc.description).checkTesting(t)
- defer leaksMonitor().checkTesting(t)
+ for _, server := range tc.servers {
+ s, err := server()
+ if err != nil {
+ t.Error(err)
+ continue
+ }
+ if s == nil {
+ t.Error("nil server in testcase")
+ continue
+ }
- c, err := postNewConn(&Conn{conn: serverConn})
- if err != nil {
- t.Fatalf("connect error: %v", err)
+ t.Run(s.LocalAddr().String(), func(t *testing.T) {
+ defer leaksMonitor(s.LocalAddr().String()).checkTesting(t)
+ for _, action := range tc.actions {
+ if err := action(s); err != nil {
+ t.Error(err)
+ break
+ }
+ }
+ s.Close()
+ })
}
- //t.Logf("connection to server created: %v", c)
+ })
+ }
+}
- closeErr := make(chan struct{})
+func TestConnOnNonBlockingDummyXServer(t *testing.T) {
+ timeout := time.Millisecond
+ wantResponse := func(action func(*Conn) error, want, block error) func(*Conn) error {
+ return func(c *Conn) error {
+ actionResult := make(chan error)
+ timedOut := make(chan struct{})
go func() {
- //t.Logf("closing connection to server")
- c.Close()
- close(closeErr)
+ err := action(c)
+ select {
+ case <-timedOut:
+ if err != block {
+ t.Errorf("after unblocking, action result=%v, want %v", err, block)
+ }
+ case actionResult <- err:
+ }
}()
- closeTimeout := time.Second
select {
- case <-closeErr:
- //t.Logf("connection to server closed")
- case <-time.After(closeTimeout):
- t.Errorf("*Conn.Close() not responded for %v", closeTimeout)
+ case err := <-actionResult:
+ if err != want {
+ return errors.New(fmt.Sprintf("action result=%v, want %v", err, want))
+ }
+ case <-time.After(timeout):
+ close(timedOut)
+ return errors.New(fmt.Sprintf("action did not respond for %v, result want %v", timeout, want))
}
- })
+ return nil
+ }
}
+ crequest := func(checked, reply bool) func(*Conn) error {
+ return func(c *Conn) error {
+ cookie := c.NewCookie(checked, reply)
+ c.NewRequest([]byte("crequest"), cookie)
+ _, err := cookie.Reply()
+ return err
+ }
+ }
+
+ testCases := []struct {
+ description string
+ servers []func() (*dX, error)
+ actions []func(*Conn) error
+ }{
+ {"cclose",
+ testDXCombinations(
+ []string{"successError", "successReply"},
+ []string{"success"},
+ ),
+ []func(*Conn) error{},
+ },
+ {"crequest",
+ testDXCombinations([]string{"successError"}, []string{"success"}),
+ []func(*Conn) error{
+ wantResponse(crequest(true, true), dXError{1}, io.ErrShortWrite),
+ },
+ },
+ {"crequest",
+ testDXCombinations([]string{"successReply"}, []string{"success"}),
+ []func(*Conn) error{
+ wantResponse(crequest(true, true), nil, io.ErrShortWrite),
+ },
+ },
+ // sometimes panic on unfixed branch - close of closed channel
+ {"cclose",
+ testDXCombinations([]string{"error"}, []string{"error", "success"}),
+ []func(*Conn) error{},
+ },
+ }
+ for _, tc := range testCases {
+ t.Run(tc.description, func(t *testing.T) {
+ tclm := leaksMonitor("test case " + tc.description)
+ defer tclm.checkTesting(t)
+
+ for _, server := range tc.servers {
+ s, err := server()
+ if err != nil {
+ t.Error(err)
+ continue
+ }
+ if s == nil {
+ t.Error("nil *dX in testcase")
+ continue
+ }
+
+ t.Run(s.LocalAddr().String(), func(t *testing.T) {
+ sclm := leaksMonitor(s.LocalAddr().String()+" after sever close", tclm)
+ defer sclm.checkTesting(t)
+ c, err := postNewConn(&Conn{conn: s})
+ if err != nil {
+ t.Errorf("connect to dummy server error: %v", err)
+ return
+ }
+
+ rlm := leaksMonitor(c.conn.LocalAddr().String() + " after actions end")
+ for _, action := range tc.actions {
+ if err := action(c); err != nil {
+ t.Error(err)
+ break
+ }
+ }
+ c.Close()
+ if err := wantResponse(
+ func(c *Conn) error {
+ if ev, err := c.WaitForEvent(); ev != nil || err != nil {
+ return fmt.Errorf("after (*Conn).Close, (*Conn).WaitForEvent() = (%v,%v), want (nil,nil)", ev, err)
+ }
+ return nil
+ },
+ nil,
+ io.ErrShortWrite,
+ )(c); err != nil {
+ t.Error(err)
+ }
+ rlm.checkTesting(t)
+ })
+
+ s.Close()
+ }
+ })
+ }
}