diff options
| author | jEzEk <[email protected]> | 2018-10-13 19:34:49 +0200 |
|---|---|---|
| committer | jEzEk <[email protected]> | 2018-10-25 18:33:32 +0200 |
| commit | a2583299cf9dcb5b8e59d636b0aa8341fab9a3af (patch) | |
| tree | ddd74a8f48e21f872ac2df402e78d63e34dab54f /xgb_test.go | |
| parent | 6a67513dc990e60178c2de432a2e5a1042351b56 (diff) | |
test server w tests, test *Conn open/close
Diffstat (limited to 'xgb_test.go')
| -rw-r--r-- | xgb_test.go | 839 |
1 files changed, 590 insertions, 249 deletions
diff --git a/xgb_test.go b/xgb_test.go index 7eda5e4..5540e5c 100644 --- a/xgb_test.go +++ b/xgb_test.go @@ -10,7 +10,6 @@ import ( "runtime" "strconv" "strings" - "sync" "testing" "time" ) @@ -27,11 +26,11 @@ type errTimeout struct{ error } func (_ errTimeout) Timeout() bool { return true } var ( - serverErrNotImplemented = errors.New("command not implemented") - serverErrEOF = io.EOF - serverErrClosed = errors.New("server closed") - serverErrWrite = errors.New("server write failed") - serverErrRead = errors.New("server read failed") + dXErrNotImplemented = errors.New("command not implemented") + dXErrClosed = errors.New("server closed") + dXErrWrite = errors.New("server write failed") + dXErrRead = errors.New("server read failed") + dXErrResponse = errors.New("server response error") ) type dXIoResult struct { @@ -43,8 +42,30 @@ type dXIo struct { result chan dXIoResult } -// dumm server implementing net.Conn interface, -// Read blocks until Write, pipes Write to Read, than Read blocks again. +type dXCSendEvent struct{} +type dXCWriteLock struct{} +type dXCWriteUnlock struct{} +type dXCWriteError struct{} +type dXCWriteSuccess struct{ errorResponse bool } +type dXCReadLock struct{} +type dXCReadUnlock struct{} +type dXCReadError struct{} +type dXCReadSuccess struct{} + +type dXEvent struct{} + +func (_ dXEvent) Bytes() []byte { return nil } +func (_ dXEvent) String() string { return "dummy X server event" } + +type dXError struct { + seqId uint16 +} + +func (e dXError) SequenceId() uint16 { return e.seqId } +func (_ dXError) BadId() uint32 { return 0 } +func (_ dXError) Error() string { return "dummy X server error reply" } + +// dummy X server implementing net.Conn interface, type dX struct { addr addr in, out chan dXIo @@ -52,6 +73,10 @@ type dX struct { done chan struct{} } +// Results running dummy X server, satisfying net.Conn interface for test purposes. +// It is users responsibility to stop and clean up resources with (*dX).Close, if not needed anymore. +// By default, the read and write method are unlocked and will not result in error and read response will be a non error X reply (like with (*dX).WriteSuccess(false), (*dX).ReadSuccess()). +//TODO make (*dX).SetDeadline, (*dX).SetReadDeadline, (*dX).SetWriteDeadline work proprely. func newDX(name string) *dX { s := &dX{ addr{name}, @@ -72,24 +97,48 @@ func newDX(name string) *dX { in, out := s.in, chan dXIo(nil) buf := &bytes.Buffer{} + errorRead, errorWrite, errorResponse := false, false, false + lockRead := false + + NewErrorFuncs[255] = func(buf []byte) Error { + return dXError{Get16(buf[2:])} + } + NewEventFuncs[128&127] = func(buf []byte) Event { + return dXEvent{} + } go func() { defer close(s.done) for { select { case dxsio := <-in: + if errorWrite { + dxsio.result <- dXIoResult{0, dXErrWrite} + break + } + response := make([]byte, 32) - response[0] = 1 // not error reply + if errorResponse { // response will be error + response[0] = 0 // error + response[1] = 255 // error function + } else { // response will by reply with no additional reply + response[0] = 1 // reply + } Put16(response[2:], seqId) // sequence number + incrementSequenceId() buf.Write(response) - incrementSequenceId() dxsio.result <- dXIoResult{len(dxsio.b), nil} - if out == nil && buf.Len() > 0 { + if !lockRead && out == nil { out = s.out } case dxsio := <-out: + if errorRead { + dxsio.result <- dXIoResult{0, dXErrRead} + break + } + n, err := buf.Read(dxsio.b) dxsio.result <- dXIoResult{n, err} @@ -100,11 +149,47 @@ func newDX(name string) *dX { if ci == nil { return } + switch cs := ci.(type) { + case dXCSendEvent: + response := make([]byte, 32) + response[0] = 128 + buf.Write(response) + + if !lockRead && out == nil { + out = s.out + } + case dXCWriteLock: + in = nil + case dXCWriteUnlock: + in = s.in + case dXCWriteError: + errorWrite = true + case dXCWriteSuccess: + errorWrite = false + errorResponse = cs.errorResponse + case dXCReadLock: + out = nil + lockRead = true + case dXCReadUnlock: + lockRead = false + if buf.Len() > 0 && out == nil { + out = s.out + } + case dXCReadError: + errorRead = true + case dXCReadSuccess: + errorRead = false + default: + } } } }() return s } + +// Shuts down dummy X server. Every blocking or future method calls will do nothing and result in error. +// Result will be dXErrClosed if server was allready closed. +// Server can not be unclosed. func (s *dX) Close() error { select { case s.control <- nil: @@ -112,203 +197,135 @@ func (s *dX) Close() error { return nil case <-s.done: } - return serverErrClosed + return dXErrClosed } + +// Imitates write action to X server. +// If not locked by (*dX).WriteLock, it results in error or success. +// +// If Write errors, the second result parameter will be an error {dXErrWrite|dXErrClosed}, the resulting first parameter will be 0, +// no response will be generated and the internal sequence number will not be incremented. +// +// If Write succeedes, it results in (len(b), nil), the (*dX).Read will be unblocked (if not locked with (*dX).ReadLock), +// an [32]byte response will be written to buffer from which (*dX).Read reads, +// with sequence number and proper X response type (error or reply) and the internal sequence number will be increased. +// +// If server was closed previously, result will be (0, dXErrClosed). func (s *dX) Write(b []byte) (int, error) { resChan := make(chan dXIoResult) - fmt.Printf("(*dX).Write: got write request: %v\n", b) + //fmt.Printf("(*dX).Write: got write request: %v\n", b) select { case s.in <- dXIo{b, resChan}: - fmt.Printf("(*dX).Write: input channel has accepted request\n") + //fmt.Printf("(*dX).Write: input channel has accepted request\n") res := <-resChan - fmt.Printf("(*dX).Write: got result: %v\n", res) + //fmt.Printf("(*dX).Write: got result: %v\n", res) return res.n, res.err case <-s.done: } - fmt.Printf("(*dX).Write: server was closed\n") - return 0, serverErrClosed + //fmt.Printf("(*dX).Write: server was closed\n") + return 0, dXErrClosed } + +// Imitates read action from X server. +// If locked by (*dX).ReadLock, read will block until unlocking with (*dX).ReadUnlock, or server closes. +// +// If not locked, Read will result in error, or block until internal read buffer is not empty, depending on internal state. +// The internal state can be modified via (*dX).ReadError, or (*dX).ReadSuccess +// ReadError makes it return (0, dXErrRead), with no changes to internal buffer or state. +// ReadSuccess makes it block until there are some write responses. After emtying the internal read buffer, all next Read requests will block untill another successful write requests. +// +// The resulting read success response type can be altered with (*dX).WriteSuccess method. +// +// If server was closed previously, result will be (0, io.EOF). func (s *dX) Read(b []byte) (int, error) { resChan := make(chan dXIoResult) - fmt.Printf("(*dX).Read: got read request of length: %v\n", len(b)) + //fmt.Printf("(*dX).Read: got read request of length: %v\n", len(b)) select { case s.out <- dXIo{b, resChan}: - fmt.Printf("(*dX).Read: output channel has accepted request\n") + //fmt.Printf("(*dX).Read: output channel has accepted request\n") res := <-resChan - fmt.Printf("(*dX).Read: got result: %v\n", res) - fmt.Printf("(*dX).Read: result bytes: %v\n", b) + //fmt.Printf("(*dX).Read: got result: %v\n", res) + //fmt.Printf("(*dX).Read: result bytes: %v\n", b) return res.n, res.err case <-s.done: - fmt.Printf("(*dX).Read: server was closed\n") + //fmt.Printf("(*dX).Read: server was closed\n") } - return 0, serverErrClosed + return 0, io.EOF } func (s *dX) LocalAddr() net.Addr { return s.addr } func (s *dX) RemoteAddr() net.Addr { return s.addr } -func (s *dX) SetDeadline(t time.Time) error { return serverErrNotImplemented } -func (s *dX) SetReadDeadline(t time.Time) error { return serverErrNotImplemented } -func (s *dX) SetWriteDeadline(t time.Time) error { return serverErrNotImplemented } - -type serverBlocking struct { - addr addr - control chan interface{} - done chan struct{} -} - -func newServerBlocking(name string) *serverBlocking { - s := &serverBlocking{ - addr{name}, - make(chan interface{}), - make(chan struct{}), - } - runned := make(chan struct{}) - go func() { - close(runned) - defer close(s.done) - for { - select { - case ci := <-s.control: - if ci == nil { - return - } - } - } - }() - <-runned - return s -} +func (s *dX) SetDeadline(t time.Time) error { return dXErrNotImplemented } +func (s *dX) SetReadDeadline(t time.Time) error { return dXErrNotImplemented } +func (s *dX) SetWriteDeadline(t time.Time) error { return dXErrNotImplemented } -func (s *serverBlocking) Write(b []byte) (int, error) { - select { - case <-s.done: - } - return 0, serverErrClosed -} -func (s *serverBlocking) Read(b []byte) (int, error) { - select { - case <-s.done: - } - return 0, serverErrEOF -} -func (s *serverBlocking) Close() error { +func (s *dX) Control(i interface{}) error { select { - case s.control <- nil: - <-s.done + case s.control <- i: return nil case <-s.done: - return serverErrClosed } + return dXErrClosed } -func (s *serverBlocking) LocalAddr() net.Addr { return s.addr } -func (s *serverBlocking) RemoteAddr() net.Addr { return s.addr } -func (s *serverBlocking) SetDeadline(t time.Time) error { return nil } -func (s *serverBlocking) SetReadDeadline(t time.Time) error { return nil } -func (s *serverBlocking) SetWriteDeadline(t time.Time) error { return nil } -type serverWriteErrorReadError struct { - *serverBlocking +// Adds an Event into read buffer. +func (s *dX) SendEvent() error { + return s.Control(dXCSendEvent{}) } -func newServerWriteErrorReadError(name string) *serverWriteErrorReadError { - return &serverWriteErrorReadError{newServerBlocking(name)} -} -func (s *serverWriteErrorReadError) Write(b []byte) (int, error) { - select { - case <-s.done: - return 0, serverErrClosed - default: - } - return 0, serverErrWrite -} -func (s *serverWriteErrorReadError) Read(b []byte) (int, error) { - select { - case <-s.done: - return 0, serverErrClosed - default: - } - return 0, serverErrRead +// Locks writing. All write requests will be blocked until write is unlocked with (*dX).WriteUnlock, or server closes. +func (s *dX) WriteLock() error { + return s.Control(dXCWriteLock{}) } -type serverWriteErrorReadBlocking struct { - *serverBlocking +// Unlocks writing. All blocked write requests until now will be accepted. +func (s *dX) WriteUnlock() error { + return s.Control(dXCWriteUnlock{}) } -func newServerWriteErrorReadBlocking(name string) *serverWriteErrorReadBlocking { - return &serverWriteErrorReadBlocking{newServerBlocking(name)} -} -func (s *serverWriteErrorReadBlocking) Write(b []byte) (int, error) { - select { - case <-s.done: - return 0, serverErrClosed - default: +// Unlocks writing and makes (*dX).Write to result (0, dXErrWrite). +func (s *dX) WriteError() error { + if err := s.WriteUnlock(); err != nil { + return err } - return 0, serverErrWrite + return s.Control(dXCWriteError{}) } -type serverWriteSuccessReadBlocking struct { - *serverBlocking -} - -func newServerWriteSuccessReadBlocking(name string) *serverWriteSuccessReadBlocking { - return &serverWriteSuccessReadBlocking{newServerBlocking(name)} -} -func (s *serverWriteSuccessReadBlocking) Write(b []byte) (int, error) { - select { - case <-s.done: - return 0, serverErrClosed - default: +// Unlocks writing and makes (*dX).Write to result (len(b), nil), with a proper X reply. +// If errorResult is true, the response will be an X error response, +// else an normal reply. See (*dX).Write for details. +func (s *dX) WriteSuccess(errorResult bool) error { + if err := s.WriteUnlock(); err != nil { + return err } - return len(b), nil + return s.Control(dXCWriteSuccess{errorResult}) } -type serverWriteSuccessReadErrorAfterWrite struct { - *serverBlocking - out chan struct{} - wg *sync.WaitGroup +// Locks reading. All read requests will be blocked until read is unlocked with (*dX).ReadUnlock, or server closes. +// (*dX).Read wil block even after successful write. +func (s *dX) ReadLock() error { + return s.Control(dXCReadLock{}) } -func newServerWriteSuccessReadErrorAfterWrite(name string) *serverWriteSuccessReadErrorAfterWrite { - return &serverWriteSuccessReadErrorAfterWrite{ - newServerBlocking(name), - make(chan struct{}), - &sync.WaitGroup{}, - } +// Unlocks reading. If there are any unresponded requests in reading buffer, read will be unblocked. +func (s *dX) ReadUnlock() error { + return s.Control(dXCReadUnlock{}) } -func (s *serverWriteSuccessReadErrorAfterWrite) Write(b []byte) (int, error) { - select { - case <-s.done: - return 0, serverErrClosed - default: - } - s.wg.Add(1) - go func() { - select { - case s.out <- struct{}{}: - case <-s.done: - } - s.wg.Done() - }() - return len(b), nil -} -func (s *serverWriteSuccessReadErrorAfterWrite) Read(b []byte) (int, error) { - select { - case <-s.done: - return 0, serverErrClosed - default: - } - select { - case <-s.out: - return 0, serverErrRead - case <-s.done: + +// Unlocks read and makes every blocked and following (*dX).Read requests fail. See (*dX).Read for details. +func (s *dX) ReadError() error { + if err := s.ReadUnlock(); err != nil { + return err } - return 0, serverErrClosed + return s.Control(dXCReadError{}) } -func (s *serverWriteSuccessReadErrorAfterWrite) Close() error { - if err := s.serverBlocking.Close(); err != nil { + +// Unlocks read and makes every blocked and following (*dX).Read requests be handled, if there are any in read buffer. +// See (*dX).Read for details. +func (s *dX) ReadSuccess() error { + if err := s.ReadUnlock(); err != nil { return err } - s.wg.Wait() - return nil + return s.Control(dXCReadSuccess{}) } type goroutine struct { @@ -318,12 +335,16 @@ type goroutine struct { } type leaks struct { + name string goroutines map[int]goroutine + report []*leaks } -func leaksMonitor() leaks { - return leaks{ +func leaksMonitor(name string, monitors ...*leaks) *leaks { + return &leaks{ + name, leaks{}.collectGoroutines(), + monitors, } } @@ -375,126 +396,446 @@ func (l leaks) collectGoroutines() map[int]goroutine { return res } -func (l leaks) checkTesting(t *testing.T) { - { - goroutines := l.collectGoroutines() - if len(l.goroutines) >= len(goroutines) { - return +func (l leaks) leakingGoroutines() []goroutine { + goroutines := l.collectGoroutines() + res := []goroutine{} + for id, gr := range goroutines { + if _, ok := l.goroutines[id]; ok { + continue } + res = append(res, gr) + } + return res +} +func (l leaks) checkTesting(t *testing.T) { + if len(l.leakingGoroutines()) == 0 { + return } leakTimeout := time.Second time.Sleep(leakTimeout) //t.Logf("possible goroutine leakage, waiting %v", leakTimeout) - goroutines := l.collectGoroutines() - if len(l.goroutines) >= len(goroutines) { - return + grs := l.leakingGoroutines() + for _, gr := range grs { + t.Errorf("%s: %s is leaking", l.name, gr.name) + //t.Errorf("%s: %s is leaking\n%v", l.name, gr.name, string(gr.stack)) } - t.Errorf("%d goroutine leaks: start(%d) != end(%d)", len(goroutines)-len(l.goroutines), len(l.goroutines), len(goroutines)) - for id, gr := range goroutines { - if _, ok := l.goroutines[id]; ok { - continue - } - t.Log(gr.name, "\n", string(gr.stack)) + for _, rl := range l.report { + rl.ignoreLeak(grs...) + } +} +func (l *leaks) ignoreLeak(grs ...goroutine) { + for _, gr := range grs { + l.goroutines[gr.id] = gr } } -func TestDummyServersRunClose(t *testing.T) { - - testCases := []struct { - name string - serverConstuctor func(string) net.Conn - }{ - {"write blocking,read blocking server", func(n string) net.Conn { return newServerBlocking(n) }}, - {"write error,read error server", func(n string) net.Conn { return newServerWriteErrorReadError(n) }}, - {"write error,read blocking server", func(n string) net.Conn { return newServerWriteErrorReadBlocking(n) }}, - {"write success,read blocking server", func(n string) net.Conn { return newServerWriteSuccessReadBlocking(n) }}, - {"write success,read error afer write server", func(n string) net.Conn { return newServerWriteSuccessReadErrorAfterWrite(n) }}, - {"write success,read success afer write server", func(n string) net.Conn { return newDX(n) }}, +func testDXCombinations(writeStates, readStates []string) []func() (*dX, error) { + writeSetters := map[string]func(*dX) error{ + "lock": (*dX).WriteLock, + "error": (*dX).WriteError, + "successReply": func(s *dX) error { return s.WriteSuccess(false) }, + "successError": func(s *dX) error { return s.WriteSuccess(true) }, + } + readSetters := map[string]func(*dX) error{ + "lock": (*dX).ReadLock, + "error": (*dX).ReadError, + "success": (*dX).ReadSuccess, } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - defer leaksMonitor().checkTesting(t) - serverConn := tc.serverConstuctor(tc.name) + res := []func() (*dX, error){} + for _, writeState := range writeStates { + writeState, writeSetter := writeState, writeSetters[writeState] + if writeSetter == nil { + panic("unknown write state: " + writeState) + continue + } + for _, readState := range readStates { + readState, readSetter := readState, readSetters[readState] + if readSetter == nil { + panic("unknown read state: " + readState) + continue + } + res = append(res, func() (*dX, error) { + s := newDX("write=" + writeState + ",read=" + readState) + + if err := readSetter(s); err != nil { + s.Close() + return nil, errors.New("set read " + readState + " error: " + err.Error()) + } + + if err := writeSetter(s); err != nil { + s.Close() + return nil, errors.New("set write " + writeState + " error: " + err.Error()) + } + + return s, nil + }) + } + } + return res +} - { - closeErr := make(chan error) - go func() { - closeErr <- serverConn.Close() - close(closeErr) - }() - closeTimeout := time.Second +func TestDummyXServer(t *testing.T) { + timeout := time.Millisecond + wantResponse := func(action func(*dX) error, want, block error) func(*dX) error { + return func(s *dX) error { + actionResult := make(chan error) + timedOut := make(chan struct{}) + go func() { + err := action(s) select { - case err := <-closeErr: - want := error(nil) - if err != want { - t.Errorf("(net.Conn).Close()=%v, want %v", err, want) + case <-timedOut: + if err != block { + t.Errorf("after unblocking, action result=%v, want %v", err, block) } - case <-time.After(closeTimeout): - t.Errorf("*Conn.Close() not responded for %v", closeTimeout) + case actionResult <- err: + } + }() + select { + case err := <-actionResult: + if err != want { + return errors.New(fmt.Sprintf("action result=%v, want %v", err, want)) } + case <-time.After(timeout): + close(timedOut) + return errors.New(fmt.Sprintf("action did not respond for %v, result want %v", timeout, want)) } - { - closeErr := make(chan error) - go func() { - closeErr <- serverConn.Close() - close(closeErr) - }() - closeTimeout := time.Second + return nil + } + } + wantBlock := func(action func(*dX) error, unblock error) func(*dX) error { + return func(s *dX) error { + actionResult := make(chan error) + timedOut := make(chan struct{}) + go func() { + err := action(s) select { - case err := <-closeErr: - want := serverErrClosed - if err != want { - t.Errorf("(net.Conn).Close()=%v, want %v", err, want) + case <-timedOut: + if err != unblock { + t.Errorf("after unblocking, action result=%v, want %v", err, unblock) } - case <-time.After(closeTimeout): - t.Errorf("*Conn.Close() not responded for %v", closeTimeout) + case actionResult <- err: } + }() + select { + case err := <-actionResult: + return errors.New(fmt.Sprintf("action result=%v, want to be blocked", err)) + case <-time.After(timeout): + close(timedOut) } - }) + return nil + } + } + write := func() func(*dX) error { + return func(s *dX) error { + _, err := s.Write([]byte{1}) + return err + } + } + read := func() func(*dX) error { + return func(s *dX) error { + b := make([]byte, 32) + _, err := s.Read(b) + return err + } + } + readSuccess := func(seqId uint16, errorResponse bool) func(*dX) error { + return func(s *dX) error { + b := make([]byte, 32) + _, err := s.Read(b) + if err != nil { + return err + } + if seqId != Get16(b[2:]) { + return errors.New(fmt.Sprintf("got read sequence number %d, want %d", Get16(b[2:]), seqId)) + } + b0, desc := 0, "error" + if !errorResponse { + b0, desc = 1, "valid" + } + if int(b[0]) != b0 { + return errors.New(fmt.Sprintf("response is not an %s reply: %v", desc, b)) + } + return nil + } } -} - -func TestConnOpenClose(t *testing.T) { testCases := []struct { - name string - serverConstuctor func(string) net.Conn + description string + servers []func() (*dX, error) + actions []func(*dX) error // actions per server }{ - //{"blocking server", func(n string) net.Conn { return newServerBlocking(n) }}, // i'm not ready to handle this yet - //{"write error,read error server", func(n string) net.Conn { return newServerWriteErrorReadError(n) }}, - //{"write error,read blocking server", func(n string) net.Conn { return newServerWriteErrorReadBlocking(n) }}, - //{"write success,read blocking server", func(n string) net.Conn { return newServerWriteSuccessReadBlocking(n) }}, - //{"write success,read error afer write server", func(n string) net.Conn { return newServerWriteSuccessReadErrorAfterWrite(n) }}, - {"write success,read success afer write server", func(n string) net.Conn { return newDX(n) }}, + {"empty", + []func() (*dX, error){ + func() (*dX, error) { return newDX("server"), nil }, + }, + []func(*dX) error{ + func(s *dX) error { return nil }, + }, + }, + {"close,close", + testDXCombinations( + []string{"lock", "error", "successError", "successReply"}, + []string{"lock", "error", "success"}, + ), + []func(*dX) error{ + wantResponse((*dX).Close, nil, dXErrClosed), + wantResponse((*dX).Close, dXErrClosed, dXErrClosed), + }, + }, + {"write,close,write", + testDXCombinations( + []string{"lock"}, + []string{"lock", "error", "success"}, + ), + []func(*dX) error{ + wantBlock(write(), dXErrClosed), + wantResponse((*dX).Close, nil, dXErrClosed), + wantResponse(write(), dXErrClosed, dXErrClosed), + }, + }, + {"write,close,write", + testDXCombinations( + []string{"error"}, + []string{"lock", "error", "success"}, + ), + []func(*dX) error{ + wantResponse(write(), dXErrWrite, dXErrClosed), + wantResponse((*dX).Close, nil, dXErrClosed), + wantResponse(write(), dXErrClosed, dXErrClosed), + }, + }, + {"write,close,write", + testDXCombinations( + []string{"successError", "successReply"}, + []string{"lock", "error", "success"}, + ), + []func(*dX) error{ + wantResponse(write(), nil, dXErrClosed), + wantResponse((*dX).Close, nil, dXErrClosed), + wantResponse(write(), dXErrClosed, dXErrClosed), + }, + }, + {"read,close,read", + testDXCombinations( + []string{"lock", "error", "successError", "successReply"}, + []string{"lock", "error", "success"}, + ), + []func(*dX) error{ + wantBlock(read(), io.EOF), + wantResponse((*dX).Close, nil, dXErrClosed), + wantResponse(read(), io.EOF, io.EOF), + }, + }, + {"write,read", + testDXCombinations( + []string{"lock"}, + []string{"lock", "error", "success"}, + ), + []func(*dX) error{ + wantBlock(write(), dXErrClosed), + wantBlock(read(), io.EOF), + }, + }, + {"write,read", + testDXCombinations( + []string{"error"}, + []string{"lock", "error", "success"}, + ), + []func(*dX) error{ + wantResponse(write(), dXErrWrite, dXErrClosed), + wantBlock(read(), io.EOF), + }, + }, + {"write,read", + testDXCombinations( + []string{"successError"}, + []string{"lock"}, + ), + []func(*dX) error{ + wantResponse(write(), nil, dXErrClosed), + wantBlock(read(), io.EOF), + }, + }, + {"write,read", + testDXCombinations( + []string{"successError"}, + []string{"error"}, + ), + []func(*dX) error{ + wantResponse(write(), nil, dXErrClosed), + wantResponse(read(), dXErrRead, io.EOF), + }, + }, + {"write,read", + testDXCombinations( + []string{"successError"}, + []string{"success"}, + ), + []func(*dX) error{ + wantResponse(write(), nil, dXErrClosed), + wantResponse(readSuccess(1, true), nil, io.EOF), + }, + }, + {"write,read", + testDXCombinations( + []string{"successReply"}, + []string{"success"}, + ), + []func(*dX) error{ + wantResponse(write(), nil, dXErrClosed), + wantResponse(readSuccess(1, false), nil, io.EOF), + }, + }, } for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - serverConn := tc.serverConstuctor(tc.name) - defer serverConn.Close() + t.Run(tc.description, func(t *testing.T) { + defer leaksMonitor(tc.description).checkTesting(t) - defer leaksMonitor().checkTesting(t) + for _, server := range tc.servers { + s, err := server() + if err != nil { + t.Error(err) + continue + } + if s == nil { + t.Error("nil server in testcase") + continue + } - c, err := postNewConn(&Conn{conn: serverConn}) - if err != nil { - t.Fatalf("connect error: %v", err) + t.Run(s.LocalAddr().String(), func(t *testing.T) { + defer leaksMonitor(s.LocalAddr().String()).checkTesting(t) + for _, action := range tc.actions { + if err := action(s); err != nil { + t.Error(err) + break + } + } + s.Close() + }) } - //t.Logf("connection to server created: %v", c) + }) + } +} - closeErr := make(chan struct{}) +func TestConnOnNonBlockingDummyXServer(t *testing.T) { + timeout := time.Millisecond + wantResponse := func(action func(*Conn) error, want, block error) func(*Conn) error { + return func(c *Conn) error { + actionResult := make(chan error) + timedOut := make(chan struct{}) go func() { - //t.Logf("closing connection to server") - c.Close() - close(closeErr) + err := action(c) + select { + case <-timedOut: + if err != block { + t.Errorf("after unblocking, action result=%v, want %v", err, block) + } + case actionResult <- err: + } }() - closeTimeout := time.Second select { - case <-closeErr: - //t.Logf("connection to server closed") - case <-time.After(closeTimeout): - t.Errorf("*Conn.Close() not responded for %v", closeTimeout) + case err := <-actionResult: + if err != want { + return errors.New(fmt.Sprintf("action result=%v, want %v", err, want)) + } + case <-time.After(timeout): + close(timedOut) + return errors.New(fmt.Sprintf("action did not respond for %v, result want %v", timeout, want)) } - }) + return nil + } } + crequest := func(checked, reply bool) func(*Conn) error { + return func(c *Conn) error { + cookie := c.NewCookie(checked, reply) + c.NewRequest([]byte("crequest"), cookie) + _, err := cookie.Reply() + return err + } + } + + testCases := []struct { + description string + servers []func() (*dX, error) + actions []func(*Conn) error + }{ + {"cclose", + testDXCombinations( + []string{"successError", "successReply"}, + []string{"success"}, + ), + []func(*Conn) error{}, + }, + {"crequest", + testDXCombinations([]string{"successError"}, []string{"success"}), + []func(*Conn) error{ + wantResponse(crequest(true, true), dXError{1}, io.ErrShortWrite), + }, + }, + {"crequest", + testDXCombinations([]string{"successReply"}, []string{"success"}), + []func(*Conn) error{ + wantResponse(crequest(true, true), nil, io.ErrShortWrite), + }, + }, + // sometimes panic on unfixed branch - close of closed channel + {"cclose", + testDXCombinations([]string{"error"}, []string{"error", "success"}), + []func(*Conn) error{}, + }, + } + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + tclm := leaksMonitor("test case " + tc.description) + defer tclm.checkTesting(t) + + for _, server := range tc.servers { + s, err := server() + if err != nil { + t.Error(err) + continue + } + if s == nil { + t.Error("nil *dX in testcase") + continue + } + + t.Run(s.LocalAddr().String(), func(t *testing.T) { + sclm := leaksMonitor(s.LocalAddr().String()+" after sever close", tclm) + defer sclm.checkTesting(t) + c, err := postNewConn(&Conn{conn: s}) + if err != nil { + t.Errorf("connect to dummy server error: %v", err) + return + } + + rlm := leaksMonitor(c.conn.LocalAddr().String() + " after actions end") + for _, action := range tc.actions { + if err := action(c); err != nil { + t.Error(err) + break + } + } + c.Close() + if err := wantResponse( + func(c *Conn) error { + if ev, err := c.WaitForEvent(); ev != nil || err != nil { + return fmt.Errorf("after (*Conn).Close, (*Conn).WaitForEvent() = (%v,%v), want (nil,nil)", ev, err) + } + return nil + }, + nil, + io.ErrShortWrite, + )(c); err != nil { + t.Error(err) + } + rlm.checkTesting(t) + }) + + s.Close() + } + }) + } } |
