diff options
| -rw-r--r-- | dummyNetConn.go | 261 | ||||
| -rw-r--r-- | dummyNetConn_test.go | 273 | ||||
| -rw-r--r-- | xgb_test.go | 816 |
3 files changed, 704 insertions, 646 deletions
diff --git a/dummyNetConn.go b/dummyNetConn.go new file mode 100644 index 0000000..91bae4f --- /dev/null +++ b/dummyNetConn.go @@ -0,0 +1,261 @@ +package xgb + +import ( + "bytes" + "errors" + "io" + "net" + "time" +) + +type dAddr struct { + s string +} + +func (_ dAddr) Network() string { return "dummy" } +func (a dAddr) String() string { return a.s } + +var ( + dNCErrNotImplemented = errors.New("command not implemented") + dNCErrClosed = errors.New("server closed") + dNCErrWrite = errors.New("server write failed") + dNCErrRead = errors.New("server read failed") + dNCErrResponse = errors.New("server response error") +) + +type dNCIoResult struct { + n int + err error +} +type dNCIo struct { + b []byte + result chan dNCIoResult +} + +type dNCCWriteLock struct{} +type dNCCWriteUnlock struct{} +type dNCCWriteError struct{} +type dNCCWriteSuccess struct{} +type dNCCReadLock struct{} +type dNCCReadUnlock struct{} +type dNCCReadError struct{} +type dNCCReadSuccess struct{} + +// dummy net.Conn interface. Needs to be constructed via newDummyNetConn([...]) function. +type dNC struct { + reply func([]byte) []byte + addr dAddr + in, out chan dNCIo + control chan interface{} + done chan struct{} +} + +// Results running dummy server, satisfying net.Conn interface for test purposes. +// 'name' parameter will be returned via (*dNC).Local/RemoteAddr().String() +// 'reply' parameter function will be runned only on successful (*dNC).Write(b) with 'b' as parameter to 'reply'. The result will be stored in internal buffer and can be retrieved later via (*dNC).Read([...]) method. +// It is users responsibility to stop and clean up resources with (*dNC).Close, if not needed anymore. +// By default, the (*dNC).Write([...]) and (*dNC).Read([...]) methods are unlocked and will not result in error. +//TODO make (*dNC).SetDeadline, (*dNC).SetReadDeadline, (*dNC).SetWriteDeadline work proprely. +func newDummyNetConn(name string, reply func([]byte) []byte) *dNC { + + s := &dNC{ + reply, + dAddr{name}, + make(chan dNCIo), make(chan dNCIo), + make(chan interface{}), + make(chan struct{}), + } + + in, out := s.in, chan dNCIo(nil) + buf := &bytes.Buffer{} + errorRead, errorWrite := false, false + lockRead := false + + go func() { + defer close(s.done) + for { + select { + case dxsio := <-in: + if errorWrite { + dxsio.result <- dNCIoResult{0, dNCErrWrite} + break + } + + response := s.reply(dxsio.b) + + buf.Write(response) + dxsio.result <- dNCIoResult{len(dxsio.b), nil} + + if !lockRead && buf.Len() > 0 && out == nil { + out = s.out + } + case dxsio := <-out: + if errorRead { + dxsio.result <- dNCIoResult{0, dNCErrRead} + break + } + + n, err := buf.Read(dxsio.b) + dxsio.result <- dNCIoResult{n, err} + + if buf.Len() == 0 { + out = nil + } + case ci := <-s.control: + if ci == nil { + return + } + switch ci.(type) { + case dNCCWriteLock: + in = nil + case dNCCWriteUnlock: + in = s.in + case dNCCWriteError: + errorWrite = true + case dNCCWriteSuccess: + errorWrite = false + case dNCCReadLock: + out = nil + lockRead = true + case dNCCReadUnlock: + lockRead = false + if buf.Len() > 0 && out == nil { + out = s.out + } + case dNCCReadError: + errorRead = true + case dNCCReadSuccess: + errorRead = false + default: + } + } + } + }() + return s +} + +// Shuts down dummy net.Conn server. Every blocking or future method calls will do nothing and result in error. +// Result will be dNCErrClosed if server was allready closed. +// Server can not be unclosed. +func (s *dNC) Close() error { + select { + case s.control <- nil: + <-s.done + return nil + case <-s.done: + } + return dNCErrClosed +} + +// Performs a write action to server. +// If not locked by (*dNC).WriteLock, it results in error or success. If locked, this method will block until unlocked, or closed. +// +// This method can be set to result in error or success, via (*dNC).WriteError() or (*dNC).WriteSuccess() methods. +// +// If setted to result in error, the 'reply' function will NOT be called and internal buffer will NOT increasethe. +// Result will be (0, dNCErrWrite). +// +// If setted to result in success, the 'reply' function will be called and its result will be writen to internal buffer. +// If there is something in the internal buffer, the (*dNC).Read([...]) will be unblocked (if not previously locked with (*dNC).ReadLock). +// Result will be (len(b), nil) +// +// If server was closed previously, result will be (0, dNCErrClosed). +func (s *dNC) Write(b []byte) (int, error) { + resChan := make(chan dNCIoResult) + select { + case s.in <- dNCIo{b, resChan}: + res := <-resChan + return res.n, res.err + case <-s.done: + } + return 0, dNCErrClosed +} + +// Performs a read action from server. +// If locked by (*dNC).ReadLock(), this method will block until unlocked with (*dNC).ReadUnlock(), or server closes. +// +// If not locked, this method can be setted to result imidiatly in error, will block if internal buffer is empty or will perform an read operation from internal buffer. +// +// If setted to result in error via (*dNC).ReadError(), the result will be (0, dNCErrWrite). +// +// If not locked and not setted to result in error via (*dNC).ReadSuccess(), this method will block until internall buffer is not empty, than it returns the result of the buffer read operation via (*bytes.Buffer).Read([...]). +// If the internal buffer is empty after this method, all follwing (*dNC).Read([...]), requests will block until internall buffer is filled after successful write requests. +// +// If server was closed previously, result will be (0, io.EOF). +func (s *dNC) Read(b []byte) (int, error) { + resChan := make(chan dNCIoResult) + select { + case s.out <- dNCIo{b, resChan}: + res := <-resChan + return res.n, res.err + case <-s.done: + } + return 0, io.EOF +} +func (s *dNC) LocalAddr() net.Addr { return s.addr } +func (s *dNC) RemoteAddr() net.Addr { return s.addr } +func (s *dNC) SetDeadline(t time.Time) error { return dNCErrNotImplemented } +func (s *dNC) SetReadDeadline(t time.Time) error { return dNCErrNotImplemented } +func (s *dNC) SetWriteDeadline(t time.Time) error { return dNCErrNotImplemented } + +func (s *dNC) Control(i interface{}) error { + select { + case s.control <- i: + return nil + case <-s.done: + } + return dNCErrClosed +} + +// Locks writing. All write requests will be blocked until write is unlocked with (*dNC).WriteUnlock, or server closes. +func (s *dNC) WriteLock() error { + return s.Control(dNCCWriteLock{}) +} + +// Unlocks writing. All blocked write requests until now will be accepted. +func (s *dNC) WriteUnlock() error { + return s.Control(dNCCWriteUnlock{}) +} + +// Unlocks writing and makes (*dNC).Write to result (0, dNCErrWrite). +func (s *dNC) WriteError() error { + if err := s.WriteUnlock(); err != nil { + return err + } + return s.Control(dNCCWriteError{}) +} + +// Unlocks writing and makes (*dNC).Write([...]) not result in error. See (*dNC).Write for details. +func (s *dNC) WriteSuccess() error { + if err := s.WriteUnlock(); err != nil { + return err + } + return s.Control(dNCCWriteSuccess{}) +} + +// Locks reading. All read requests will be blocked until read is unlocked with (*dNC).ReadUnlock, or server closes. +// (*dNC).Read([...]) wil block even after successful write. +func (s *dNC) ReadLock() error { + return s.Control(dNCCReadLock{}) +} + +// Unlocks reading. If the internall buffer is not empty, next read will not block. +func (s *dNC) ReadUnlock() error { + return s.Control(dNCCReadUnlock{}) +} + +// Unlocks read and makes every blocked and following (*dNC).Read([...]) imidiatly result in error. See (*dNC).Read for details. +func (s *dNC) ReadError() error { + if err := s.ReadUnlock(); err != nil { + return err + } + return s.Control(dNCCReadError{}) +} + +// Unlocks read and makes every blocked and following (*dNC).Read([...]) requests be handled, if according to internal buffer. See (*dNC).Read for details. +func (s *dNC) ReadSuccess() error { + if err := s.ReadUnlock(); err != nil { + return err + } + return s.Control(dNCCReadSuccess{}) +} diff --git a/dummyNetConn_test.go b/dummyNetConn_test.go new file mode 100644 index 0000000..94691be --- /dev/null +++ b/dummyNetConn_test.go @@ -0,0 +1,273 @@ +package xgb + +import ( + "errors" + "fmt" + "io" + "reflect" + "testing" + "time" +) + +func TestDummyNetConn(t *testing.T) { + ioStatesPairGenerator := func(writeStates, readStates []string) []func() (*dNC, error) { + writeSetters := map[string]func(*dNC) error{ + "lock": (*dNC).WriteLock, + "error": (*dNC).WriteError, + "success": (*dNC).WriteSuccess, + } + readSetters := map[string]func(*dNC) error{ + "lock": (*dNC).ReadLock, + "error": (*dNC).ReadError, + "success": (*dNC).ReadSuccess, + } + + res := []func() (*dNC, error){} + for _, writeState := range writeStates { + writeState, writeSetter := writeState, writeSetters[writeState] + if writeSetter == nil { + panic("unknown write state: " + writeState) + continue + } + for _, readState := range readStates { + readState, readSetter := readState, readSetters[readState] + if readSetter == nil { + panic("unknown read state: " + readState) + continue + } + res = append(res, func() (*dNC, error) { + + // loopback server + s := newDummyNetConn("w:"+writeState+";r:"+readState, func(b []byte) []byte { return b }) + + if err := readSetter(s); err != nil { + s.Close() + return nil, errors.New("set read " + readState + " error: " + err.Error()) + } + + if err := writeSetter(s); err != nil { + s.Close() + return nil, errors.New("set write " + writeState + " error: " + err.Error()) + } + + return s, nil + }) + } + } + return res + } + + timeout := time.Millisecond + wantResponse := func(action func(*dNC) error, want, block error) func(*dNC) error { + return func(s *dNC) error { + actionResult := make(chan error) + timedOut := make(chan struct{}) + go func() { + err := action(s) + select { + case <-timedOut: + if err != block { + t.Errorf("after unblocking, action result=%v, want %v", err, block) + } + case actionResult <- err: + } + }() + select { + case err := <-actionResult: + if err != want { + return errors.New(fmt.Sprintf("action result=%v, want %v", err, want)) + } + case <-time.After(timeout): + close(timedOut) + return errors.New(fmt.Sprintf("action did not respond for %v, result want %v", timeout, want)) + } + return nil + } + } + wantBlock := func(action func(*dNC) error, unblock error) func(*dNC) error { + return func(s *dNC) error { + actionResult := make(chan error) + timedOut := make(chan struct{}) + go func() { + err := action(s) + select { + case <-timedOut: + if err != unblock { + t.Errorf("after unblocking, action result=%v, want %v", err, unblock) + } + case actionResult <- err: + } + }() + select { + case err := <-actionResult: + return errors.New(fmt.Sprintf("action result=%v, want to be blocked", err)) + case <-time.After(timeout): + close(timedOut) + } + return nil + } + } + write := func(b string) func(*dNC) error { + return func(s *dNC) error { + n, err := s.Write([]byte(b)) + if err == nil && n != len(b) { + return errors.New("Write returned nil error, but not everything was written") + } + return err + } + } + read := func(b string) func(*dNC) error { + return func(s *dNC) error { + r := make([]byte, len(b)) + n, err := s.Read(r) + if err == nil { + if n != len(b) { + return errors.New("Read returned nil error, but not everything was read") + } + if !reflect.DeepEqual(r, []byte(b)) { + return errors.New("Read=\"" + string(r) + "\", want \"" + string(b) + "\"") + } + } + return err + } + } + + testCases := []struct { + description string + servers []func() (*dNC, error) + actions []func(*dNC) error // actions per server + }{ + {"close,close", + ioStatesPairGenerator( + []string{"lock", "error", "success"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantResponse((*dNC).Close, nil, dNCErrClosed), + wantResponse((*dNC).Close, dNCErrClosed, dNCErrClosed), + }, + }, + {"write,close,write", + ioStatesPairGenerator( + []string{"lock"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantBlock(write(""), dNCErrClosed), + wantResponse((*dNC).Close, nil, dNCErrClosed), + wantResponse(write(""), dNCErrClosed, dNCErrClosed), + }, + }, + {"write,close,write", + ioStatesPairGenerator( + []string{"error"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantResponse(write(""), dNCErrWrite, dNCErrClosed), + wantResponse((*dNC).Close, nil, dNCErrClosed), + wantResponse(write(""), dNCErrClosed, dNCErrClosed), + }, + }, + {"write,close,write", + ioStatesPairGenerator( + []string{"success"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantResponse(write(""), nil, dNCErrClosed), + wantResponse((*dNC).Close, nil, dNCErrClosed), + wantResponse(write(""), dNCErrClosed, dNCErrClosed), + }, + }, + {"read,close,read", + ioStatesPairGenerator( + []string{"lock", "error", "success"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantBlock(read(""), io.EOF), + wantResponse((*dNC).Close, nil, dNCErrClosed), + wantResponse(read(""), io.EOF, io.EOF), + }, + }, + {"write,read", + ioStatesPairGenerator( + []string{"lock"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantBlock(write("1"), dNCErrClosed), + wantBlock(read("1"), io.EOF), + }, + }, + {"write,read", + ioStatesPairGenerator( + []string{"error"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantResponse(write("1"), dNCErrWrite, dNCErrClosed), + wantBlock(read("1"), io.EOF), + }, + }, + {"write,read", + ioStatesPairGenerator( + []string{"success"}, + []string{"lock"}, + ), + []func(*dNC) error{ + wantResponse(write("1"), nil, dNCErrClosed), + wantBlock(read("1"), io.EOF), + }, + }, + {"write,read", + ioStatesPairGenerator( + []string{"success"}, + []string{"error"}, + ), + []func(*dNC) error{ + wantResponse(write("1"), nil, dNCErrClosed), + wantResponse(read("1"), dNCErrRead, io.EOF), + }, + }, + {"write,read", + ioStatesPairGenerator( + []string{"success"}, + []string{"success"}, + ), + []func(*dNC) error{ + wantResponse(write("1"), nil, dNCErrClosed), + wantResponse(read("1"), nil, io.EOF), + }, + }, + } + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + defer leaksMonitor(tc.description).checkTesting(t) + + for _, server := range tc.servers { + s, err := server() + if err != nil { + t.Error(err) + continue + } + if s == nil { + t.Error("nil server in testcase") + continue + } + + t.Run(s.LocalAddr().String(), func(t *testing.T) { + defer leaksMonitor(s.LocalAddr().String()).checkTesting(t) + for _, action := range tc.actions { + if err := action(s); err != nil { + t.Error(err) + break + } + } + s.Close() + }) + } + }) + } +} diff --git a/xgb_test.go b/xgb_test.go index 5540e5c..d6ddb13 100644 --- a/xgb_test.go +++ b/xgb_test.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "net" "regexp" "runtime" "strconv" @@ -14,320 +13,6 @@ import ( "time" ) -type addr struct { - s string -} - -func (_ addr) Network() string { return "dummy" } -func (a addr) String() string { return a.s } - -type errTimeout struct{ error } - -func (_ errTimeout) Timeout() bool { return true } - -var ( - dXErrNotImplemented = errors.New("command not implemented") - dXErrClosed = errors.New("server closed") - dXErrWrite = errors.New("server write failed") - dXErrRead = errors.New("server read failed") - dXErrResponse = errors.New("server response error") -) - -type dXIoResult struct { - n int - err error -} -type dXIo struct { - b []byte - result chan dXIoResult -} - -type dXCSendEvent struct{} -type dXCWriteLock struct{} -type dXCWriteUnlock struct{} -type dXCWriteError struct{} -type dXCWriteSuccess struct{ errorResponse bool } -type dXCReadLock struct{} -type dXCReadUnlock struct{} -type dXCReadError struct{} -type dXCReadSuccess struct{} - -type dXEvent struct{} - -func (_ dXEvent) Bytes() []byte { return nil } -func (_ dXEvent) String() string { return "dummy X server event" } - -type dXError struct { - seqId uint16 -} - -func (e dXError) SequenceId() uint16 { return e.seqId } -func (_ dXError) BadId() uint32 { return 0 } -func (_ dXError) Error() string { return "dummy X server error reply" } - -// dummy X server implementing net.Conn interface, -type dX struct { - addr addr - in, out chan dXIo - control chan interface{} - done chan struct{} -} - -// Results running dummy X server, satisfying net.Conn interface for test purposes. -// It is users responsibility to stop and clean up resources with (*dX).Close, if not needed anymore. -// By default, the read and write method are unlocked and will not result in error and read response will be a non error X reply (like with (*dX).WriteSuccess(false), (*dX).ReadSuccess()). -//TODO make (*dX).SetDeadline, (*dX).SetReadDeadline, (*dX).SetWriteDeadline work proprely. -func newDX(name string) *dX { - s := &dX{ - addr{name}, - make(chan dXIo), make(chan dXIo), - make(chan interface{}), - make(chan struct{}), - } - - seqId := uint16(1) - incrementSequenceId := func() { - // this has to be the same algorithm as in (*Conn).generateSeqIds - if seqId == uint16((1<<16)-1) { - seqId = 0 - } else { - seqId++ - } - } - - in, out := s.in, chan dXIo(nil) - buf := &bytes.Buffer{} - errorRead, errorWrite, errorResponse := false, false, false - lockRead := false - - NewErrorFuncs[255] = func(buf []byte) Error { - return dXError{Get16(buf[2:])} - } - NewEventFuncs[128&127] = func(buf []byte) Event { - return dXEvent{} - } - - go func() { - defer close(s.done) - for { - select { - case dxsio := <-in: - if errorWrite { - dxsio.result <- dXIoResult{0, dXErrWrite} - break - } - - response := make([]byte, 32) - if errorResponse { // response will be error - response[0] = 0 // error - response[1] = 255 // error function - } else { // response will by reply with no additional reply - response[0] = 1 // reply - } - Put16(response[2:], seqId) // sequence number - incrementSequenceId() - - buf.Write(response) - dxsio.result <- dXIoResult{len(dxsio.b), nil} - - if !lockRead && out == nil { - out = s.out - } - case dxsio := <-out: - if errorRead { - dxsio.result <- dXIoResult{0, dXErrRead} - break - } - - n, err := buf.Read(dxsio.b) - dxsio.result <- dXIoResult{n, err} - - if buf.Len() == 0 { - out = nil - } - case ci := <-s.control: - if ci == nil { - return - } - switch cs := ci.(type) { - case dXCSendEvent: - response := make([]byte, 32) - response[0] = 128 - buf.Write(response) - - if !lockRead && out == nil { - out = s.out - } - case dXCWriteLock: - in = nil - case dXCWriteUnlock: - in = s.in - case dXCWriteError: - errorWrite = true - case dXCWriteSuccess: - errorWrite = false - errorResponse = cs.errorResponse - case dXCReadLock: - out = nil - lockRead = true - case dXCReadUnlock: - lockRead = false - if buf.Len() > 0 && out == nil { - out = s.out - } - case dXCReadError: - errorRead = true - case dXCReadSuccess: - errorRead = false - default: - } - } - } - }() - return s -} - -// Shuts down dummy X server. Every blocking or future method calls will do nothing and result in error. -// Result will be dXErrClosed if server was allready closed. -// Server can not be unclosed. -func (s *dX) Close() error { - select { - case s.control <- nil: - <-s.done - return nil - case <-s.done: - } - return dXErrClosed -} - -// Imitates write action to X server. -// If not locked by (*dX).WriteLock, it results in error or success. -// -// If Write errors, the second result parameter will be an error {dXErrWrite|dXErrClosed}, the resulting first parameter will be 0, -// no response will be generated and the internal sequence number will not be incremented. -// -// If Write succeedes, it results in (len(b), nil), the (*dX).Read will be unblocked (if not locked with (*dX).ReadLock), -// an [32]byte response will be written to buffer from which (*dX).Read reads, -// with sequence number and proper X response type (error or reply) and the internal sequence number will be increased. -// -// If server was closed previously, result will be (0, dXErrClosed). -func (s *dX) Write(b []byte) (int, error) { - resChan := make(chan dXIoResult) - //fmt.Printf("(*dX).Write: got write request: %v\n", b) - select { - case s.in <- dXIo{b, resChan}: - //fmt.Printf("(*dX).Write: input channel has accepted request\n") - res := <-resChan - //fmt.Printf("(*dX).Write: got result: %v\n", res) - return res.n, res.err - case <-s.done: - } - //fmt.Printf("(*dX).Write: server was closed\n") - return 0, dXErrClosed -} - -// Imitates read action from X server. -// If locked by (*dX).ReadLock, read will block until unlocking with (*dX).ReadUnlock, or server closes. -// -// If not locked, Read will result in error, or block until internal read buffer is not empty, depending on internal state. -// The internal state can be modified via (*dX).ReadError, or (*dX).ReadSuccess -// ReadError makes it return (0, dXErrRead), with no changes to internal buffer or state. -// ReadSuccess makes it block until there are some write responses. After emtying the internal read buffer, all next Read requests will block untill another successful write requests. -// -// The resulting read success response type can be altered with (*dX).WriteSuccess method. -// -// If server was closed previously, result will be (0, io.EOF). -func (s *dX) Read(b []byte) (int, error) { - resChan := make(chan dXIoResult) - //fmt.Printf("(*dX).Read: got read request of length: %v\n", len(b)) - select { - case s.out <- dXIo{b, resChan}: - //fmt.Printf("(*dX).Read: output channel has accepted request\n") - res := <-resChan - //fmt.Printf("(*dX).Read: got result: %v\n", res) - //fmt.Printf("(*dX).Read: result bytes: %v\n", b) - return res.n, res.err - case <-s.done: - //fmt.Printf("(*dX).Read: server was closed\n") - } - return 0, io.EOF -} -func (s *dX) LocalAddr() net.Addr { return s.addr } -func (s *dX) RemoteAddr() net.Addr { return s.addr } -func (s *dX) SetDeadline(t time.Time) error { return dXErrNotImplemented } -func (s *dX) SetReadDeadline(t time.Time) error { return dXErrNotImplemented } -func (s *dX) SetWriteDeadline(t time.Time) error { return dXErrNotImplemented } - -func (s *dX) Control(i interface{}) error { - select { - case s.control <- i: - return nil - case <-s.done: - } - return dXErrClosed -} - -// Adds an Event into read buffer. -func (s *dX) SendEvent() error { - return s.Control(dXCSendEvent{}) -} - -// Locks writing. All write requests will be blocked until write is unlocked with (*dX).WriteUnlock, or server closes. -func (s *dX) WriteLock() error { - return s.Control(dXCWriteLock{}) -} - -// Unlocks writing. All blocked write requests until now will be accepted. -func (s *dX) WriteUnlock() error { - return s.Control(dXCWriteUnlock{}) -} - -// Unlocks writing and makes (*dX).Write to result (0, dXErrWrite). -func (s *dX) WriteError() error { - if err := s.WriteUnlock(); err != nil { - return err - } - return s.Control(dXCWriteError{}) -} - -// Unlocks writing and makes (*dX).Write to result (len(b), nil), with a proper X reply. -// If errorResult is true, the response will be an X error response, -// else an normal reply. See (*dX).Write for details. -func (s *dX) WriteSuccess(errorResult bool) error { - if err := s.WriteUnlock(); err != nil { - return err - } - return s.Control(dXCWriteSuccess{errorResult}) -} - -// Locks reading. All read requests will be blocked until read is unlocked with (*dX).ReadUnlock, or server closes. -// (*dX).Read wil block even after successful write. -func (s *dX) ReadLock() error { - return s.Control(dXCReadLock{}) -} - -// Unlocks reading. If there are any unresponded requests in reading buffer, read will be unblocked. -func (s *dX) ReadUnlock() error { - return s.Control(dXCReadUnlock{}) -} - -// Unlocks read and makes every blocked and following (*dX).Read requests fail. See (*dX).Read for details. -func (s *dX) ReadError() error { - if err := s.ReadUnlock(); err != nil { - return err - } - return s.Control(dXCReadError{}) -} - -// Unlocks read and makes every blocked and following (*dX).Read requests be handled, if there are any in read buffer. -// See (*dX).Read for details. -func (s *dX) ReadSuccess() error { - if err := s.ReadUnlock(); err != nil { - return err - } - return s.Control(dXCReadSuccess{}) -} - type goroutine struct { id int name string @@ -429,56 +114,23 @@ func (l *leaks) ignoreLeak(grs ...goroutine) { } } -func testDXCombinations(writeStates, readStates []string) []func() (*dX, error) { - writeSetters := map[string]func(*dX) error{ - "lock": (*dX).WriteLock, - "error": (*dX).WriteError, - "successReply": func(s *dX) error { return s.WriteSuccess(false) }, - "successError": func(s *dX) error { return s.WriteSuccess(true) }, - } - readSetters := map[string]func(*dX) error{ - "lock": (*dX).ReadLock, - "error": (*dX).ReadError, - "success": (*dX).ReadSuccess, - } +type dNCEvent struct{} - res := []func() (*dX, error){} - for _, writeState := range writeStates { - writeState, writeSetter := writeState, writeSetters[writeState] - if writeSetter == nil { - panic("unknown write state: " + writeState) - continue - } - for _, readState := range readStates { - readState, readSetter := readState, readSetters[readState] - if readSetter == nil { - panic("unknown read state: " + readState) - continue - } - res = append(res, func() (*dX, error) { - s := newDX("write=" + writeState + ",read=" + readState) - - if err := readSetter(s); err != nil { - s.Close() - return nil, errors.New("set read " + readState + " error: " + err.Error()) - } - - if err := writeSetter(s); err != nil { - s.Close() - return nil, errors.New("set write " + writeState + " error: " + err.Error()) - } +func (_ dNCEvent) Bytes() []byte { return nil } +func (_ dNCEvent) String() string { return "dummy X server event" } - return s, nil - }) - } - } - return res +type dNCError struct { + seqId uint16 } -func TestDummyXServer(t *testing.T) { +func (e dNCError) SequenceId() uint16 { return e.seqId } +func (_ dNCError) BadId() uint32 { return 0 } +func (_ dNCError) Error() string { return "dummy X server error reply" } + +func TestConnOnNonBlockingDummyXServer(t *testing.T) { timeout := time.Millisecond - wantResponse := func(action func(*dX) error, want, block error) func(*dX) error { - return func(s *dX) error { + wantResponse := func(action func(*Conn) error, want, block error) func(*Conn) error { + return func(s *Conn) error { actionResult := make(chan error) timedOut := make(chan struct{}) go func() { @@ -503,286 +155,138 @@ func TestDummyXServer(t *testing.T) { return nil } } - wantBlock := func(action func(*dX) error, unblock error) func(*dX) error { - return func(s *dX) error { - actionResult := make(chan error) - timedOut := make(chan struct{}) - go func() { - err := action(s) - select { - case <-timedOut: - if err != unblock { - t.Errorf("after unblocking, action result=%v, want %v", err, unblock) - } - case actionResult <- err: - } - }() - select { - case err := <-actionResult: - return errors.New(fmt.Sprintf("action result=%v, want to be blocked", err)) - case <-time.After(timeout): - close(timedOut) - } - return nil - } + NewErrorFuncs[255] = func(buf []byte) Error { + return dNCError{Get16(buf[2:])} } - write := func() func(*dX) error { - return func(s *dX) error { - _, err := s.Write([]byte{1}) - return err - } + NewEventFuncs[128&127] = func(buf []byte) Event { + return dNCEvent{} } - read := func() func(*dX) error { - return func(s *dX) error { - b := make([]byte, 32) - _, err := s.Read(b) - return err + checkedReply := func(wantError bool) func(*Conn) error { + request := "reply" + if wantError { + request = "error" } - } - readSuccess := func(seqId uint16, errorResponse bool) func(*dX) error { - return func(s *dX) error { - b := make([]byte, 32) - _, err := s.Read(b) - if err != nil { - return err + return func(c *Conn) error { + cookie := c.NewCookie(true, true) + c.NewRequest([]byte(request), cookie) + _, err := cookie.Reply() + if wantError && err == nil { + return errors.New(fmt.Sprintf("checked request \"%v\" with reply resulted in nil error, want some error", request)) } - if seqId != Get16(b[2:]) { - return errors.New(fmt.Sprintf("got read sequence number %d, want %d", Get16(b[2:]), seqId)) + if !wantError && err != nil { + return errors.New(fmt.Sprintf("checked request \"%v\" with reply resulted in error %v, want nil error", request, err)) } - b0, desc := 0, "error" - if !errorResponse { - b0, desc = 1, "valid" + return nil + } + } + checkedNoreply := func(wantError bool) func(*Conn) error { + request := "noreply" + if wantError { + request = "error" + } + return func(c *Conn) error { + cookie := c.NewCookie(true, false) + c.NewRequest([]byte(request), cookie) + err := cookie.Check() + if wantError && err == nil { + return errors.New(fmt.Sprintf("checked request \"%v\" with no reply resulted in nil error, want some error", request)) } - if int(b[0]) != b0 { - return errors.New(fmt.Sprintf("response is not an %s reply: %v", desc, b)) + if !wantError && err != nil { + return errors.New(fmt.Sprintf("checked request \"%v\" with no reply resulted in error %v, want nil error", request, err)) } return nil } } - - testCases := []struct { - description string - servers []func() (*dX, error) - actions []func(*dX) error // actions per server - }{ - {"empty", - []func() (*dX, error){ - func() (*dX, error) { return newDX("server"), nil }, - }, - []func(*dX) error{ - func(s *dX) error { return nil }, - }, - }, - {"close,close", - testDXCombinations( - []string{"lock", "error", "successError", "successReply"}, - []string{"lock", "error", "success"}, - ), - []func(*dX) error{ - wantResponse((*dX).Close, nil, dXErrClosed), - wantResponse((*dX).Close, dXErrClosed, dXErrClosed), - }, - }, - {"write,close,write", - testDXCombinations( - []string{"lock"}, - []string{"lock", "error", "success"}, - ), - []func(*dX) error{ - wantBlock(write(), dXErrClosed), - wantResponse((*dX).Close, nil, dXErrClosed), - wantResponse(write(), dXErrClosed, dXErrClosed), - }, - }, - {"write,close,write", - testDXCombinations( - []string{"error"}, - []string{"lock", "error", "success"}, - ), - []func(*dX) error{ - wantResponse(write(), dXErrWrite, dXErrClosed), - wantResponse((*dX).Close, nil, dXErrClosed), - wantResponse(write(), dXErrClosed, dXErrClosed), - }, - }, - {"write,close,write", - testDXCombinations( - []string{"successError", "successReply"}, - []string{"lock", "error", "success"}, - ), - []func(*dX) error{ - wantResponse(write(), nil, dXErrClosed), - wantResponse((*dX).Close, nil, dXErrClosed), - wantResponse(write(), dXErrClosed, dXErrClosed), - }, - }, - {"read,close,read", - testDXCombinations( - []string{"lock", "error", "successError", "successReply"}, - []string{"lock", "error", "success"}, - ), - []func(*dX) error{ - wantBlock(read(), io.EOF), - wantResponse((*dX).Close, nil, dXErrClosed), - wantResponse(read(), io.EOF, io.EOF), - }, - }, - {"write,read", - testDXCombinations( - []string{"lock"}, - []string{"lock", "error", "success"}, - ), - []func(*dX) error{ - wantBlock(write(), dXErrClosed), - wantBlock(read(), io.EOF), - }, - }, - {"write,read", - testDXCombinations( - []string{"error"}, - []string{"lock", "error", "success"}, - ), - []func(*dX) error{ - wantResponse(write(), dXErrWrite, dXErrClosed), - wantBlock(read(), io.EOF), - }, - }, - {"write,read", - testDXCombinations( - []string{"successError"}, - []string{"lock"}, - ), - []func(*dX) error{ - wantResponse(write(), nil, dXErrClosed), - wantBlock(read(), io.EOF), - }, - }, - {"write,read", - testDXCombinations( - []string{"successError"}, - []string{"error"}, - ), - []func(*dX) error{ - wantResponse(write(), nil, dXErrClosed), - wantResponse(read(), dXErrRead, io.EOF), - }, - }, - {"write,read", - testDXCombinations( - []string{"successError"}, - []string{"success"}, - ), - []func(*dX) error{ - wantResponse(write(), nil, dXErrClosed), - wantResponse(readSuccess(1, true), nil, io.EOF), - }, - }, - {"write,read", - testDXCombinations( - []string{"successReply"}, - []string{"success"}, - ), - []func(*dX) error{ - wantResponse(write(), nil, dXErrClosed), - wantResponse(readSuccess(1, false), nil, io.EOF), - }, - }, - } - for _, tc := range testCases { - t.Run(tc.description, func(t *testing.T) { - defer leaksMonitor(tc.description).checkTesting(t) - - for _, server := range tc.servers { - s, err := server() - if err != nil { - t.Error(err) - continue - } - if s == nil { - t.Error("nil server in testcase") - continue - } - - t.Run(s.LocalAddr().String(), func(t *testing.T) { - defer leaksMonitor(s.LocalAddr().String()).checkTesting(t) - for _, action := range tc.actions { - if err := action(s); err != nil { - t.Error(err) - break - } - } - s.Close() - }) + uncheckedReply := func(wantError bool) func(*Conn) error { + request := "reply" + if wantError { + request = "error" + } + return func(c *Conn) error { + cookie := c.NewCookie(false, true) + c.NewRequest([]byte(request), cookie) + _, err := cookie.Reply() + if err != nil { + return errors.New(fmt.Sprintf("unchecked request \"%v\" with reply resulted in %v, want nil", request, err)) } - }) + return nil + } } -} - -func TestConnOnNonBlockingDummyXServer(t *testing.T) { - timeout := time.Millisecond - wantResponse := func(action func(*Conn) error, want, block error) func(*Conn) error { + uncheckedNoreply := func(wantError bool) func(*Conn) error { + request := "noreply" + if wantError { + request = "error" + } return func(c *Conn) error { - actionResult := make(chan error) - timedOut := make(chan struct{}) - go func() { - err := action(c) - select { - case <-timedOut: - if err != block { - t.Errorf("after unblocking, action result=%v, want %v", err, block) - } - case actionResult <- err: - } - }() - select { - case err := <-actionResult: - if err != want { - return errors.New(fmt.Sprintf("action result=%v, want %v", err, want)) - } - case <-time.After(timeout): - close(timedOut) - return errors.New(fmt.Sprintf("action did not respond for %v, result want %v", timeout, want)) - } + cookie := c.NewCookie(false, false) + c.NewRequest([]byte(request), cookie) return nil } } - crequest := func(checked, reply bool) func(*Conn) error { + event := func() func(*Conn) error { return func(c *Conn) error { - cookie := c.NewCookie(checked, reply) - c.NewRequest([]byte("crequest"), cookie) - _, err := cookie.Reply() + _, err := c.conn.Write([]byte("event")) + if err != nil { + return errors.New(fmt.Sprintf("asked dummy server to send event, but resulted in error: %v\n", err)) + } return err } } + waitEvent := func(wantError bool) func(*Conn) error { + return func(c *Conn) error { + _, err := c.WaitForEvent() + if wantError && err == nil { + return errors.New(fmt.Sprintf("wait for event resulted in nil error, want some error")) + } + if !wantError && err != nil { + return errors.New(fmt.Sprintf("wait for event resulted in error %v, want nil error", err)) + } + return nil + } + } testCases := []struct { description string - servers []func() (*dX, error) actions []func(*Conn) error }{ - {"cclose", - testDXCombinations( - []string{"successError", "successReply"}, - []string{"success"}, - ), + {"close", []func(*Conn) error{}, }, - {"crequest", - testDXCombinations([]string{"successError"}, []string{"success"}), + {"checked requests with reply", []func(*Conn) error{ - wantResponse(crequest(true, true), dXError{1}, io.ErrShortWrite), + checkedReply(false), + checkedReply(true), + checkedReply(false), + checkedReply(true), }, }, - {"crequest", - testDXCombinations([]string{"successReply"}, []string{"success"}), + {"checked requests no reply", []func(*Conn) error{ - wantResponse(crequest(true, true), nil, io.ErrShortWrite), + checkedNoreply(false), + checkedNoreply(true), + checkedNoreply(false), + checkedNoreply(true), }, }, - // sometimes panic on unfixed branch - close of closed channel - {"cclose", - testDXCombinations([]string{"error"}, []string{"error", "success"}), - []func(*Conn) error{}, + {"unchecked requests with reply", + []func(*Conn) error{ + uncheckedReply(false), + uncheckedReply(true), + waitEvent(true), + uncheckedReply(false), + event(), + waitEvent(false), + }, + }, + {"unchecked requests no reply", + []func(*Conn) error{ + uncheckedNoreply(false), + uncheckedNoreply(true), + waitEvent(true), + uncheckedNoreply(false), + event(), + waitEvent(false), + }, }, } for _, tc := range testCases { @@ -790,52 +294,72 @@ func TestConnOnNonBlockingDummyXServer(t *testing.T) { tclm := leaksMonitor("test case " + tc.description) defer tclm.checkTesting(t) - for _, server := range tc.servers { - s, err := server() - if err != nil { - t.Error(err) - continue + seqId := uint16(1) + incrementSequenceId := func() { + // this has to be the same algorithm as in (*Conn).generateSeqIds + if seqId == uint16((1<<16)-1) { + seqId = 0 + } else { + seqId++ } - if s == nil { - t.Error("nil *dX in testcase") - continue + } + dummyXreplyer := func(request []byte) []byte { + //fmt.Printf("dummyXreplyer got request: %s\n", string(request)) + res := make([]byte, 32) + switch string(request) { + case "event": + res[0] = 128 + //fmt.Printf("dummyXreplyer sent response: %v\n", res) + return res + case "error": + res[0] = 0 // error + res[1] = 255 // error function + default: + res[0] = 1 // reply + } + Put16(res[2:], seqId) // sequence number + incrementSequenceId() + if string(request) == "noreply" { + //fmt.Printf("dummyXreplyer no response sent\n") + return nil } + //fmt.Printf("dummyXreplyer sent response: %v\n", res) + return res + } - t.Run(s.LocalAddr().String(), func(t *testing.T) { - sclm := leaksMonitor(s.LocalAddr().String()+" after sever close", tclm) - defer sclm.checkTesting(t) + sclm := leaksMonitor("after server close", tclm) + defer sclm.checkTesting(t) + s := newDummyNetConn("dummX", dummyXreplyer) + defer s.Close() - c, err := postNewConn(&Conn{conn: s}) - if err != nil { - t.Errorf("connect to dummy server error: %v", err) - return - } + c, err := postNewConn(&Conn{conn: s}) + if err != nil { + t.Errorf("connect to dummy server error: %v", err) + return + } - rlm := leaksMonitor(c.conn.LocalAddr().String() + " after actions end") - for _, action := range tc.actions { - if err := action(c); err != nil { - t.Error(err) - break - } - } - c.Close() - if err := wantResponse( - func(c *Conn) error { - if ev, err := c.WaitForEvent(); ev != nil || err != nil { - return fmt.Errorf("after (*Conn).Close, (*Conn).WaitForEvent() = (%v,%v), want (nil,nil)", ev, err) - } - return nil - }, - nil, - io.ErrShortWrite, - )(c); err != nil { - t.Error(err) + rlm := leaksMonitor("after actions end") + for _, action := range tc.actions { + if err := action(c); err != nil { + t.Error(err) + break + } + } + c.Close() + if err := wantResponse( + func(c *Conn) error { + if ev, err := c.WaitForEvent(); ev != nil || err != nil { + return fmt.Errorf("after (*Conn).Close, (*Conn).WaitForEvent() = (%v,%v), want (nil,nil)", ev, err) } - rlm.checkTesting(t) - }) - - s.Close() + return nil + }, + nil, + io.ErrShortWrite, + )(c); err != nil { + t.Error(err) } + rlm.checkTesting(t) + }) } } |
