summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--dummyNetConn.go261
-rw-r--r--dummyNetConn_test.go273
-rw-r--r--xgb_test.go816
3 files changed, 704 insertions, 646 deletions
diff --git a/dummyNetConn.go b/dummyNetConn.go
new file mode 100644
index 0000000..91bae4f
--- /dev/null
+++ b/dummyNetConn.go
@@ -0,0 +1,261 @@
+package xgb
+
+import (
+ "bytes"
+ "errors"
+ "io"
+ "net"
+ "time"
+)
+
+type dAddr struct {
+ s string
+}
+
+func (_ dAddr) Network() string { return "dummy" }
+func (a dAddr) String() string { return a.s }
+
+var (
+ dNCErrNotImplemented = errors.New("command not implemented")
+ dNCErrClosed = errors.New("server closed")
+ dNCErrWrite = errors.New("server write failed")
+ dNCErrRead = errors.New("server read failed")
+ dNCErrResponse = errors.New("server response error")
+)
+
+type dNCIoResult struct {
+ n int
+ err error
+}
+type dNCIo struct {
+ b []byte
+ result chan dNCIoResult
+}
+
+type dNCCWriteLock struct{}
+type dNCCWriteUnlock struct{}
+type dNCCWriteError struct{}
+type dNCCWriteSuccess struct{}
+type dNCCReadLock struct{}
+type dNCCReadUnlock struct{}
+type dNCCReadError struct{}
+type dNCCReadSuccess struct{}
+
+// dummy net.Conn interface. Needs to be constructed via newDummyNetConn([...]) function.
+type dNC struct {
+ reply func([]byte) []byte
+ addr dAddr
+ in, out chan dNCIo
+ control chan interface{}
+ done chan struct{}
+}
+
+// Results running dummy server, satisfying net.Conn interface for test purposes.
+// 'name' parameter will be returned via (*dNC).Local/RemoteAddr().String()
+// 'reply' parameter function will be runned only on successful (*dNC).Write(b) with 'b' as parameter to 'reply'. The result will be stored in internal buffer and can be retrieved later via (*dNC).Read([...]) method.
+// It is users responsibility to stop and clean up resources with (*dNC).Close, if not needed anymore.
+// By default, the (*dNC).Write([...]) and (*dNC).Read([...]) methods are unlocked and will not result in error.
+//TODO make (*dNC).SetDeadline, (*dNC).SetReadDeadline, (*dNC).SetWriteDeadline work proprely.
+func newDummyNetConn(name string, reply func([]byte) []byte) *dNC {
+
+ s := &dNC{
+ reply,
+ dAddr{name},
+ make(chan dNCIo), make(chan dNCIo),
+ make(chan interface{}),
+ make(chan struct{}),
+ }
+
+ in, out := s.in, chan dNCIo(nil)
+ buf := &bytes.Buffer{}
+ errorRead, errorWrite := false, false
+ lockRead := false
+
+ go func() {
+ defer close(s.done)
+ for {
+ select {
+ case dxsio := <-in:
+ if errorWrite {
+ dxsio.result <- dNCIoResult{0, dNCErrWrite}
+ break
+ }
+
+ response := s.reply(dxsio.b)
+
+ buf.Write(response)
+ dxsio.result <- dNCIoResult{len(dxsio.b), nil}
+
+ if !lockRead && buf.Len() > 0 && out == nil {
+ out = s.out
+ }
+ case dxsio := <-out:
+ if errorRead {
+ dxsio.result <- dNCIoResult{0, dNCErrRead}
+ break
+ }
+
+ n, err := buf.Read(dxsio.b)
+ dxsio.result <- dNCIoResult{n, err}
+
+ if buf.Len() == 0 {
+ out = nil
+ }
+ case ci := <-s.control:
+ if ci == nil {
+ return
+ }
+ switch ci.(type) {
+ case dNCCWriteLock:
+ in = nil
+ case dNCCWriteUnlock:
+ in = s.in
+ case dNCCWriteError:
+ errorWrite = true
+ case dNCCWriteSuccess:
+ errorWrite = false
+ case dNCCReadLock:
+ out = nil
+ lockRead = true
+ case dNCCReadUnlock:
+ lockRead = false
+ if buf.Len() > 0 && out == nil {
+ out = s.out
+ }
+ case dNCCReadError:
+ errorRead = true
+ case dNCCReadSuccess:
+ errorRead = false
+ default:
+ }
+ }
+ }
+ }()
+ return s
+}
+
+// Shuts down dummy net.Conn server. Every blocking or future method calls will do nothing and result in error.
+// Result will be dNCErrClosed if server was allready closed.
+// Server can not be unclosed.
+func (s *dNC) Close() error {
+ select {
+ case s.control <- nil:
+ <-s.done
+ return nil
+ case <-s.done:
+ }
+ return dNCErrClosed
+}
+
+// Performs a write action to server.
+// If not locked by (*dNC).WriteLock, it results in error or success. If locked, this method will block until unlocked, or closed.
+//
+// This method can be set to result in error or success, via (*dNC).WriteError() or (*dNC).WriteSuccess() methods.
+//
+// If setted to result in error, the 'reply' function will NOT be called and internal buffer will NOT increasethe.
+// Result will be (0, dNCErrWrite).
+//
+// If setted to result in success, the 'reply' function will be called and its result will be writen to internal buffer.
+// If there is something in the internal buffer, the (*dNC).Read([...]) will be unblocked (if not previously locked with (*dNC).ReadLock).
+// Result will be (len(b), nil)
+//
+// If server was closed previously, result will be (0, dNCErrClosed).
+func (s *dNC) Write(b []byte) (int, error) {
+ resChan := make(chan dNCIoResult)
+ select {
+ case s.in <- dNCIo{b, resChan}:
+ res := <-resChan
+ return res.n, res.err
+ case <-s.done:
+ }
+ return 0, dNCErrClosed
+}
+
+// Performs a read action from server.
+// If locked by (*dNC).ReadLock(), this method will block until unlocked with (*dNC).ReadUnlock(), or server closes.
+//
+// If not locked, this method can be setted to result imidiatly in error, will block if internal buffer is empty or will perform an read operation from internal buffer.
+//
+// If setted to result in error via (*dNC).ReadError(), the result will be (0, dNCErrWrite).
+//
+// If not locked and not setted to result in error via (*dNC).ReadSuccess(), this method will block until internall buffer is not empty, than it returns the result of the buffer read operation via (*bytes.Buffer).Read([...]).
+// If the internal buffer is empty after this method, all follwing (*dNC).Read([...]), requests will block until internall buffer is filled after successful write requests.
+//
+// If server was closed previously, result will be (0, io.EOF).
+func (s *dNC) Read(b []byte) (int, error) {
+ resChan := make(chan dNCIoResult)
+ select {
+ case s.out <- dNCIo{b, resChan}:
+ res := <-resChan
+ return res.n, res.err
+ case <-s.done:
+ }
+ return 0, io.EOF
+}
+func (s *dNC) LocalAddr() net.Addr { return s.addr }
+func (s *dNC) RemoteAddr() net.Addr { return s.addr }
+func (s *dNC) SetDeadline(t time.Time) error { return dNCErrNotImplemented }
+func (s *dNC) SetReadDeadline(t time.Time) error { return dNCErrNotImplemented }
+func (s *dNC) SetWriteDeadline(t time.Time) error { return dNCErrNotImplemented }
+
+func (s *dNC) Control(i interface{}) error {
+ select {
+ case s.control <- i:
+ return nil
+ case <-s.done:
+ }
+ return dNCErrClosed
+}
+
+// Locks writing. All write requests will be blocked until write is unlocked with (*dNC).WriteUnlock, or server closes.
+func (s *dNC) WriteLock() error {
+ return s.Control(dNCCWriteLock{})
+}
+
+// Unlocks writing. All blocked write requests until now will be accepted.
+func (s *dNC) WriteUnlock() error {
+ return s.Control(dNCCWriteUnlock{})
+}
+
+// Unlocks writing and makes (*dNC).Write to result (0, dNCErrWrite).
+func (s *dNC) WriteError() error {
+ if err := s.WriteUnlock(); err != nil {
+ return err
+ }
+ return s.Control(dNCCWriteError{})
+}
+
+// Unlocks writing and makes (*dNC).Write([...]) not result in error. See (*dNC).Write for details.
+func (s *dNC) WriteSuccess() error {
+ if err := s.WriteUnlock(); err != nil {
+ return err
+ }
+ return s.Control(dNCCWriteSuccess{})
+}
+
+// Locks reading. All read requests will be blocked until read is unlocked with (*dNC).ReadUnlock, or server closes.
+// (*dNC).Read([...]) wil block even after successful write.
+func (s *dNC) ReadLock() error {
+ return s.Control(dNCCReadLock{})
+}
+
+// Unlocks reading. If the internall buffer is not empty, next read will not block.
+func (s *dNC) ReadUnlock() error {
+ return s.Control(dNCCReadUnlock{})
+}
+
+// Unlocks read and makes every blocked and following (*dNC).Read([...]) imidiatly result in error. See (*dNC).Read for details.
+func (s *dNC) ReadError() error {
+ if err := s.ReadUnlock(); err != nil {
+ return err
+ }
+ return s.Control(dNCCReadError{})
+}
+
+// Unlocks read and makes every blocked and following (*dNC).Read([...]) requests be handled, if according to internal buffer. See (*dNC).Read for details.
+func (s *dNC) ReadSuccess() error {
+ if err := s.ReadUnlock(); err != nil {
+ return err
+ }
+ return s.Control(dNCCReadSuccess{})
+}
diff --git a/dummyNetConn_test.go b/dummyNetConn_test.go
new file mode 100644
index 0000000..94691be
--- /dev/null
+++ b/dummyNetConn_test.go
@@ -0,0 +1,273 @@
+package xgb
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "reflect"
+ "testing"
+ "time"
+)
+
+func TestDummyNetConn(t *testing.T) {
+ ioStatesPairGenerator := func(writeStates, readStates []string) []func() (*dNC, error) {
+ writeSetters := map[string]func(*dNC) error{
+ "lock": (*dNC).WriteLock,
+ "error": (*dNC).WriteError,
+ "success": (*dNC).WriteSuccess,
+ }
+ readSetters := map[string]func(*dNC) error{
+ "lock": (*dNC).ReadLock,
+ "error": (*dNC).ReadError,
+ "success": (*dNC).ReadSuccess,
+ }
+
+ res := []func() (*dNC, error){}
+ for _, writeState := range writeStates {
+ writeState, writeSetter := writeState, writeSetters[writeState]
+ if writeSetter == nil {
+ panic("unknown write state: " + writeState)
+ continue
+ }
+ for _, readState := range readStates {
+ readState, readSetter := readState, readSetters[readState]
+ if readSetter == nil {
+ panic("unknown read state: " + readState)
+ continue
+ }
+ res = append(res, func() (*dNC, error) {
+
+ // loopback server
+ s := newDummyNetConn("w:"+writeState+";r:"+readState, func(b []byte) []byte { return b })
+
+ if err := readSetter(s); err != nil {
+ s.Close()
+ return nil, errors.New("set read " + readState + " error: " + err.Error())
+ }
+
+ if err := writeSetter(s); err != nil {
+ s.Close()
+ return nil, errors.New("set write " + writeState + " error: " + err.Error())
+ }
+
+ return s, nil
+ })
+ }
+ }
+ return res
+ }
+
+ timeout := time.Millisecond
+ wantResponse := func(action func(*dNC) error, want, block error) func(*dNC) error {
+ return func(s *dNC) error {
+ actionResult := make(chan error)
+ timedOut := make(chan struct{})
+ go func() {
+ err := action(s)
+ select {
+ case <-timedOut:
+ if err != block {
+ t.Errorf("after unblocking, action result=%v, want %v", err, block)
+ }
+ case actionResult <- err:
+ }
+ }()
+ select {
+ case err := <-actionResult:
+ if err != want {
+ return errors.New(fmt.Sprintf("action result=%v, want %v", err, want))
+ }
+ case <-time.After(timeout):
+ close(timedOut)
+ return errors.New(fmt.Sprintf("action did not respond for %v, result want %v", timeout, want))
+ }
+ return nil
+ }
+ }
+ wantBlock := func(action func(*dNC) error, unblock error) func(*dNC) error {
+ return func(s *dNC) error {
+ actionResult := make(chan error)
+ timedOut := make(chan struct{})
+ go func() {
+ err := action(s)
+ select {
+ case <-timedOut:
+ if err != unblock {
+ t.Errorf("after unblocking, action result=%v, want %v", err, unblock)
+ }
+ case actionResult <- err:
+ }
+ }()
+ select {
+ case err := <-actionResult:
+ return errors.New(fmt.Sprintf("action result=%v, want to be blocked", err))
+ case <-time.After(timeout):
+ close(timedOut)
+ }
+ return nil
+ }
+ }
+ write := func(b string) func(*dNC) error {
+ return func(s *dNC) error {
+ n, err := s.Write([]byte(b))
+ if err == nil && n != len(b) {
+ return errors.New("Write returned nil error, but not everything was written")
+ }
+ return err
+ }
+ }
+ read := func(b string) func(*dNC) error {
+ return func(s *dNC) error {
+ r := make([]byte, len(b))
+ n, err := s.Read(r)
+ if err == nil {
+ if n != len(b) {
+ return errors.New("Read returned nil error, but not everything was read")
+ }
+ if !reflect.DeepEqual(r, []byte(b)) {
+ return errors.New("Read=\"" + string(r) + "\", want \"" + string(b) + "\"")
+ }
+ }
+ return err
+ }
+ }
+
+ testCases := []struct {
+ description string
+ servers []func() (*dNC, error)
+ actions []func(*dNC) error // actions per server
+ }{
+ {"close,close",
+ ioStatesPairGenerator(
+ []string{"lock", "error", "success"},
+ []string{"lock", "error", "success"},
+ ),
+ []func(*dNC) error{
+ wantResponse((*dNC).Close, nil, dNCErrClosed),
+ wantResponse((*dNC).Close, dNCErrClosed, dNCErrClosed),
+ },
+ },
+ {"write,close,write",
+ ioStatesPairGenerator(
+ []string{"lock"},
+ []string{"lock", "error", "success"},
+ ),
+ []func(*dNC) error{
+ wantBlock(write(""), dNCErrClosed),
+ wantResponse((*dNC).Close, nil, dNCErrClosed),
+ wantResponse(write(""), dNCErrClosed, dNCErrClosed),
+ },
+ },
+ {"write,close,write",
+ ioStatesPairGenerator(
+ []string{"error"},
+ []string{"lock", "error", "success"},
+ ),
+ []func(*dNC) error{
+ wantResponse(write(""), dNCErrWrite, dNCErrClosed),
+ wantResponse((*dNC).Close, nil, dNCErrClosed),
+ wantResponse(write(""), dNCErrClosed, dNCErrClosed),
+ },
+ },
+ {"write,close,write",
+ ioStatesPairGenerator(
+ []string{"success"},
+ []string{"lock", "error", "success"},
+ ),
+ []func(*dNC) error{
+ wantResponse(write(""), nil, dNCErrClosed),
+ wantResponse((*dNC).Close, nil, dNCErrClosed),
+ wantResponse(write(""), dNCErrClosed, dNCErrClosed),
+ },
+ },
+ {"read,close,read",
+ ioStatesPairGenerator(
+ []string{"lock", "error", "success"},
+ []string{"lock", "error", "success"},
+ ),
+ []func(*dNC) error{
+ wantBlock(read(""), io.EOF),
+ wantResponse((*dNC).Close, nil, dNCErrClosed),
+ wantResponse(read(""), io.EOF, io.EOF),
+ },
+ },
+ {"write,read",
+ ioStatesPairGenerator(
+ []string{"lock"},
+ []string{"lock", "error", "success"},
+ ),
+ []func(*dNC) error{
+ wantBlock(write("1"), dNCErrClosed),
+ wantBlock(read("1"), io.EOF),
+ },
+ },
+ {"write,read",
+ ioStatesPairGenerator(
+ []string{"error"},
+ []string{"lock", "error", "success"},
+ ),
+ []func(*dNC) error{
+ wantResponse(write("1"), dNCErrWrite, dNCErrClosed),
+ wantBlock(read("1"), io.EOF),
+ },
+ },
+ {"write,read",
+ ioStatesPairGenerator(
+ []string{"success"},
+ []string{"lock"},
+ ),
+ []func(*dNC) error{
+ wantResponse(write("1"), nil, dNCErrClosed),
+ wantBlock(read("1"), io.EOF),
+ },
+ },
+ {"write,read",
+ ioStatesPairGenerator(
+ []string{"success"},
+ []string{"error"},
+ ),
+ []func(*dNC) error{
+ wantResponse(write("1"), nil, dNCErrClosed),
+ wantResponse(read("1"), dNCErrRead, io.EOF),
+ },
+ },
+ {"write,read",
+ ioStatesPairGenerator(
+ []string{"success"},
+ []string{"success"},
+ ),
+ []func(*dNC) error{
+ wantResponse(write("1"), nil, dNCErrClosed),
+ wantResponse(read("1"), nil, io.EOF),
+ },
+ },
+ }
+ for _, tc := range testCases {
+ t.Run(tc.description, func(t *testing.T) {
+ defer leaksMonitor(tc.description).checkTesting(t)
+
+ for _, server := range tc.servers {
+ s, err := server()
+ if err != nil {
+ t.Error(err)
+ continue
+ }
+ if s == nil {
+ t.Error("nil server in testcase")
+ continue
+ }
+
+ t.Run(s.LocalAddr().String(), func(t *testing.T) {
+ defer leaksMonitor(s.LocalAddr().String()).checkTesting(t)
+ for _, action := range tc.actions {
+ if err := action(s); err != nil {
+ t.Error(err)
+ break
+ }
+ }
+ s.Close()
+ })
+ }
+ })
+ }
+}
diff --git a/xgb_test.go b/xgb_test.go
index 5540e5c..d6ddb13 100644
--- a/xgb_test.go
+++ b/xgb_test.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
- "net"
"regexp"
"runtime"
"strconv"
@@ -14,320 +13,6 @@ import (
"time"
)
-type addr struct {
- s string
-}
-
-func (_ addr) Network() string { return "dummy" }
-func (a addr) String() string { return a.s }
-
-type errTimeout struct{ error }
-
-func (_ errTimeout) Timeout() bool { return true }
-
-var (
- dXErrNotImplemented = errors.New("command not implemented")
- dXErrClosed = errors.New("server closed")
- dXErrWrite = errors.New("server write failed")
- dXErrRead = errors.New("server read failed")
- dXErrResponse = errors.New("server response error")
-)
-
-type dXIoResult struct {
- n int
- err error
-}
-type dXIo struct {
- b []byte
- result chan dXIoResult
-}
-
-type dXCSendEvent struct{}
-type dXCWriteLock struct{}
-type dXCWriteUnlock struct{}
-type dXCWriteError struct{}
-type dXCWriteSuccess struct{ errorResponse bool }
-type dXCReadLock struct{}
-type dXCReadUnlock struct{}
-type dXCReadError struct{}
-type dXCReadSuccess struct{}
-
-type dXEvent struct{}
-
-func (_ dXEvent) Bytes() []byte { return nil }
-func (_ dXEvent) String() string { return "dummy X server event" }
-
-type dXError struct {
- seqId uint16
-}
-
-func (e dXError) SequenceId() uint16 { return e.seqId }
-func (_ dXError) BadId() uint32 { return 0 }
-func (_ dXError) Error() string { return "dummy X server error reply" }
-
-// dummy X server implementing net.Conn interface,
-type dX struct {
- addr addr
- in, out chan dXIo
- control chan interface{}
- done chan struct{}
-}
-
-// Results running dummy X server, satisfying net.Conn interface for test purposes.
-// It is users responsibility to stop and clean up resources with (*dX).Close, if not needed anymore.
-// By default, the read and write method are unlocked and will not result in error and read response will be a non error X reply (like with (*dX).WriteSuccess(false), (*dX).ReadSuccess()).
-//TODO make (*dX).SetDeadline, (*dX).SetReadDeadline, (*dX).SetWriteDeadline work proprely.
-func newDX(name string) *dX {
- s := &dX{
- addr{name},
- make(chan dXIo), make(chan dXIo),
- make(chan interface{}),
- make(chan struct{}),
- }
-
- seqId := uint16(1)
- incrementSequenceId := func() {
- // this has to be the same algorithm as in (*Conn).generateSeqIds
- if seqId == uint16((1<<16)-1) {
- seqId = 0
- } else {
- seqId++
- }
- }
-
- in, out := s.in, chan dXIo(nil)
- buf := &bytes.Buffer{}
- errorRead, errorWrite, errorResponse := false, false, false
- lockRead := false
-
- NewErrorFuncs[255] = func(buf []byte) Error {
- return dXError{Get16(buf[2:])}
- }
- NewEventFuncs[128&127] = func(buf []byte) Event {
- return dXEvent{}
- }
-
- go func() {
- defer close(s.done)
- for {
- select {
- case dxsio := <-in:
- if errorWrite {
- dxsio.result <- dXIoResult{0, dXErrWrite}
- break
- }
-
- response := make([]byte, 32)
- if errorResponse { // response will be error
- response[0] = 0 // error
- response[1] = 255 // error function
- } else { // response will by reply with no additional reply
- response[0] = 1 // reply
- }
- Put16(response[2:], seqId) // sequence number
- incrementSequenceId()
-
- buf.Write(response)
- dxsio.result <- dXIoResult{len(dxsio.b), nil}
-
- if !lockRead && out == nil {
- out = s.out
- }
- case dxsio := <-out:
- if errorRead {
- dxsio.result <- dXIoResult{0, dXErrRead}
- break
- }
-
- n, err := buf.Read(dxsio.b)
- dxsio.result <- dXIoResult{n, err}
-
- if buf.Len() == 0 {
- out = nil
- }
- case ci := <-s.control:
- if ci == nil {
- return
- }
- switch cs := ci.(type) {
- case dXCSendEvent:
- response := make([]byte, 32)
- response[0] = 128
- buf.Write(response)
-
- if !lockRead && out == nil {
- out = s.out
- }
- case dXCWriteLock:
- in = nil
- case dXCWriteUnlock:
- in = s.in
- case dXCWriteError:
- errorWrite = true
- case dXCWriteSuccess:
- errorWrite = false
- errorResponse = cs.errorResponse
- case dXCReadLock:
- out = nil
- lockRead = true
- case dXCReadUnlock:
- lockRead = false
- if buf.Len() > 0 && out == nil {
- out = s.out
- }
- case dXCReadError:
- errorRead = true
- case dXCReadSuccess:
- errorRead = false
- default:
- }
- }
- }
- }()
- return s
-}
-
-// Shuts down dummy X server. Every blocking or future method calls will do nothing and result in error.
-// Result will be dXErrClosed if server was allready closed.
-// Server can not be unclosed.
-func (s *dX) Close() error {
- select {
- case s.control <- nil:
- <-s.done
- return nil
- case <-s.done:
- }
- return dXErrClosed
-}
-
-// Imitates write action to X server.
-// If not locked by (*dX).WriteLock, it results in error or success.
-//
-// If Write errors, the second result parameter will be an error {dXErrWrite|dXErrClosed}, the resulting first parameter will be 0,
-// no response will be generated and the internal sequence number will not be incremented.
-//
-// If Write succeedes, it results in (len(b), nil), the (*dX).Read will be unblocked (if not locked with (*dX).ReadLock),
-// an [32]byte response will be written to buffer from which (*dX).Read reads,
-// with sequence number and proper X response type (error or reply) and the internal sequence number will be increased.
-//
-// If server was closed previously, result will be (0, dXErrClosed).
-func (s *dX) Write(b []byte) (int, error) {
- resChan := make(chan dXIoResult)
- //fmt.Printf("(*dX).Write: got write request: %v\n", b)
- select {
- case s.in <- dXIo{b, resChan}:
- //fmt.Printf("(*dX).Write: input channel has accepted request\n")
- res := <-resChan
- //fmt.Printf("(*dX).Write: got result: %v\n", res)
- return res.n, res.err
- case <-s.done:
- }
- //fmt.Printf("(*dX).Write: server was closed\n")
- return 0, dXErrClosed
-}
-
-// Imitates read action from X server.
-// If locked by (*dX).ReadLock, read will block until unlocking with (*dX).ReadUnlock, or server closes.
-//
-// If not locked, Read will result in error, or block until internal read buffer is not empty, depending on internal state.
-// The internal state can be modified via (*dX).ReadError, or (*dX).ReadSuccess
-// ReadError makes it return (0, dXErrRead), with no changes to internal buffer or state.
-// ReadSuccess makes it block until there are some write responses. After emtying the internal read buffer, all next Read requests will block untill another successful write requests.
-//
-// The resulting read success response type can be altered with (*dX).WriteSuccess method.
-//
-// If server was closed previously, result will be (0, io.EOF).
-func (s *dX) Read(b []byte) (int, error) {
- resChan := make(chan dXIoResult)
- //fmt.Printf("(*dX).Read: got read request of length: %v\n", len(b))
- select {
- case s.out <- dXIo{b, resChan}:
- //fmt.Printf("(*dX).Read: output channel has accepted request\n")
- res := <-resChan
- //fmt.Printf("(*dX).Read: got result: %v\n", res)
- //fmt.Printf("(*dX).Read: result bytes: %v\n", b)
- return res.n, res.err
- case <-s.done:
- //fmt.Printf("(*dX).Read: server was closed\n")
- }
- return 0, io.EOF
-}
-func (s *dX) LocalAddr() net.Addr { return s.addr }
-func (s *dX) RemoteAddr() net.Addr { return s.addr }
-func (s *dX) SetDeadline(t time.Time) error { return dXErrNotImplemented }
-func (s *dX) SetReadDeadline(t time.Time) error { return dXErrNotImplemented }
-func (s *dX) SetWriteDeadline(t time.Time) error { return dXErrNotImplemented }
-
-func (s *dX) Control(i interface{}) error {
- select {
- case s.control <- i:
- return nil
- case <-s.done:
- }
- return dXErrClosed
-}
-
-// Adds an Event into read buffer.
-func (s *dX) SendEvent() error {
- return s.Control(dXCSendEvent{})
-}
-
-// Locks writing. All write requests will be blocked until write is unlocked with (*dX).WriteUnlock, or server closes.
-func (s *dX) WriteLock() error {
- return s.Control(dXCWriteLock{})
-}
-
-// Unlocks writing. All blocked write requests until now will be accepted.
-func (s *dX) WriteUnlock() error {
- return s.Control(dXCWriteUnlock{})
-}
-
-// Unlocks writing and makes (*dX).Write to result (0, dXErrWrite).
-func (s *dX) WriteError() error {
- if err := s.WriteUnlock(); err != nil {
- return err
- }
- return s.Control(dXCWriteError{})
-}
-
-// Unlocks writing and makes (*dX).Write to result (len(b), nil), with a proper X reply.
-// If errorResult is true, the response will be an X error response,
-// else an normal reply. See (*dX).Write for details.
-func (s *dX) WriteSuccess(errorResult bool) error {
- if err := s.WriteUnlock(); err != nil {
- return err
- }
- return s.Control(dXCWriteSuccess{errorResult})
-}
-
-// Locks reading. All read requests will be blocked until read is unlocked with (*dX).ReadUnlock, or server closes.
-// (*dX).Read wil block even after successful write.
-func (s *dX) ReadLock() error {
- return s.Control(dXCReadLock{})
-}
-
-// Unlocks reading. If there are any unresponded requests in reading buffer, read will be unblocked.
-func (s *dX) ReadUnlock() error {
- return s.Control(dXCReadUnlock{})
-}
-
-// Unlocks read and makes every blocked and following (*dX).Read requests fail. See (*dX).Read for details.
-func (s *dX) ReadError() error {
- if err := s.ReadUnlock(); err != nil {
- return err
- }
- return s.Control(dXCReadError{})
-}
-
-// Unlocks read and makes every blocked and following (*dX).Read requests be handled, if there are any in read buffer.
-// See (*dX).Read for details.
-func (s *dX) ReadSuccess() error {
- if err := s.ReadUnlock(); err != nil {
- return err
- }
- return s.Control(dXCReadSuccess{})
-}
-
type goroutine struct {
id int
name string
@@ -429,56 +114,23 @@ func (l *leaks) ignoreLeak(grs ...goroutine) {
}
}
-func testDXCombinations(writeStates, readStates []string) []func() (*dX, error) {
- writeSetters := map[string]func(*dX) error{
- "lock": (*dX).WriteLock,
- "error": (*dX).WriteError,
- "successReply": func(s *dX) error { return s.WriteSuccess(false) },
- "successError": func(s *dX) error { return s.WriteSuccess(true) },
- }
- readSetters := map[string]func(*dX) error{
- "lock": (*dX).ReadLock,
- "error": (*dX).ReadError,
- "success": (*dX).ReadSuccess,
- }
+type dNCEvent struct{}
- res := []func() (*dX, error){}
- for _, writeState := range writeStates {
- writeState, writeSetter := writeState, writeSetters[writeState]
- if writeSetter == nil {
- panic("unknown write state: " + writeState)
- continue
- }
- for _, readState := range readStates {
- readState, readSetter := readState, readSetters[readState]
- if readSetter == nil {
- panic("unknown read state: " + readState)
- continue
- }
- res = append(res, func() (*dX, error) {
- s := newDX("write=" + writeState + ",read=" + readState)
-
- if err := readSetter(s); err != nil {
- s.Close()
- return nil, errors.New("set read " + readState + " error: " + err.Error())
- }
-
- if err := writeSetter(s); err != nil {
- s.Close()
- return nil, errors.New("set write " + writeState + " error: " + err.Error())
- }
+func (_ dNCEvent) Bytes() []byte { return nil }
+func (_ dNCEvent) String() string { return "dummy X server event" }
- return s, nil
- })
- }
- }
- return res
+type dNCError struct {
+ seqId uint16
}
-func TestDummyXServer(t *testing.T) {
+func (e dNCError) SequenceId() uint16 { return e.seqId }
+func (_ dNCError) BadId() uint32 { return 0 }
+func (_ dNCError) Error() string { return "dummy X server error reply" }
+
+func TestConnOnNonBlockingDummyXServer(t *testing.T) {
timeout := time.Millisecond
- wantResponse := func(action func(*dX) error, want, block error) func(*dX) error {
- return func(s *dX) error {
+ wantResponse := func(action func(*Conn) error, want, block error) func(*Conn) error {
+ return func(s *Conn) error {
actionResult := make(chan error)
timedOut := make(chan struct{})
go func() {
@@ -503,286 +155,138 @@ func TestDummyXServer(t *testing.T) {
return nil
}
}
- wantBlock := func(action func(*dX) error, unblock error) func(*dX) error {
- return func(s *dX) error {
- actionResult := make(chan error)
- timedOut := make(chan struct{})
- go func() {
- err := action(s)
- select {
- case <-timedOut:
- if err != unblock {
- t.Errorf("after unblocking, action result=%v, want %v", err, unblock)
- }
- case actionResult <- err:
- }
- }()
- select {
- case err := <-actionResult:
- return errors.New(fmt.Sprintf("action result=%v, want to be blocked", err))
- case <-time.After(timeout):
- close(timedOut)
- }
- return nil
- }
+ NewErrorFuncs[255] = func(buf []byte) Error {
+ return dNCError{Get16(buf[2:])}
}
- write := func() func(*dX) error {
- return func(s *dX) error {
- _, err := s.Write([]byte{1})
- return err
- }
+ NewEventFuncs[128&127] = func(buf []byte) Event {
+ return dNCEvent{}
}
- read := func() func(*dX) error {
- return func(s *dX) error {
- b := make([]byte, 32)
- _, err := s.Read(b)
- return err
+ checkedReply := func(wantError bool) func(*Conn) error {
+ request := "reply"
+ if wantError {
+ request = "error"
}
- }
- readSuccess := func(seqId uint16, errorResponse bool) func(*dX) error {
- return func(s *dX) error {
- b := make([]byte, 32)
- _, err := s.Read(b)
- if err != nil {
- return err
+ return func(c *Conn) error {
+ cookie := c.NewCookie(true, true)
+ c.NewRequest([]byte(request), cookie)
+ _, err := cookie.Reply()
+ if wantError && err == nil {
+ return errors.New(fmt.Sprintf("checked request \"%v\" with reply resulted in nil error, want some error", request))
}
- if seqId != Get16(b[2:]) {
- return errors.New(fmt.Sprintf("got read sequence number %d, want %d", Get16(b[2:]), seqId))
+ if !wantError && err != nil {
+ return errors.New(fmt.Sprintf("checked request \"%v\" with reply resulted in error %v, want nil error", request, err))
}
- b0, desc := 0, "error"
- if !errorResponse {
- b0, desc = 1, "valid"
+ return nil
+ }
+ }
+ checkedNoreply := func(wantError bool) func(*Conn) error {
+ request := "noreply"
+ if wantError {
+ request = "error"
+ }
+ return func(c *Conn) error {
+ cookie := c.NewCookie(true, false)
+ c.NewRequest([]byte(request), cookie)
+ err := cookie.Check()
+ if wantError && err == nil {
+ return errors.New(fmt.Sprintf("checked request \"%v\" with no reply resulted in nil error, want some error", request))
}
- if int(b[0]) != b0 {
- return errors.New(fmt.Sprintf("response is not an %s reply: %v", desc, b))
+ if !wantError && err != nil {
+ return errors.New(fmt.Sprintf("checked request \"%v\" with no reply resulted in error %v, want nil error", request, err))
}
return nil
}
}
-
- testCases := []struct {
- description string
- servers []func() (*dX, error)
- actions []func(*dX) error // actions per server
- }{
- {"empty",
- []func() (*dX, error){
- func() (*dX, error) { return newDX("server"), nil },
- },
- []func(*dX) error{
- func(s *dX) error { return nil },
- },
- },
- {"close,close",
- testDXCombinations(
- []string{"lock", "error", "successError", "successReply"},
- []string{"lock", "error", "success"},
- ),
- []func(*dX) error{
- wantResponse((*dX).Close, nil, dXErrClosed),
- wantResponse((*dX).Close, dXErrClosed, dXErrClosed),
- },
- },
- {"write,close,write",
- testDXCombinations(
- []string{"lock"},
- []string{"lock", "error", "success"},
- ),
- []func(*dX) error{
- wantBlock(write(), dXErrClosed),
- wantResponse((*dX).Close, nil, dXErrClosed),
- wantResponse(write(), dXErrClosed, dXErrClosed),
- },
- },
- {"write,close,write",
- testDXCombinations(
- []string{"error"},
- []string{"lock", "error", "success"},
- ),
- []func(*dX) error{
- wantResponse(write(), dXErrWrite, dXErrClosed),
- wantResponse((*dX).Close, nil, dXErrClosed),
- wantResponse(write(), dXErrClosed, dXErrClosed),
- },
- },
- {"write,close,write",
- testDXCombinations(
- []string{"successError", "successReply"},
- []string{"lock", "error", "success"},
- ),
- []func(*dX) error{
- wantResponse(write(), nil, dXErrClosed),
- wantResponse((*dX).Close, nil, dXErrClosed),
- wantResponse(write(), dXErrClosed, dXErrClosed),
- },
- },
- {"read,close,read",
- testDXCombinations(
- []string{"lock", "error", "successError", "successReply"},
- []string{"lock", "error", "success"},
- ),
- []func(*dX) error{
- wantBlock(read(), io.EOF),
- wantResponse((*dX).Close, nil, dXErrClosed),
- wantResponse(read(), io.EOF, io.EOF),
- },
- },
- {"write,read",
- testDXCombinations(
- []string{"lock"},
- []string{"lock", "error", "success"},
- ),
- []func(*dX) error{
- wantBlock(write(), dXErrClosed),
- wantBlock(read(), io.EOF),
- },
- },
- {"write,read",
- testDXCombinations(
- []string{"error"},
- []string{"lock", "error", "success"},
- ),
- []func(*dX) error{
- wantResponse(write(), dXErrWrite, dXErrClosed),
- wantBlock(read(), io.EOF),
- },
- },
- {"write,read",
- testDXCombinations(
- []string{"successError"},
- []string{"lock"},
- ),
- []func(*dX) error{
- wantResponse(write(), nil, dXErrClosed),
- wantBlock(read(), io.EOF),
- },
- },
- {"write,read",
- testDXCombinations(
- []string{"successError"},
- []string{"error"},
- ),
- []func(*dX) error{
- wantResponse(write(), nil, dXErrClosed),
- wantResponse(read(), dXErrRead, io.EOF),
- },
- },
- {"write,read",
- testDXCombinations(
- []string{"successError"},
- []string{"success"},
- ),
- []func(*dX) error{
- wantResponse(write(), nil, dXErrClosed),
- wantResponse(readSuccess(1, true), nil, io.EOF),
- },
- },
- {"write,read",
- testDXCombinations(
- []string{"successReply"},
- []string{"success"},
- ),
- []func(*dX) error{
- wantResponse(write(), nil, dXErrClosed),
- wantResponse(readSuccess(1, false), nil, io.EOF),
- },
- },
- }
- for _, tc := range testCases {
- t.Run(tc.description, func(t *testing.T) {
- defer leaksMonitor(tc.description).checkTesting(t)
-
- for _, server := range tc.servers {
- s, err := server()
- if err != nil {
- t.Error(err)
- continue
- }
- if s == nil {
- t.Error("nil server in testcase")
- continue
- }
-
- t.Run(s.LocalAddr().String(), func(t *testing.T) {
- defer leaksMonitor(s.LocalAddr().String()).checkTesting(t)
- for _, action := range tc.actions {
- if err := action(s); err != nil {
- t.Error(err)
- break
- }
- }
- s.Close()
- })
+ uncheckedReply := func(wantError bool) func(*Conn) error {
+ request := "reply"
+ if wantError {
+ request = "error"
+ }
+ return func(c *Conn) error {
+ cookie := c.NewCookie(false, true)
+ c.NewRequest([]byte(request), cookie)
+ _, err := cookie.Reply()
+ if err != nil {
+ return errors.New(fmt.Sprintf("unchecked request \"%v\" with reply resulted in %v, want nil", request, err))
}
- })
+ return nil
+ }
}
-}
-
-func TestConnOnNonBlockingDummyXServer(t *testing.T) {
- timeout := time.Millisecond
- wantResponse := func(action func(*Conn) error, want, block error) func(*Conn) error {
+ uncheckedNoreply := func(wantError bool) func(*Conn) error {
+ request := "noreply"
+ if wantError {
+ request = "error"
+ }
return func(c *Conn) error {
- actionResult := make(chan error)
- timedOut := make(chan struct{})
- go func() {
- err := action(c)
- select {
- case <-timedOut:
- if err != block {
- t.Errorf("after unblocking, action result=%v, want %v", err, block)
- }
- case actionResult <- err:
- }
- }()
- select {
- case err := <-actionResult:
- if err != want {
- return errors.New(fmt.Sprintf("action result=%v, want %v", err, want))
- }
- case <-time.After(timeout):
- close(timedOut)
- return errors.New(fmt.Sprintf("action did not respond for %v, result want %v", timeout, want))
- }
+ cookie := c.NewCookie(false, false)
+ c.NewRequest([]byte(request), cookie)
return nil
}
}
- crequest := func(checked, reply bool) func(*Conn) error {
+ event := func() func(*Conn) error {
return func(c *Conn) error {
- cookie := c.NewCookie(checked, reply)
- c.NewRequest([]byte("crequest"), cookie)
- _, err := cookie.Reply()
+ _, err := c.conn.Write([]byte("event"))
+ if err != nil {
+ return errors.New(fmt.Sprintf("asked dummy server to send event, but resulted in error: %v\n", err))
+ }
return err
}
}
+ waitEvent := func(wantError bool) func(*Conn) error {
+ return func(c *Conn) error {
+ _, err := c.WaitForEvent()
+ if wantError && err == nil {
+ return errors.New(fmt.Sprintf("wait for event resulted in nil error, want some error"))
+ }
+ if !wantError && err != nil {
+ return errors.New(fmt.Sprintf("wait for event resulted in error %v, want nil error", err))
+ }
+ return nil
+ }
+ }
testCases := []struct {
description string
- servers []func() (*dX, error)
actions []func(*Conn) error
}{
- {"cclose",
- testDXCombinations(
- []string{"successError", "successReply"},
- []string{"success"},
- ),
+ {"close",
[]func(*Conn) error{},
},
- {"crequest",
- testDXCombinations([]string{"successError"}, []string{"success"}),
+ {"checked requests with reply",
[]func(*Conn) error{
- wantResponse(crequest(true, true), dXError{1}, io.ErrShortWrite),
+ checkedReply(false),
+ checkedReply(true),
+ checkedReply(false),
+ checkedReply(true),
},
},
- {"crequest",
- testDXCombinations([]string{"successReply"}, []string{"success"}),
+ {"checked requests no reply",
[]func(*Conn) error{
- wantResponse(crequest(true, true), nil, io.ErrShortWrite),
+ checkedNoreply(false),
+ checkedNoreply(true),
+ checkedNoreply(false),
+ checkedNoreply(true),
},
},
- // sometimes panic on unfixed branch - close of closed channel
- {"cclose",
- testDXCombinations([]string{"error"}, []string{"error", "success"}),
- []func(*Conn) error{},
+ {"unchecked requests with reply",
+ []func(*Conn) error{
+ uncheckedReply(false),
+ uncheckedReply(true),
+ waitEvent(true),
+ uncheckedReply(false),
+ event(),
+ waitEvent(false),
+ },
+ },
+ {"unchecked requests no reply",
+ []func(*Conn) error{
+ uncheckedNoreply(false),
+ uncheckedNoreply(true),
+ waitEvent(true),
+ uncheckedNoreply(false),
+ event(),
+ waitEvent(false),
+ },
},
}
for _, tc := range testCases {
@@ -790,52 +294,72 @@ func TestConnOnNonBlockingDummyXServer(t *testing.T) {
tclm := leaksMonitor("test case " + tc.description)
defer tclm.checkTesting(t)
- for _, server := range tc.servers {
- s, err := server()
- if err != nil {
- t.Error(err)
- continue
+ seqId := uint16(1)
+ incrementSequenceId := func() {
+ // this has to be the same algorithm as in (*Conn).generateSeqIds
+ if seqId == uint16((1<<16)-1) {
+ seqId = 0
+ } else {
+ seqId++
}
- if s == nil {
- t.Error("nil *dX in testcase")
- continue
+ }
+ dummyXreplyer := func(request []byte) []byte {
+ //fmt.Printf("dummyXreplyer got request: %s\n", string(request))
+ res := make([]byte, 32)
+ switch string(request) {
+ case "event":
+ res[0] = 128
+ //fmt.Printf("dummyXreplyer sent response: %v\n", res)
+ return res
+ case "error":
+ res[0] = 0 // error
+ res[1] = 255 // error function
+ default:
+ res[0] = 1 // reply
+ }
+ Put16(res[2:], seqId) // sequence number
+ incrementSequenceId()
+ if string(request) == "noreply" {
+ //fmt.Printf("dummyXreplyer no response sent\n")
+ return nil
}
+ //fmt.Printf("dummyXreplyer sent response: %v\n", res)
+ return res
+ }
- t.Run(s.LocalAddr().String(), func(t *testing.T) {
- sclm := leaksMonitor(s.LocalAddr().String()+" after sever close", tclm)
- defer sclm.checkTesting(t)
+ sclm := leaksMonitor("after server close", tclm)
+ defer sclm.checkTesting(t)
+ s := newDummyNetConn("dummX", dummyXreplyer)
+ defer s.Close()
- c, err := postNewConn(&Conn{conn: s})
- if err != nil {
- t.Errorf("connect to dummy server error: %v", err)
- return
- }
+ c, err := postNewConn(&Conn{conn: s})
+ if err != nil {
+ t.Errorf("connect to dummy server error: %v", err)
+ return
+ }
- rlm := leaksMonitor(c.conn.LocalAddr().String() + " after actions end")
- for _, action := range tc.actions {
- if err := action(c); err != nil {
- t.Error(err)
- break
- }
- }
- c.Close()
- if err := wantResponse(
- func(c *Conn) error {
- if ev, err := c.WaitForEvent(); ev != nil || err != nil {
- return fmt.Errorf("after (*Conn).Close, (*Conn).WaitForEvent() = (%v,%v), want (nil,nil)", ev, err)
- }
- return nil
- },
- nil,
- io.ErrShortWrite,
- )(c); err != nil {
- t.Error(err)
+ rlm := leaksMonitor("after actions end")
+ for _, action := range tc.actions {
+ if err := action(c); err != nil {
+ t.Error(err)
+ break
+ }
+ }
+ c.Close()
+ if err := wantResponse(
+ func(c *Conn) error {
+ if ev, err := c.WaitForEvent(); ev != nil || err != nil {
+ return fmt.Errorf("after (*Conn).Close, (*Conn).WaitForEvent() = (%v,%v), want (nil,nil)", ev, err)
}
- rlm.checkTesting(t)
- })
-
- s.Close()
+ return nil
+ },
+ nil,
+ io.ErrShortWrite,
+ )(c); err != nil {
+ t.Error(err)
}
+ rlm.checkTesting(t)
+
})
}
}