From 457a66ddc44ef57d60b3a91e06aeac99d5b55c36 Mon Sep 17 00:00:00 2001 From: jEzEk Date: Fri, 5 Oct 2018 03:17:55 +0200 Subject: xgb.go test file with very basic test server --- xgb_test.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 xgb_test.go diff --git a/xgb_test.go b/xgb_test.go new file mode 100644 index 0000000..55c70d4 --- /dev/null +++ b/xgb_test.go @@ -0,0 +1,30 @@ +package xgb + +import ( + "net" + "testing" + "time" +) + +type addr struct{} + +func (_ addr) Network() string { return "" } +func (_ addr) String() string { return "" } + +type server struct{} + +func (s *server) Write(b []byte) (int, error) { return len(b), nil } +func (s *server) Read(b []byte) (int, error) { return len(b), nil } +func (s *server) Close() error { return nil } +func (s *server) LocalAddr() Addr { return addr{} } +func (s *server) RemoteAddr() Addr { return addr{} } +func (s *server) SetDeadline(t time.Time) error { return nil } +func (s *server) SetReadDeadline(t time.Time) error { return nil } +func (s *server) SetWriteDeadline(t time.Time) error { return nil } + +func dummyServer() net.Conn { + return &server +} + +func TestConnOpenClose(t *testing.T) { +} -- cgit v1.2.3 From ed5808209c37ed7364035d2c3a02806e20c7fabb Mon Sep 17 00:00:00 2001 From: jEzEk Date: Fri, 5 Oct 2018 19:40:15 +0200 Subject: testing blocking server, Conn open/close test --- xgb_test.go | 108 ++++++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 98 insertions(+), 10 deletions(-) diff --git a/xgb_test.go b/xgb_test.go index 55c70d4..e937e62 100644 --- a/xgb_test.go +++ b/xgb_test.go @@ -1,7 +1,10 @@ package xgb import ( + "errors" + "io" "net" + "runtime" "testing" "time" ) @@ -11,20 +14,105 @@ type addr struct{} func (_ addr) Network() string { return "" } func (_ addr) String() string { return "" } -type server struct{} +type server struct { + control chan interface{} + done chan struct{} +} + +func newServer() net.Conn { + s := 
&server{ + make(chan interface{}), + make(chan struct{}), + } + go func() { + defer close(s.done) + for { + select { + case ci := <-s.control: + if ci == nil { + return + } + } + } + }() + return s +} -func (s *server) Write(b []byte) (int, error) { return len(b), nil } -func (s *server) Read(b []byte) (int, error) { return len(b), nil } -func (s *server) Close() error { return nil } -func (s *server) LocalAddr() Addr { return addr{} } -func (s *server) RemoteAddr() Addr { return addr{} } +func (_ *server) errClosed() error { + return errors.New("closed") +} +func (_ *server) errEOF() error { + return io.EOF +} + +func (s *server) Write(b []byte) (int, error) { + select { + case <-s.done: + } + return 0, s.errClosed() +} + +func (s *server) Read(b []byte) (int, error) { + select { + case <-s.done: + } + return 0, s.errEOF() +} +func (s *server) Close() error { + select { + case s.control <- nil: + <-s.done + return nil + case <-s.done: + return s.errClosed() + } +} +func (s *server) LocalAddr() net.Addr { return addr{} } +func (s *server) RemoteAddr() net.Addr { return addr{} } func (s *server) SetDeadline(t time.Time) error { return nil } func (s *server) SetReadDeadline(t time.Time) error { return nil } func (s *server) SetWriteDeadline(t time.Time) error { return nil } -func dummyServer() net.Conn { - return &server -} - func TestConnOpenClose(t *testing.T) { + ngrs := runtime.NumGoroutine() + + t.Logf("creating new dummy blocking server") + s := newServer() + defer func() { + if err := s.Close(); err != nil { + t.Errorf("server closing error: %v", err) + } + }() + t.Logf("new server created: %v", s) + + leakTimeout := time.Second + defer func() { + if ngre := runtime.NumGoroutine(); ngrs != ngre { + t.Logf("possible goroutine leakage, waiting %v", leakTimeout) + time.Sleep(time.Second) + if ngre := runtime.NumGoroutine(); ngrs != ngre { + t.Errorf("goroutine leaks: start(%d) != end(%d)", ngrs, ngre) + } + } + }() + + c, err := postNewConn(&Conn{conn: s}) + if 
err != nil { + t.Fatalf("connect error: %v", err) + } + t.Logf("connection to server created: %v", c) + + closeErr := make(chan error, 1) + closeTimeout := time.Second + select { + case closeErr <- func() error { + t.Logf("closing connection to server") + c.Close() + t.Logf("connection to server closed") + return nil + }(): + case <-time.After(closeTimeout): + t.Errorf("*Conn.Close() not responded for %v", closeTimeout) + } + } -- cgit v1.2.3 From 240ff301eda1821b047e9e691e54302cf99918d2 Mon Sep 17 00:00:00 2001 From: jEzEk Date: Sat, 6 Oct 2018 18:15:38 +0200 Subject: leak testing struct & checks --- xgb_test.go | 97 ++++++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 86 insertions(+), 11 deletions(-) diff --git a/xgb_test.go b/xgb_test.go index e937e62..e6c9dea 100644 --- a/xgb_test.go +++ b/xgb_test.go @@ -1,10 +1,14 @@ package xgb import ( + "bytes" "errors" "io" "net" + "regexp" "runtime" + "strconv" + "strings" "testing" "time" ) @@ -73,8 +77,88 @@ func (s *server) SetDeadline(t time.Time) error { return nil } func (s *server) SetReadDeadline(t time.Time) error { return nil } func (s *server) SetWriteDeadline(t time.Time) error { return nil } +// ispired by https://golang.org/src/runtime/debug/stack.go?s=587:606#L21 +// stack returns a formatted stack trace of all goroutines. +// It calls runtime.Stack with a large enough buffer to capture the entire trace. 
+func stack() []byte { + buf := make([]byte, 1024) + for { + n := runtime.Stack(buf, true) + if n < len(buf) { + return buf[:n] + } + buf = make([]byte, 2*len(buf)) + } +} + +type goroutine struct { + id int + name string + stack []byte +} + +type leaks struct { + goroutines map[int]goroutine +} + +func leaksMonitor() leaks { + return leaks{ + leaks{}.collectGoroutines(), + } +} + +func (_ leaks) collectGoroutines() map[int]goroutine { + res := make(map[int]goroutine) + stacks := bytes.Split(stack(), []byte{'\n', '\n'}) + + regexpId := regexp.MustCompile(`^\s*goroutine\s*(\d+)`) + for _, st := range stacks { + lines := bytes.Split(st, []byte{'\n'}) + if len(lines) < 2 { + panic("routine stach has less tnan two lines: " + string(st)) + } + + idMatches := regexpId.FindSubmatch(lines[0]) + if len(idMatches) < 2 { + panic("no id found in goroutine stack's first line: " + string(lines[0])) + } + id, err := strconv.Atoi(string(idMatches[1])) + if err != nil { + panic("converting goroutine id to number error: " + err.Error()) + } + if _, ok := res[id]; ok { + panic("2 goroutines with same id: " + strconv.Itoa(id)) + } + + //TODO filter out test routines, stack routine + res[id] = goroutine{id, strings.TrimSpace(string(lines[1])), st} + } + return res +} + +func (l leaks) checkTesting(t *testing.T) { + { + goroutines := l.collectGoroutines() + if len(l.goroutines) == len(goroutines) { + return + } + } + leakTimeout := time.Second + t.Logf("possible goroutine leakage, waiting %v", leakTimeout) + goroutines := l.collectGoroutines() + if len(l.goroutines) == len(goroutines) { + return + } + t.Errorf("%d goroutine leaks: start(%d) != end(%d)", len(goroutines)-len(l.goroutines), len(l.goroutines), len(goroutines)) + for id, gr := range goroutines { + if _, ok := l.goroutines[id]; ok { + continue + } + t.Error(gr.name) + } +} + func TestConnOpenClose(t *testing.T) { - ngrs := runtime.NumGoroutine() t.Logf("creating new dummy blocking server") s := newServer() @@ -85,16 +169,7 @@ 
func TestConnOpenClose(t *testing.T) { }() t.Logf("new server created: %v", s) - leakTimeout := time.Second - defer func() { - if ngre := runtime.NumGoroutine(); ngrs != ngre { - t.Logf("possible goroutine leakage, waiting %v", leakTimeout) - time.Sleep(time.Second) - if ngre := runtime.NumGoroutine(); ngrs != ngre { - t.Errorf("goroutine leaks: start(%d) != end(%d)", ngrs, ngre) - } - } - }() + defer leaksMonitor().checkTesting(t) c, err := postNewConn(&Conn{conn: s}) if err != nil { -- cgit v1.2.3 From f3d222beec9aa5a17746c4d1e151246f2e1112d0 Mon Sep 17 00:00:00 2001 From: jEzEk Date: Sat, 6 Oct 2018 18:33:28 +0200 Subject: test not timed out on blocking close fix --- xgb_test.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/xgb_test.go b/xgb_test.go index e6c9dea..7badfc4 100644 --- a/xgb_test.go +++ b/xgb_test.go @@ -154,20 +154,20 @@ func (l leaks) checkTesting(t *testing.T) { if _, ok := l.goroutines[id]; ok { continue } - t.Error(gr.name) + t.Log(gr.name, "\n", string(gr.stack)) } } func TestConnOpenClose(t *testing.T) { - t.Logf("creating new dummy blocking server") + //t.Logf("creating new dummy blocking server") s := newServer() defer func() { if err := s.Close(); err != nil { t.Errorf("server closing error: %v", err) } }() - t.Logf("new server created: %v", s) + //t.Logf("new server created: %v", s) defer leaksMonitor().checkTesting(t) @@ -175,17 +175,18 @@ func TestConnOpenClose(t *testing.T) { if err != nil { t.Fatalf("connect error: %v", err) } - t.Logf("connection to server created: %v", c) + //t.Logf("connection to server created: %v", c) - closeErr := make(chan error, 1) + closeErr := make(chan struct{}) + go func() { + //t.Logf("closing connection to server") + c.Close() + close(closeErr) + }() closeTimeout := time.Second select { - case closeErr <- func() error { - t.Logf("closing connection to server") - c.Close() - t.Logf("connection to server closed") - return nil - }(): + case <-closeErr: + 
//t.Logf("connection to server closed") case <-time.After(closeTimeout): t.Errorf("*Conn.Close() not responded for %v", closeTimeout) } -- cgit v1.2.3 From 3ffc892ea27af72af2877fe22ecb1e91e561dc93 Mon Sep 17 00:00:00 2001 From: jEzEk Date: Sat, 6 Oct 2018 19:24:52 +0200 Subject: test dummy write error server -> conn close tests --- xgb_test.go | 153 +++++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 94 insertions(+), 59 deletions(-) diff --git a/xgb_test.go b/xgb_test.go index 7badfc4..45c6176 100644 --- a/xgb_test.go +++ b/xgb_test.go @@ -13,18 +13,22 @@ import ( "time" ) -type addr struct{} +type addr struct { + s string +} -func (_ addr) Network() string { return "" } -func (_ addr) String() string { return "" } +func (_ addr) Network() string { return "dummy" } +func (a addr) String() string { return a.s } -type server struct { +type serverBlocking struct { + addr addr control chan interface{} done chan struct{} } -func newServer() net.Conn { - s := &server{ +func newServerBlocking() net.Conn { + s := &serverBlocking{ + addr{"blocking server"}, make(chan interface{}), make(chan struct{}), } @@ -42,27 +46,27 @@ func newServer() net.Conn { return s } -func (_ *server) errClosed() error { +func (_ *serverBlocking) errClosed() error { return errors.New("closed") } -func (_ *server) errEOF() error { +func (_ *serverBlocking) errEOF() error { return io.EOF } -func (s *server) Write(b []byte) (int, error) { +func (s *serverBlocking) Write(b []byte) (int, error) { select { case <-s.done: } return 0, s.errClosed() } -func (s *server) Read(b []byte) (int, error) { +func (s *serverBlocking) Read(b []byte) (int, error) { select { case <-s.done: } return 0, s.errEOF() } -func (s *server) Close() error { +func (s *serverBlocking) Close() error { select { case s.control <- nil: <-s.done @@ -71,24 +75,32 @@ func (s *server) Close() error { return s.errClosed() } } -func (s *server) LocalAddr() net.Addr { return addr{} } -func (s *server) RemoteAddr() 
net.Addr { return addr{} } -func (s *server) SetDeadline(t time.Time) error { return nil } -func (s *server) SetReadDeadline(t time.Time) error { return nil } -func (s *server) SetWriteDeadline(t time.Time) error { return nil } +func (s *serverBlocking) LocalAddr() net.Addr { return s.addr } +func (s *serverBlocking) RemoteAddr() net.Addr { return s.addr } +func (s *serverBlocking) SetDeadline(t time.Time) error { return nil } +func (s *serverBlocking) SetReadDeadline(t time.Time) error { return nil } +func (s *serverBlocking) SetWriteDeadline(t time.Time) error { return nil } + +type serverWriteError struct { + *serverBlocking +} -// ispired by https://golang.org/src/runtime/debug/stack.go?s=587:606#L21 -// stack returns a formatted stack trace of all goroutines. -// It calls runtime.Stack with a large enough buffer to capture the entire trace. -func stack() []byte { - buf := make([]byte, 1024) - for { - n := runtime.Stack(buf, true) - if n < len(buf) { - return buf[:n] - } - buf = make([]byte, 2*len(buf)) +func newServerWriteError() net.Conn { + s := &serverWriteError{newServerBlocking().(*serverBlocking)} + s.addr.s = "server write error" + return s +} + +func (s *serverWriteError) Write(b []byte) (int, error) { + select { + case <-s.done: + return 0, s.errClosed() + default: } + return 0, s.errWrite() +} +func (_ *serverWriteError) errWrite() error { + return errors.New("write failed") } type goroutine struct { @@ -107,9 +119,23 @@ func leaksMonitor() leaks { } } -func (_ leaks) collectGoroutines() map[int]goroutine { +// ispired by https://golang.org/src/runtime/debug/stack.go?s=587:606#L21 +// stack returns a formatted stack trace of all goroutines. +// It calls runtime.Stack with a large enough buffer to capture the entire trace. 
+func (_ leaks) stack() []byte { + buf := make([]byte, 1024) + for { + n := runtime.Stack(buf, true) + if n < len(buf) { + return buf[:n] + } + buf = make([]byte, 2*len(buf)) + } +} + +func (l leaks) collectGoroutines() map[int]goroutine { res := make(map[int]goroutine) - stacks := bytes.Split(stack(), []byte{'\n', '\n'}) + stacks := bytes.Split(l.stack(), []byte{'\n', '\n'}) regexpId := regexp.MustCompile(`^\s*goroutine\s*(\d+)`) for _, st := range stacks { @@ -129,9 +155,14 @@ func (_ leaks) collectGoroutines() map[int]goroutine { if _, ok := res[id]; ok { panic("2 goroutines with same id: " + strconv.Itoa(id)) } + name := strings.TrimSpace(string(lines[1])) + + //filter out our stack routine + if strings.Contains(name, "xgb.leaks.stacks") { + continue + } - //TODO filter out test routines, stack routine - res[id] = goroutine{id, strings.TrimSpace(string(lines[1])), st} + res[id] = goroutine{id, name, st} } return res } @@ -144,7 +175,8 @@ func (l leaks) checkTesting(t *testing.T) { } } leakTimeout := time.Second - t.Logf("possible goroutine leakage, waiting %v", leakTimeout) + time.Sleep(leakTimeout) + //t.Logf("possible goroutine leakage, waiting %v", leakTimeout) goroutines := l.collectGoroutines() if len(l.goroutines) == len(goroutines) { return @@ -160,35 +192,38 @@ func (l leaks) checkTesting(t *testing.T) { func TestConnOpenClose(t *testing.T) { - //t.Logf("creating new dummy blocking server") - s := newServer() - defer func() { - if err := s.Close(); err != nil { - t.Errorf("server closing error: %v", err) - } - }() - //t.Logf("new server created: %v", s) - - defer leaksMonitor().checkTesting(t) - - c, err := postNewConn(&Conn{conn: s}) - if err != nil { - t.Fatalf("connect error: %v", err) + testCases := []func() net.Conn{ + // newServerBlocking, // i'm not ready to handle this yet + newServerWriteError, } - //t.Logf("connection to server created: %v", c) + for _, tc := range testCases { + lm := leaksMonitor() + serverConn := tc() + + 
t.Run(serverConn.LocalAddr().String(), func(t *testing.T) { + c, err := postNewConn(&Conn{conn: serverConn}) + if err != nil { + t.Fatalf("connect error: %v", err) + } + //t.Logf("connection to server created: %v", c) + + closeErr := make(chan struct{}) + go func() { + //t.Logf("closing connection to server") + c.Close() + close(closeErr) + }() + closeTimeout := time.Second + select { + case <-closeErr: + //t.Logf("connection to server closed") + case <-time.After(closeTimeout): + t.Errorf("*Conn.Close() not responded for %v", closeTimeout) + } + }) - closeErr := make(chan struct{}) - go func() { - //t.Logf("closing connection to server") - c.Close() - close(closeErr) - }() - closeTimeout := time.Second - select { - case <-closeErr: - //t.Logf("connection to server closed") - case <-time.After(closeTimeout): - t.Errorf("*Conn.Close() not responded for %v", closeTimeout) + serverConn.Close() + lm.checkTesting(t) } } -- cgit v1.2.3 From 391f5d0d06667f92e6c14ebe9c026ae8b117da3b Mon Sep 17 00:00:00 2001 From: jEzEk Date: Sat, 6 Oct 2018 22:44:30 +0200 Subject: test fixes --- xgb_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/xgb_test.go b/xgb_test.go index 45c6176..fc8504a 100644 --- a/xgb_test.go +++ b/xgb_test.go @@ -32,7 +32,9 @@ func newServerBlocking() net.Conn { make(chan interface{}), make(chan struct{}), } + runned := make(chan struct{}) go func() { + close(runned) defer close(s.done) for { select { @@ -43,11 +45,12 @@ func newServerBlocking() net.Conn { } } }() + <-runned return s } func (_ *serverBlocking) errClosed() error { - return errors.New("closed") + return errors.New("server closed") } func (_ *serverBlocking) errEOF() error { return io.EOF @@ -197,10 +200,10 @@ func TestConnOpenClose(t *testing.T) { newServerWriteError, } for _, tc := range testCases { - lm := leaksMonitor() serverConn := tc() t.Run(serverConn.LocalAddr().String(), func(t *testing.T) { + defer leaksMonitor().checkTesting(t) c, err := 
postNewConn(&Conn{conn: serverConn}) if err != nil { t.Fatalf("connect error: %v", err) @@ -223,7 +226,6 @@ func TestConnOpenClose(t *testing.T) { }) serverConn.Close() - lm.checkTesting(t) } } -- cgit v1.2.3 From 753d971232b5844e73ef70ee0e9e0e9d99bcb14b Mon Sep 17 00:00:00 2001 From: jEzEk Date: Sun, 7 Oct 2018 18:02:26 +0200 Subject: tests reactor, negative leaks error fix --- xgb_test.go | 37 +++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/xgb_test.go b/xgb_test.go index fc8504a..d375261 100644 --- a/xgb_test.go +++ b/xgb_test.go @@ -26,9 +26,9 @@ type serverBlocking struct { done chan struct{} } -func newServerBlocking() net.Conn { +func newServerBlocking(name string) *serverBlocking { s := &serverBlocking{ - addr{"blocking server"}, + addr{name}, make(chan interface{}), make(chan struct{}), } @@ -48,21 +48,18 @@ func newServerBlocking() net.Conn { <-runned return s } - func (_ *serverBlocking) errClosed() error { return errors.New("server closed") } func (_ *serverBlocking) errEOF() error { return io.EOF } - func (s *serverBlocking) Write(b []byte) (int, error) { select { case <-s.done: } return 0, s.errClosed() } - func (s *serverBlocking) Read(b []byte) (int, error) { select { case <-s.done: @@ -88,12 +85,9 @@ type serverWriteError struct { *serverBlocking } -func newServerWriteError() net.Conn { - s := &serverWriteError{newServerBlocking().(*serverBlocking)} - s.addr.s = "server write error" - return s +func newServerWriteError(name string) *serverWriteError { + return &serverWriteError{newServerBlocking(name)} } - func (s *serverWriteError) Write(b []byte) (int, error) { select { case <-s.done: @@ -161,7 +155,7 @@ func (l leaks) collectGoroutines() map[int]goroutine { name := strings.TrimSpace(string(lines[1])) //filter out our stack routine - if strings.Contains(name, "xgb.leaks.stacks") { + if strings.Contains(name, "xgb.leaks.stack") { continue } @@ -173,7 +167,7 @@ func (l leaks) collectGoroutines() 
map[int]goroutine { func (l leaks) checkTesting(t *testing.T) { { goroutines := l.collectGoroutines() - if len(l.goroutines) == len(goroutines) { + if len(l.goroutines) >= len(goroutines) { return } } @@ -181,7 +175,7 @@ func (l leaks) checkTesting(t *testing.T) { time.Sleep(leakTimeout) //t.Logf("possible goroutine leakage, waiting %v", leakTimeout) goroutines := l.collectGoroutines() - if len(l.goroutines) == len(goroutines) { + if len(l.goroutines) >= len(goroutines) { return } t.Errorf("%d goroutine leaks: start(%d) != end(%d)", len(goroutines)-len(l.goroutines), len(l.goroutines), len(goroutines)) @@ -195,15 +189,20 @@ func (l leaks) checkTesting(t *testing.T) { func TestConnOpenClose(t *testing.T) { - testCases := []func() net.Conn{ - // newServerBlocking, // i'm not ready to handle this yet - newServerWriteError, + testCases := []struct { + name string + serverConstuctor func(string) net.Conn + }{ + //{"blocking server", func(n string) net.Conn { return newServerBlocking(n) }}, // i'm not ready to handle this yet + {"write error server", func(n string) net.Conn { return newServerWriteError(n) }}, } for _, tc := range testCases { - serverConn := tc() + t.Run(tc.name, func(t *testing.T) { + serverConn := tc.serverConstuctor(tc.name) + defer serverConn.Close() - t.Run(serverConn.LocalAddr().String(), func(t *testing.T) { defer leaksMonitor().checkTesting(t) + c, err := postNewConn(&Conn{conn: serverConn}) if err != nil { t.Fatalf("connect error: %v", err) @@ -224,8 +223,6 @@ func TestConnOpenClose(t *testing.T) { t.Errorf("*Conn.Close() not responded for %v", closeTimeout) } }) - - serverConn.Close() } } -- cgit v1.2.3 From f9b5ab059749326f11bc61d49c1241b63c5ec993 Mon Sep 17 00:00:00 2001 From: jEzEk Date: Sun, 7 Oct 2018 19:24:59 +0200 Subject: new test servers, test severs run/close testing new servers: serverWriteErrorReadError serverWriteSuccessReadBlocking serverWriteSuccessReadErrorAfterWrite --- xgb_test.go | 185 
++++++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 167 insertions(+), 18 deletions(-) diff --git a/xgb_test.go b/xgb_test.go index d375261..ae88091 100644 --- a/xgb_test.go +++ b/xgb_test.go @@ -9,6 +9,7 @@ import ( "runtime" "strconv" "strings" + "sync" "testing" "time" ) @@ -20,6 +21,13 @@ type addr struct { func (_ addr) Network() string { return "dummy" } func (a addr) String() string { return a.s } +var ( + serverErrEOF = io.EOF + serverErrClosed = errors.New("server closed") + serverErrWrite = errors.New("server write failed") + serverErrRead = errors.New("server read failed") +) + type serverBlocking struct { addr addr control chan interface{} @@ -48,23 +56,18 @@ func newServerBlocking(name string) *serverBlocking { <-runned return s } -func (_ *serverBlocking) errClosed() error { - return errors.New("server closed") -} -func (_ *serverBlocking) errEOF() error { - return io.EOF -} + func (s *serverBlocking) Write(b []byte) (int, error) { select { case <-s.done: } - return 0, s.errClosed() + return 0, serverErrClosed } func (s *serverBlocking) Read(b []byte) (int, error) { select { case <-s.done: } - return 0, s.errEOF() + return 0, serverErrEOF } func (s *serverBlocking) Close() error { select { @@ -72,7 +75,7 @@ func (s *serverBlocking) Close() error { <-s.done return nil case <-s.done: - return s.errClosed() + return serverErrClosed } } func (s *serverBlocking) LocalAddr() net.Addr { return s.addr } @@ -81,23 +84,110 @@ func (s *serverBlocking) SetDeadline(t time.Time) error { return nil } func (s *serverBlocking) SetReadDeadline(t time.Time) error { return nil } func (s *serverBlocking) SetWriteDeadline(t time.Time) error { return nil } -type serverWriteError struct { +type serverWriteErrorReadError struct { + *serverBlocking +} + +func newServerWriteErrorReadError(name string) *serverWriteErrorReadError { + return &serverWriteErrorReadError{newServerBlocking(name)} +} +func (s *serverWriteErrorReadError) Write(b []byte) (int, 
error) { + select { + case <-s.done: + return 0, serverErrClosed + default: + } + return 0, serverErrWrite +} +func (s *serverWriteErrorReadError) Read(b []byte) (int, error) { + select { + case <-s.done: + return 0, serverErrClosed + default: + } + return 0, serverErrRead +} + +type serverWriteErrorReadBlocking struct { + *serverBlocking +} + +func newServerWriteErrorReadBlocking(name string) *serverWriteErrorReadBlocking { + return &serverWriteErrorReadBlocking{newServerBlocking(name)} +} +func (s *serverWriteErrorReadBlocking) Write(b []byte) (int, error) { + select { + case <-s.done: + return 0, serverErrClosed + default: + } + return 0, serverErrWrite +} + +type serverWriteSuccessReadBlocking struct { + *serverBlocking +} + +func newServerWriteSuccessReadBlocking(name string) *serverWriteSuccessReadBlocking { + return &serverWriteSuccessReadBlocking{newServerBlocking(name)} +} +func (s *serverWriteSuccessReadBlocking) Write(b []byte) (int, error) { + select { + case <-s.done: + return 0, serverErrClosed + default: + } + return len(b), nil +} + +type serverWriteSuccessReadErrorAfterWrite struct { *serverBlocking + out chan struct{} + wg *sync.WaitGroup } -func newServerWriteError(name string) *serverWriteError { - return &serverWriteError{newServerBlocking(name)} +func newServerWriteSuccessReadErrorAfterWrite(name string) *serverWriteSuccessReadErrorAfterWrite { + return &serverWriteSuccessReadErrorAfterWrite{ + newServerBlocking(name), + make(chan struct{}), + &sync.WaitGroup{}, + } } -func (s *serverWriteError) Write(b []byte) (int, error) { +func (s *serverWriteSuccessReadErrorAfterWrite) Write(b []byte) (int, error) { select { case <-s.done: - return 0, s.errClosed() + return 0, serverErrClosed default: } - return 0, s.errWrite() + s.wg.Add(1) + go func() { + select { + case s.out <- struct{}{}: + case <-s.done: + } + s.wg.Done() + }() + return len(b), nil } -func (_ *serverWriteError) errWrite() error { - return errors.New("write failed") +func (s 
*serverWriteSuccessReadErrorAfterWrite) Read(b []byte) (int, error) { + select { + case <-s.done: + return 0, serverErrClosed + default: + } + select { + case <-s.out: + return 0, serverErrRead + case <-s.done: + } + return 0, serverErrClosed +} +func (s *serverWriteSuccessReadErrorAfterWrite) Close() error { + if err := s.serverBlocking.Close(); err != nil { + return err + } + s.wg.Wait() + return nil } type goroutine struct { @@ -187,6 +277,62 @@ func (l leaks) checkTesting(t *testing.T) { } } +func TestDummyServersRunClose(t *testing.T) { + + testCases := []struct { + name string + serverConstuctor func(string) net.Conn + }{ + {"write blocking,read blocking server", func(n string) net.Conn { return newServerBlocking(n) }}, + {"write error,read error server", func(n string) net.Conn { return newServerWriteErrorReadError(n) }}, + {"write error,read blocking server", func(n string) net.Conn { return newServerWriteErrorReadBlocking(n) }}, + {"write success,read blocking server", func(n string) net.Conn { return newServerWriteSuccessReadBlocking(n) }}, + {"write success,read error afer write server", func(n string) net.Conn { return newServerWriteSuccessReadErrorAfterWrite(n) }}, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + defer leaksMonitor().checkTesting(t) + + serverConn := tc.serverConstuctor(tc.name) + + { + closeErr := make(chan error) + go func() { + closeErr <- serverConn.Close() + close(closeErr) + }() + closeTimeout := time.Second + select { + case err := <-closeErr: + want := error(nil) + if err != want { + t.Errorf("(net.Conn).Close()=%v, want %v", err, want) + } + case <-time.After(closeTimeout): + t.Errorf("*Conn.Close() not responded for %v", closeTimeout) + } + } + { + closeErr := make(chan error) + go func() { + closeErr <- serverConn.Close() + close(closeErr) + }() + closeTimeout := time.Second + select { + case err := <-closeErr: + want := serverErrClosed + if err != want { + t.Errorf("(net.Conn).Close()=%v, want 
%v", err, want) + } + case <-time.After(closeTimeout): + t.Errorf("*Conn.Close() not responded for %v", closeTimeout) + } + } + }) + } +} + func TestConnOpenClose(t *testing.T) { testCases := []struct { @@ -194,7 +340,10 @@ func TestConnOpenClose(t *testing.T) { serverConstuctor func(string) net.Conn }{ //{"blocking server", func(n string) net.Conn { return newServerBlocking(n) }}, // i'm not ready to handle this yet - {"write error server", func(n string) net.Conn { return newServerWriteError(n) }}, + {"write error,read error server", func(n string) net.Conn { return newServerWriteErrorReadError(n) }}, + {"write error,read blocking server", func(n string) net.Conn { return newServerWriteErrorReadBlocking(n) }}, + {"write success,read blocking server", func(n string) net.Conn { return newServerWriteSuccessReadBlocking(n) }}, + {"write success,read error afer write server", func(n string) net.Conn { return newServerWriteSuccessReadErrorAfterWrite(n) }}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { -- cgit v1.2.3 From 6a67513dc990e60178c2de432a2e5a1042351b56 Mon Sep 17 00:00:00 2001 From: jEzEk Date: Tue, 9 Oct 2018 19:34:28 +0200 Subject: test dummy X server is responding & functional --- xgb_test.go | 139 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 131 insertions(+), 8 deletions(-) diff --git a/xgb_test.go b/xgb_test.go index ae88091..7eda5e4 100644 --- a/xgb_test.go +++ b/xgb_test.go @@ -3,6 +3,7 @@ package xgb import ( "bytes" "errors" + "fmt" "io" "net" "regexp" @@ -21,13 +22,133 @@ type addr struct { func (_ addr) Network() string { return "dummy" } func (a addr) String() string { return a.s } +type errTimeout struct{ error } + +func (_ errTimeout) Timeout() bool { return true } + var ( - serverErrEOF = io.EOF - serverErrClosed = errors.New("server closed") - serverErrWrite = errors.New("server write failed") - serverErrRead = errors.New("server read failed") + serverErrNotImplemented = 
errors.New("command not implemented") + serverErrEOF = io.EOF + serverErrClosed = errors.New("server closed") + serverErrWrite = errors.New("server write failed") + serverErrRead = errors.New("server read failed") ) +type dXIoResult struct { + n int + err error +} +type dXIo struct { + b []byte + result chan dXIoResult +} + +// dumm server implementing net.Conn interface, +// Read blocks until Write, pipes Write to Read, than Read blocks again. +type dX struct { + addr addr + in, out chan dXIo + control chan interface{} + done chan struct{} +} + +func newDX(name string) *dX { + s := &dX{ + addr{name}, + make(chan dXIo), make(chan dXIo), + make(chan interface{}), + make(chan struct{}), + } + + seqId := uint16(1) + incrementSequenceId := func() { + // this has to be the same algorithm as in (*Conn).generateSeqIds + if seqId == uint16((1<<16)-1) { + seqId = 0 + } else { + seqId++ + } + } + + in, out := s.in, chan dXIo(nil) + buf := &bytes.Buffer{} + + go func() { + defer close(s.done) + for { + select { + case dxsio := <-in: + response := make([]byte, 32) + response[0] = 1 // not error reply + Put16(response[2:], seqId) // sequence number + + buf.Write(response) + incrementSequenceId() + dxsio.result <- dXIoResult{len(dxsio.b), nil} + + if out == nil && buf.Len() > 0 { + out = s.out + } + case dxsio := <-out: + n, err := buf.Read(dxsio.b) + dxsio.result <- dXIoResult{n, err} + + if buf.Len() == 0 { + out = nil + } + case ci := <-s.control: + if ci == nil { + return + } + } + } + }() + return s +} +func (s *dX) Close() error { + select { + case s.control <- nil: + <-s.done + return nil + case <-s.done: + } + return serverErrClosed +} +func (s *dX) Write(b []byte) (int, error) { + resChan := make(chan dXIoResult) + fmt.Printf("(*dX).Write: got write request: %v\n", b) + select { + case s.in <- dXIo{b, resChan}: + fmt.Printf("(*dX).Write: input channel has accepted request\n") + res := <-resChan + fmt.Printf("(*dX).Write: got result: %v\n", res) + return res.n, res.err + 
case <-s.done: + } + fmt.Printf("(*dX).Write: server was closed\n") + return 0, serverErrClosed +} +func (s *dX) Read(b []byte) (int, error) { + resChan := make(chan dXIoResult) + fmt.Printf("(*dX).Read: got read request of length: %v\n", len(b)) + select { + case s.out <- dXIo{b, resChan}: + fmt.Printf("(*dX).Read: output channel has accepted request\n") + res := <-resChan + fmt.Printf("(*dX).Read: got result: %v\n", res) + fmt.Printf("(*dX).Read: result bytes: %v\n", b) + return res.n, res.err + case <-s.done: + fmt.Printf("(*dX).Read: server was closed\n") + } + return 0, serverErrClosed +} +func (s *dX) LocalAddr() net.Addr { return s.addr } +func (s *dX) RemoteAddr() net.Addr { return s.addr } +func (s *dX) SetDeadline(t time.Time) error { return serverErrNotImplemented } +func (s *dX) SetReadDeadline(t time.Time) error { return serverErrNotImplemented } +func (s *dX) SetWriteDeadline(t time.Time) error { return serverErrNotImplemented } + type serverBlocking struct { addr addr control chan interface{} @@ -288,6 +409,7 @@ func TestDummyServersRunClose(t *testing.T) { {"write error,read blocking server", func(n string) net.Conn { return newServerWriteErrorReadBlocking(n) }}, {"write success,read blocking server", func(n string) net.Conn { return newServerWriteSuccessReadBlocking(n) }}, {"write success,read error afer write server", func(n string) net.Conn { return newServerWriteSuccessReadErrorAfterWrite(n) }}, + {"write success,read success afer write server", func(n string) net.Conn { return newDX(n) }}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -340,10 +462,11 @@ func TestConnOpenClose(t *testing.T) { serverConstuctor func(string) net.Conn }{ //{"blocking server", func(n string) net.Conn { return newServerBlocking(n) }}, // i'm not ready to handle this yet - {"write error,read error server", func(n string) net.Conn { return newServerWriteErrorReadError(n) }}, - {"write error,read blocking server", func(n string) net.Conn { 
return newServerWriteErrorReadBlocking(n) }}, - {"write success,read blocking server", func(n string) net.Conn { return newServerWriteSuccessReadBlocking(n) }}, - {"write success,read error afer write server", func(n string) net.Conn { return newServerWriteSuccessReadErrorAfterWrite(n) }}, + //{"write error,read error server", func(n string) net.Conn { return newServerWriteErrorReadError(n) }}, + //{"write error,read blocking server", func(n string) net.Conn { return newServerWriteErrorReadBlocking(n) }}, + //{"write success,read blocking server", func(n string) net.Conn { return newServerWriteSuccessReadBlocking(n) }}, + //{"write success,read error afer write server", func(n string) net.Conn { return newServerWriteSuccessReadErrorAfterWrite(n) }}, + {"write success,read success afer write server", func(n string) net.Conn { return newDX(n) }}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { -- cgit v1.2.3 From a2583299cf9dcb5b8e59d636b0aa8341fab9a3af Mon Sep 17 00:00:00 2001 From: jEzEk Date: Sat, 13 Oct 2018 19:34:49 +0200 Subject: test server w tests, test *Conn open/close --- xgb_test.go | 849 ++++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 595 insertions(+), 254 deletions(-) diff --git a/xgb_test.go b/xgb_test.go index 7eda5e4..5540e5c 100644 --- a/xgb_test.go +++ b/xgb_test.go @@ -10,7 +10,6 @@ import ( "runtime" "strconv" "strings" - "sync" "testing" "time" ) @@ -27,11 +26,11 @@ type errTimeout struct{ error } func (_ errTimeout) Timeout() bool { return true } var ( - serverErrNotImplemented = errors.New("command not implemented") - serverErrEOF = io.EOF - serverErrClosed = errors.New("server closed") - serverErrWrite = errors.New("server write failed") - serverErrRead = errors.New("server read failed") + dXErrNotImplemented = errors.New("command not implemented") + dXErrClosed = errors.New("server closed") + dXErrWrite = errors.New("server write failed") + dXErrRead = errors.New("server read failed") + 
dXErrResponse = errors.New("server response error") ) type dXIoResult struct { @@ -43,8 +42,30 @@ type dXIo struct { result chan dXIoResult } -// dumm server implementing net.Conn interface, -// Read blocks until Write, pipes Write to Read, than Read blocks again. +type dXCSendEvent struct{} +type dXCWriteLock struct{} +type dXCWriteUnlock struct{} +type dXCWriteError struct{} +type dXCWriteSuccess struct{ errorResponse bool } +type dXCReadLock struct{} +type dXCReadUnlock struct{} +type dXCReadError struct{} +type dXCReadSuccess struct{} + +type dXEvent struct{} + +func (_ dXEvent) Bytes() []byte { return nil } +func (_ dXEvent) String() string { return "dummy X server event" } + +type dXError struct { + seqId uint16 +} + +func (e dXError) SequenceId() uint16 { return e.seqId } +func (_ dXError) BadId() uint32 { return 0 } +func (_ dXError) Error() string { return "dummy X server error reply" } + +// dummy X server implementing net.Conn interface, type dX struct { addr addr in, out chan dXIo @@ -52,6 +73,10 @@ type dX struct { done chan struct{} } +// Results running dummy X server, satisfying net.Conn interface for test purposes. +// It is users responsibility to stop and clean up resources with (*dX).Close, if not needed anymore. +// By default, the read and write method are unlocked and will not result in error and read response will be a non error X reply (like with (*dX).WriteSuccess(false), (*dX).ReadSuccess()). +//TODO make (*dX).SetDeadline, (*dX).SetReadDeadline, (*dX).SetWriteDeadline work proprely. 
func newDX(name string) *dX { s := &dX{ addr{name}, @@ -72,24 +97,48 @@ func newDX(name string) *dX { in, out := s.in, chan dXIo(nil) buf := &bytes.Buffer{} + errorRead, errorWrite, errorResponse := false, false, false + lockRead := false + + NewErrorFuncs[255] = func(buf []byte) Error { + return dXError{Get16(buf[2:])} + } + NewEventFuncs[128&127] = func(buf []byte) Event { + return dXEvent{} + } go func() { defer close(s.done) for { select { case dxsio := <-in: + if errorWrite { + dxsio.result <- dXIoResult{0, dXErrWrite} + break + } + response := make([]byte, 32) - response[0] = 1 // not error reply + if errorResponse { // response will be error + response[0] = 0 // error + response[1] = 255 // error function + } else { // response will by reply with no additional reply + response[0] = 1 // reply + } Put16(response[2:], seqId) // sequence number + incrementSequenceId() buf.Write(response) - incrementSequenceId() dxsio.result <- dXIoResult{len(dxsio.b), nil} - if out == nil && buf.Len() > 0 { + if !lockRead && out == nil { out = s.out } case dxsio := <-out: + if errorRead { + dxsio.result <- dXIoResult{0, dXErrRead} + break + } + n, err := buf.Read(dxsio.b) dxsio.result <- dXIoResult{n, err} @@ -100,11 +149,47 @@ func newDX(name string) *dX { if ci == nil { return } + switch cs := ci.(type) { + case dXCSendEvent: + response := make([]byte, 32) + response[0] = 128 + buf.Write(response) + + if !lockRead && out == nil { + out = s.out + } + case dXCWriteLock: + in = nil + case dXCWriteUnlock: + in = s.in + case dXCWriteError: + errorWrite = true + case dXCWriteSuccess: + errorWrite = false + errorResponse = cs.errorResponse + case dXCReadLock: + out = nil + lockRead = true + case dXCReadUnlock: + lockRead = false + if buf.Len() > 0 && out == nil { + out = s.out + } + case dXCReadError: + errorRead = true + case dXCReadSuccess: + errorRead = false + default: + } } } }() return s } + +// Shuts down dummy X server. 
Every blocking or future method calls will do nothing and result in error. +// Result will be dXErrClosed if server was allready closed. +// Server can not be unclosed. func (s *dX) Close() error { select { case s.control <- nil: @@ -112,203 +197,135 @@ func (s *dX) Close() error { return nil case <-s.done: } - return serverErrClosed -} + return dXErrClosed +} + +// Imitates write action to X server. +// If not locked by (*dX).WriteLock, it results in error or success. +// +// If Write errors, the second result parameter will be an error {dXErrWrite|dXErrClosed}, the resulting first parameter will be 0, +// no response will be generated and the internal sequence number will not be incremented. +// +// If Write succeedes, it results in (len(b), nil), the (*dX).Read will be unblocked (if not locked with (*dX).ReadLock), +// an [32]byte response will be written to buffer from which (*dX).Read reads, +// with sequence number and proper X response type (error or reply) and the internal sequence number will be increased. +// +// If server was closed previously, result will be (0, dXErrClosed). func (s *dX) Write(b []byte) (int, error) { resChan := make(chan dXIoResult) - fmt.Printf("(*dX).Write: got write request: %v\n", b) + //fmt.Printf("(*dX).Write: got write request: %v\n", b) select { case s.in <- dXIo{b, resChan}: - fmt.Printf("(*dX).Write: input channel has accepted request\n") + //fmt.Printf("(*dX).Write: input channel has accepted request\n") res := <-resChan - fmt.Printf("(*dX).Write: got result: %v\n", res) + //fmt.Printf("(*dX).Write: got result: %v\n", res) return res.n, res.err case <-s.done: } - fmt.Printf("(*dX).Write: server was closed\n") - return 0, serverErrClosed -} + //fmt.Printf("(*dX).Write: server was closed\n") + return 0, dXErrClosed +} + +// Imitates read action from X server. +// If locked by (*dX).ReadLock, read will block until unlocking with (*dX).ReadUnlock, or server closes. 
+// +// If not locked, Read will result in error, or block until internal read buffer is not empty, depending on internal state. +// The internal state can be modified via (*dX).ReadError, or (*dX).ReadSuccess +// ReadError makes it return (0, dXErrRead), with no changes to internal buffer or state. +// ReadSuccess makes it block until there are some write responses. After emtying the internal read buffer, all next Read requests will block untill another successful write requests. +// +// The resulting read success response type can be altered with (*dX).WriteSuccess method. +// +// If server was closed previously, result will be (0, io.EOF). func (s *dX) Read(b []byte) (int, error) { resChan := make(chan dXIoResult) - fmt.Printf("(*dX).Read: got read request of length: %v\n", len(b)) + //fmt.Printf("(*dX).Read: got read request of length: %v\n", len(b)) select { case s.out <- dXIo{b, resChan}: - fmt.Printf("(*dX).Read: output channel has accepted request\n") + //fmt.Printf("(*dX).Read: output channel has accepted request\n") res := <-resChan - fmt.Printf("(*dX).Read: got result: %v\n", res) - fmt.Printf("(*dX).Read: result bytes: %v\n", b) + //fmt.Printf("(*dX).Read: got result: %v\n", res) + //fmt.Printf("(*dX).Read: result bytes: %v\n", b) return res.n, res.err case <-s.done: - fmt.Printf("(*dX).Read: server was closed\n") + //fmt.Printf("(*dX).Read: server was closed\n") } - return 0, serverErrClosed + return 0, io.EOF } func (s *dX) LocalAddr() net.Addr { return s.addr } func (s *dX) RemoteAddr() net.Addr { return s.addr } -func (s *dX) SetDeadline(t time.Time) error { return serverErrNotImplemented } -func (s *dX) SetReadDeadline(t time.Time) error { return serverErrNotImplemented } -func (s *dX) SetWriteDeadline(t time.Time) error { return serverErrNotImplemented } - -type serverBlocking struct { - addr addr - control chan interface{} - done chan struct{} -} - -func newServerBlocking(name string) *serverBlocking { - s := &serverBlocking{ - addr{name}, - 
make(chan interface{}), - make(chan struct{}), - } - runned := make(chan struct{}) - go func() { - close(runned) - defer close(s.done) - for { - select { - case ci := <-s.control: - if ci == nil { - return - } - } - } - }() - <-runned - return s -} +func (s *dX) SetDeadline(t time.Time) error { return dXErrNotImplemented } +func (s *dX) SetReadDeadline(t time.Time) error { return dXErrNotImplemented } +func (s *dX) SetWriteDeadline(t time.Time) error { return dXErrNotImplemented } -func (s *serverBlocking) Write(b []byte) (int, error) { +func (s *dX) Control(i interface{}) error { select { - case <-s.done: - } - return 0, serverErrClosed -} -func (s *serverBlocking) Read(b []byte) (int, error) { - select { - case <-s.done: - } - return 0, serverErrEOF -} -func (s *serverBlocking) Close() error { - select { - case s.control <- nil: - <-s.done + case s.control <- i: return nil case <-s.done: - return serverErrClosed } -} -func (s *serverBlocking) LocalAddr() net.Addr { return s.addr } -func (s *serverBlocking) RemoteAddr() net.Addr { return s.addr } -func (s *serverBlocking) SetDeadline(t time.Time) error { return nil } -func (s *serverBlocking) SetReadDeadline(t time.Time) error { return nil } -func (s *serverBlocking) SetWriteDeadline(t time.Time) error { return nil } - -type serverWriteErrorReadError struct { - *serverBlocking + return dXErrClosed } -func newServerWriteErrorReadError(name string) *serverWriteErrorReadError { - return &serverWriteErrorReadError{newServerBlocking(name)} -} -func (s *serverWriteErrorReadError) Write(b []byte) (int, error) { - select { - case <-s.done: - return 0, serverErrClosed - default: - } - return 0, serverErrWrite -} -func (s *serverWriteErrorReadError) Read(b []byte) (int, error) { - select { - case <-s.done: - return 0, serverErrClosed - default: - } - return 0, serverErrRead +// Adds an Event into read buffer. 
+func (s *dX) SendEvent() error { + return s.Control(dXCSendEvent{}) } -type serverWriteErrorReadBlocking struct { - *serverBlocking +// Locks writing. All write requests will be blocked until write is unlocked with (*dX).WriteUnlock, or server closes. +func (s *dX) WriteLock() error { + return s.Control(dXCWriteLock{}) } -func newServerWriteErrorReadBlocking(name string) *serverWriteErrorReadBlocking { - return &serverWriteErrorReadBlocking{newServerBlocking(name)} -} -func (s *serverWriteErrorReadBlocking) Write(b []byte) (int, error) { - select { - case <-s.done: - return 0, serverErrClosed - default: - } - return 0, serverErrWrite +// Unlocks writing. All blocked write requests until now will be accepted. +func (s *dX) WriteUnlock() error { + return s.Control(dXCWriteUnlock{}) } -type serverWriteSuccessReadBlocking struct { - *serverBlocking +// Unlocks writing and makes (*dX).Write to result (0, dXErrWrite). +func (s *dX) WriteError() error { + if err := s.WriteUnlock(); err != nil { + return err + } + return s.Control(dXCWriteError{}) } -func newServerWriteSuccessReadBlocking(name string) *serverWriteSuccessReadBlocking { - return &serverWriteSuccessReadBlocking{newServerBlocking(name)} -} -func (s *serverWriteSuccessReadBlocking) Write(b []byte) (int, error) { - select { - case <-s.done: - return 0, serverErrClosed - default: +// Unlocks writing and makes (*dX).Write to result (len(b), nil), with a proper X reply. +// If errorResult is true, the response will be an X error response, +// else an normal reply. See (*dX).Write for details. +func (s *dX) WriteSuccess(errorResult bool) error { + if err := s.WriteUnlock(); err != nil { + return err } - return len(b), nil + return s.Control(dXCWriteSuccess{errorResult}) } -type serverWriteSuccessReadErrorAfterWrite struct { - *serverBlocking - out chan struct{} - wg *sync.WaitGroup +// Locks reading. All read requests will be blocked until read is unlocked with (*dX).ReadUnlock, or server closes. 
+// (*dX).Read wil block even after successful write. +func (s *dX) ReadLock() error { + return s.Control(dXCReadLock{}) } -func newServerWriteSuccessReadErrorAfterWrite(name string) *serverWriteSuccessReadErrorAfterWrite { - return &serverWriteSuccessReadErrorAfterWrite{ - newServerBlocking(name), - make(chan struct{}), - &sync.WaitGroup{}, - } +// Unlocks reading. If there are any unresponded requests in reading buffer, read will be unblocked. +func (s *dX) ReadUnlock() error { + return s.Control(dXCReadUnlock{}) } -func (s *serverWriteSuccessReadErrorAfterWrite) Write(b []byte) (int, error) { - select { - case <-s.done: - return 0, serverErrClosed - default: - } - s.wg.Add(1) - go func() { - select { - case s.out <- struct{}{}: - case <-s.done: - } - s.wg.Done() - }() - return len(b), nil -} -func (s *serverWriteSuccessReadErrorAfterWrite) Read(b []byte) (int, error) { - select { - case <-s.done: - return 0, serverErrClosed - default: - } - select { - case <-s.out: - return 0, serverErrRead - case <-s.done: + +// Unlocks read and makes every blocked and following (*dX).Read requests fail. See (*dX).Read for details. +func (s *dX) ReadError() error { + if err := s.ReadUnlock(); err != nil { + return err } - return 0, serverErrClosed + return s.Control(dXCReadError{}) } -func (s *serverWriteSuccessReadErrorAfterWrite) Close() error { - if err := s.serverBlocking.Close(); err != nil { + +// Unlocks read and makes every blocked and following (*dX).Read requests be handled, if there are any in read buffer. +// See (*dX).Read for details. 
+func (s *dX) ReadSuccess() error { + if err := s.ReadUnlock(); err != nil { return err } - s.wg.Wait() - return nil + return s.Control(dXCReadSuccess{}) } type goroutine struct { @@ -318,12 +335,16 @@ type goroutine struct { } type leaks struct { + name string goroutines map[int]goroutine + report []*leaks } -func leaksMonitor() leaks { - return leaks{ +func leaksMonitor(name string, monitors ...*leaks) *leaks { + return &leaks{ + name, leaks{}.collectGoroutines(), + monitors, } } @@ -375,126 +396,446 @@ func (l leaks) collectGoroutines() map[int]goroutine { return res } -func (l leaks) checkTesting(t *testing.T) { - { - goroutines := l.collectGoroutines() - if len(l.goroutines) >= len(goroutines) { - return +func (l leaks) leakingGoroutines() []goroutine { + goroutines := l.collectGoroutines() + res := []goroutine{} + for id, gr := range goroutines { + if _, ok := l.goroutines[id]; ok { + continue } + res = append(res, gr) + } + return res +} +func (l leaks) checkTesting(t *testing.T) { + if len(l.leakingGoroutines()) == 0 { + return } leakTimeout := time.Second time.Sleep(leakTimeout) //t.Logf("possible goroutine leakage, waiting %v", leakTimeout) - goroutines := l.collectGoroutines() - if len(l.goroutines) >= len(goroutines) { - return + grs := l.leakingGoroutines() + for _, gr := range grs { + t.Errorf("%s: %s is leaking", l.name, gr.name) + //t.Errorf("%s: %s is leaking\n%v", l.name, gr.name, string(gr.stack)) } - t.Errorf("%d goroutine leaks: start(%d) != end(%d)", len(goroutines)-len(l.goroutines), len(l.goroutines), len(goroutines)) - for id, gr := range goroutines { - if _, ok := l.goroutines[id]; ok { - continue - } - t.Log(gr.name, "\n", string(gr.stack)) + for _, rl := range l.report { + rl.ignoreLeak(grs...) 
+ } +} +func (l *leaks) ignoreLeak(grs ...goroutine) { + for _, gr := range grs { + l.goroutines[gr.id] = gr } } -func TestDummyServersRunClose(t *testing.T) { +func testDXCombinations(writeStates, readStates []string) []func() (*dX, error) { + writeSetters := map[string]func(*dX) error{ + "lock": (*dX).WriteLock, + "error": (*dX).WriteError, + "successReply": func(s *dX) error { return s.WriteSuccess(false) }, + "successError": func(s *dX) error { return s.WriteSuccess(true) }, + } + readSetters := map[string]func(*dX) error{ + "lock": (*dX).ReadLock, + "error": (*dX).ReadError, + "success": (*dX).ReadSuccess, + } - testCases := []struct { - name string - serverConstuctor func(string) net.Conn - }{ - {"write blocking,read blocking server", func(n string) net.Conn { return newServerBlocking(n) }}, - {"write error,read error server", func(n string) net.Conn { return newServerWriteErrorReadError(n) }}, - {"write error,read blocking server", func(n string) net.Conn { return newServerWriteErrorReadBlocking(n) }}, - {"write success,read blocking server", func(n string) net.Conn { return newServerWriteSuccessReadBlocking(n) }}, - {"write success,read error afer write server", func(n string) net.Conn { return newServerWriteSuccessReadErrorAfterWrite(n) }}, - {"write success,read success afer write server", func(n string) net.Conn { return newDX(n) }}, + res := []func() (*dX, error){} + for _, writeState := range writeStates { + writeState, writeSetter := writeState, writeSetters[writeState] + if writeSetter == nil { + panic("unknown write state: " + writeState) + continue + } + for _, readState := range readStates { + readState, readSetter := readState, readSetters[readState] + if readSetter == nil { + panic("unknown read state: " + readState) + continue + } + res = append(res, func() (*dX, error) { + s := newDX("write=" + writeState + ",read=" + readState) + + if err := readSetter(s); err != nil { + s.Close() + return nil, errors.New("set read " + readState + " error: " 
+ err.Error()) + } + + if err := writeSetter(s); err != nil { + s.Close() + return nil, errors.New("set write " + writeState + " error: " + err.Error()) + } + + return s, nil + }) + } } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - defer leaksMonitor().checkTesting(t) - - serverConn := tc.serverConstuctor(tc.name) - - { - closeErr := make(chan error) - go func() { - closeErr <- serverConn.Close() - close(closeErr) - }() - closeTimeout := time.Second + return res +} + +func TestDummyXServer(t *testing.T) { + timeout := time.Millisecond + wantResponse := func(action func(*dX) error, want, block error) func(*dX) error { + return func(s *dX) error { + actionResult := make(chan error) + timedOut := make(chan struct{}) + go func() { + err := action(s) select { - case err := <-closeErr: - want := error(nil) - if err != want { - t.Errorf("(net.Conn).Close()=%v, want %v", err, want) + case <-timedOut: + if err != block { + t.Errorf("after unblocking, action result=%v, want %v", err, block) } - case <-time.After(closeTimeout): - t.Errorf("*Conn.Close() not responded for %v", closeTimeout) + case actionResult <- err: + } + }() + select { + case err := <-actionResult: + if err != want { + return errors.New(fmt.Sprintf("action result=%v, want %v", err, want)) } + case <-time.After(timeout): + close(timedOut) + return errors.New(fmt.Sprintf("action did not respond for %v, result want %v", timeout, want)) } - { - closeErr := make(chan error) - go func() { - closeErr <- serverConn.Close() - close(closeErr) - }() - closeTimeout := time.Second + return nil + } + } + wantBlock := func(action func(*dX) error, unblock error) func(*dX) error { + return func(s *dX) error { + actionResult := make(chan error) + timedOut := make(chan struct{}) + go func() { + err := action(s) select { - case err := <-closeErr: - want := serverErrClosed - if err != want { - t.Errorf("(net.Conn).Close()=%v, want %v", err, want) + case <-timedOut: + if err != unblock { + 
t.Errorf("after unblocking, action result=%v, want %v", err, unblock) } - case <-time.After(closeTimeout): - t.Errorf("*Conn.Close() not responded for %v", closeTimeout) + case actionResult <- err: } + }() + select { + case err := <-actionResult: + return errors.New(fmt.Sprintf("action result=%v, want to be blocked", err)) + case <-time.After(timeout): + close(timedOut) } - }) + return nil + } + } + write := func() func(*dX) error { + return func(s *dX) error { + _, err := s.Write([]byte{1}) + return err + } + } + read := func() func(*dX) error { + return func(s *dX) error { + b := make([]byte, 32) + _, err := s.Read(b) + return err + } + } + readSuccess := func(seqId uint16, errorResponse bool) func(*dX) error { + return func(s *dX) error { + b := make([]byte, 32) + _, err := s.Read(b) + if err != nil { + return err + } + if seqId != Get16(b[2:]) { + return errors.New(fmt.Sprintf("got read sequence number %d, want %d", Get16(b[2:]), seqId)) + } + b0, desc := 0, "error" + if !errorResponse { + b0, desc = 1, "valid" + } + if int(b[0]) != b0 { + return errors.New(fmt.Sprintf("response is not an %s reply: %v", desc, b)) + } + return nil + } } -} - -func TestConnOpenClose(t *testing.T) { testCases := []struct { - name string - serverConstuctor func(string) net.Conn + description string + servers []func() (*dX, error) + actions []func(*dX) error // actions per server }{ - //{"blocking server", func(n string) net.Conn { return newServerBlocking(n) }}, // i'm not ready to handle this yet - //{"write error,read error server", func(n string) net.Conn { return newServerWriteErrorReadError(n) }}, - //{"write error,read blocking server", func(n string) net.Conn { return newServerWriteErrorReadBlocking(n) }}, - //{"write success,read blocking server", func(n string) net.Conn { return newServerWriteSuccessReadBlocking(n) }}, - //{"write success,read error afer write server", func(n string) net.Conn { return newServerWriteSuccessReadErrorAfterWrite(n) }}, - {"write success,read 
success afer write server", func(n string) net.Conn { return newDX(n) }}, + {"empty", + []func() (*dX, error){ + func() (*dX, error) { return newDX("server"), nil }, + }, + []func(*dX) error{ + func(s *dX) error { return nil }, + }, + }, + {"close,close", + testDXCombinations( + []string{"lock", "error", "successError", "successReply"}, + []string{"lock", "error", "success"}, + ), + []func(*dX) error{ + wantResponse((*dX).Close, nil, dXErrClosed), + wantResponse((*dX).Close, dXErrClosed, dXErrClosed), + }, + }, + {"write,close,write", + testDXCombinations( + []string{"lock"}, + []string{"lock", "error", "success"}, + ), + []func(*dX) error{ + wantBlock(write(), dXErrClosed), + wantResponse((*dX).Close, nil, dXErrClosed), + wantResponse(write(), dXErrClosed, dXErrClosed), + }, + }, + {"write,close,write", + testDXCombinations( + []string{"error"}, + []string{"lock", "error", "success"}, + ), + []func(*dX) error{ + wantResponse(write(), dXErrWrite, dXErrClosed), + wantResponse((*dX).Close, nil, dXErrClosed), + wantResponse(write(), dXErrClosed, dXErrClosed), + }, + }, + {"write,close,write", + testDXCombinations( + []string{"successError", "successReply"}, + []string{"lock", "error", "success"}, + ), + []func(*dX) error{ + wantResponse(write(), nil, dXErrClosed), + wantResponse((*dX).Close, nil, dXErrClosed), + wantResponse(write(), dXErrClosed, dXErrClosed), + }, + }, + {"read,close,read", + testDXCombinations( + []string{"lock", "error", "successError", "successReply"}, + []string{"lock", "error", "success"}, + ), + []func(*dX) error{ + wantBlock(read(), io.EOF), + wantResponse((*dX).Close, nil, dXErrClosed), + wantResponse(read(), io.EOF, io.EOF), + }, + }, + {"write,read", + testDXCombinations( + []string{"lock"}, + []string{"lock", "error", "success"}, + ), + []func(*dX) error{ + wantBlock(write(), dXErrClosed), + wantBlock(read(), io.EOF), + }, + }, + {"write,read", + testDXCombinations( + []string{"error"}, + []string{"lock", "error", "success"}, + ), + 
[]func(*dX) error{ + wantResponse(write(), dXErrWrite, dXErrClosed), + wantBlock(read(), io.EOF), + }, + }, + {"write,read", + testDXCombinations( + []string{"successError"}, + []string{"lock"}, + ), + []func(*dX) error{ + wantResponse(write(), nil, dXErrClosed), + wantBlock(read(), io.EOF), + }, + }, + {"write,read", + testDXCombinations( + []string{"successError"}, + []string{"error"}, + ), + []func(*dX) error{ + wantResponse(write(), nil, dXErrClosed), + wantResponse(read(), dXErrRead, io.EOF), + }, + }, + {"write,read", + testDXCombinations( + []string{"successError"}, + []string{"success"}, + ), + []func(*dX) error{ + wantResponse(write(), nil, dXErrClosed), + wantResponse(readSuccess(1, true), nil, io.EOF), + }, + }, + {"write,read", + testDXCombinations( + []string{"successReply"}, + []string{"success"}, + ), + []func(*dX) error{ + wantResponse(write(), nil, dXErrClosed), + wantResponse(readSuccess(1, false), nil, io.EOF), + }, + }, } for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - serverConn := tc.serverConstuctor(tc.name) - defer serverConn.Close() - - defer leaksMonitor().checkTesting(t) + t.Run(tc.description, func(t *testing.T) { + defer leaksMonitor(tc.description).checkTesting(t) + + for _, server := range tc.servers { + s, err := server() + if err != nil { + t.Error(err) + continue + } + if s == nil { + t.Error("nil server in testcase") + continue + } - c, err := postNewConn(&Conn{conn: serverConn}) - if err != nil { - t.Fatalf("connect error: %v", err) + t.Run(s.LocalAddr().String(), func(t *testing.T) { + defer leaksMonitor(s.LocalAddr().String()).checkTesting(t) + for _, action := range tc.actions { + if err := action(s); err != nil { + t.Error(err) + break + } + } + s.Close() + }) } - //t.Logf("connection to server created: %v", c) + }) + } +} - closeErr := make(chan struct{}) +func TestConnOnNonBlockingDummyXServer(t *testing.T) { + timeout := time.Millisecond + wantResponse := func(action func(*Conn) error, want, block 
error) func(*Conn) error { + return func(c *Conn) error { + actionResult := make(chan error) + timedOut := make(chan struct{}) go func() { - //t.Logf("closing connection to server") - c.Close() - close(closeErr) + err := action(c) + select { + case <-timedOut: + if err != block { + t.Errorf("after unblocking, action result=%v, want %v", err, block) + } + case actionResult <- err: + } }() - closeTimeout := time.Second select { - case <-closeErr: - //t.Logf("connection to server closed") - case <-time.After(closeTimeout): - t.Errorf("*Conn.Close() not responded for %v", closeTimeout) + case err := <-actionResult: + if err != want { + return errors.New(fmt.Sprintf("action result=%v, want %v", err, want)) + } + case <-time.After(timeout): + close(timedOut) + return errors.New(fmt.Sprintf("action did not respond for %v, result want %v", timeout, want)) } - }) + return nil + } + } + crequest := func(checked, reply bool) func(*Conn) error { + return func(c *Conn) error { + cookie := c.NewCookie(checked, reply) + c.NewRequest([]byte("crequest"), cookie) + _, err := cookie.Reply() + return err + } } + testCases := []struct { + description string + servers []func() (*dX, error) + actions []func(*Conn) error + }{ + {"cclose", + testDXCombinations( + []string{"successError", "successReply"}, + []string{"success"}, + ), + []func(*Conn) error{}, + }, + {"crequest", + testDXCombinations([]string{"successError"}, []string{"success"}), + []func(*Conn) error{ + wantResponse(crequest(true, true), dXError{1}, io.ErrShortWrite), + }, + }, + {"crequest", + testDXCombinations([]string{"successReply"}, []string{"success"}), + []func(*Conn) error{ + wantResponse(crequest(true, true), nil, io.ErrShortWrite), + }, + }, + // sometimes panic on unfixed branch - close of closed channel + {"cclose", + testDXCombinations([]string{"error"}, []string{"error", "success"}), + []func(*Conn) error{}, + }, + } + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + tclm := 
leaksMonitor("test case " + tc.description) + defer tclm.checkTesting(t) + + for _, server := range tc.servers { + s, err := server() + if err != nil { + t.Error(err) + continue + } + if s == nil { + t.Error("nil *dX in testcase") + continue + } + + t.Run(s.LocalAddr().String(), func(t *testing.T) { + sclm := leaksMonitor(s.LocalAddr().String()+" after sever close", tclm) + defer sclm.checkTesting(t) + + c, err := postNewConn(&Conn{conn: s}) + if err != nil { + t.Errorf("connect to dummy server error: %v", err) + return + } + + rlm := leaksMonitor(c.conn.LocalAddr().String() + " after actions end") + for _, action := range tc.actions { + if err := action(c); err != nil { + t.Error(err) + break + } + } + c.Close() + if err := wantResponse( + func(c *Conn) error { + if ev, err := c.WaitForEvent(); ev != nil || err != nil { + return fmt.Errorf("after (*Conn).Close, (*Conn).WaitForEvent() = (%v,%v), want (nil,nil)", ev, err) + } + return nil + }, + nil, + io.ErrShortWrite, + )(c); err != nil { + t.Error(err) + } + rlm.checkTesting(t) + }) + + s.Close() + } + }) + } } -- cgit v1.2.3 From 94eccf489f8ffe493c8a6b98571a207e62663639 Mon Sep 17 00:00:00 2001 From: jEzEk Date: Mon, 22 Oct 2018 19:12:22 +0200 Subject: test on dummy X un/checked with/out reply,refactor --- dummyNetConn.go | 261 ++++++++++++++++ dummyNetConn_test.go | 273 +++++++++++++++++ xgb_test.go | 816 +++++++++++---------------------------------------- 3 files changed, 704 insertions(+), 646 deletions(-) create mode 100644 dummyNetConn.go create mode 100644 dummyNetConn_test.go diff --git a/dummyNetConn.go b/dummyNetConn.go new file mode 100644 index 0000000..91bae4f --- /dev/null +++ b/dummyNetConn.go @@ -0,0 +1,261 @@ +package xgb + +import ( + "bytes" + "errors" + "io" + "net" + "time" +) + +type dAddr struct { + s string +} + +func (_ dAddr) Network() string { return "dummy" } +func (a dAddr) String() string { return a.s } + +var ( + dNCErrNotImplemented = errors.New("command not implemented") + 
dNCErrClosed = errors.New("server closed") + dNCErrWrite = errors.New("server write failed") + dNCErrRead = errors.New("server read failed") + dNCErrResponse = errors.New("server response error") +) + +type dNCIoResult struct { + n int + err error +} +type dNCIo struct { + b []byte + result chan dNCIoResult +} + +type dNCCWriteLock struct{} +type dNCCWriteUnlock struct{} +type dNCCWriteError struct{} +type dNCCWriteSuccess struct{} +type dNCCReadLock struct{} +type dNCCReadUnlock struct{} +type dNCCReadError struct{} +type dNCCReadSuccess struct{} + +// dummy net.Conn interface. Needs to be constructed via newDummyNetConn([...]) function. +type dNC struct { + reply func([]byte) []byte + addr dAddr + in, out chan dNCIo + control chan interface{} + done chan struct{} +} + +// Results running dummy server, satisfying net.Conn interface for test purposes. +// 'name' parameter will be returned via (*dNC).Local/RemoteAddr().String() +// 'reply' parameter function will be runned only on successful (*dNC).Write(b) with 'b' as parameter to 'reply'. The result will be stored in internal buffer and can be retrieved later via (*dNC).Read([...]) method. +// It is users responsibility to stop and clean up resources with (*dNC).Close, if not needed anymore. +// By default, the (*dNC).Write([...]) and (*dNC).Read([...]) methods are unlocked and will not result in error. +//TODO make (*dNC).SetDeadline, (*dNC).SetReadDeadline, (*dNC).SetWriteDeadline work proprely. 
+func newDummyNetConn(name string, reply func([]byte) []byte) *dNC { + + s := &dNC{ + reply, + dAddr{name}, + make(chan dNCIo), make(chan dNCIo), + make(chan interface{}), + make(chan struct{}), + } + + in, out := s.in, chan dNCIo(nil) + buf := &bytes.Buffer{} + errorRead, errorWrite := false, false + lockRead := false + + go func() { + defer close(s.done) + for { + select { + case dxsio := <-in: + if errorWrite { + dxsio.result <- dNCIoResult{0, dNCErrWrite} + break + } + + response := s.reply(dxsio.b) + + buf.Write(response) + dxsio.result <- dNCIoResult{len(dxsio.b), nil} + + if !lockRead && buf.Len() > 0 && out == nil { + out = s.out + } + case dxsio := <-out: + if errorRead { + dxsio.result <- dNCIoResult{0, dNCErrRead} + break + } + + n, err := buf.Read(dxsio.b) + dxsio.result <- dNCIoResult{n, err} + + if buf.Len() == 0 { + out = nil + } + case ci := <-s.control: + if ci == nil { + return + } + switch ci.(type) { + case dNCCWriteLock: + in = nil + case dNCCWriteUnlock: + in = s.in + case dNCCWriteError: + errorWrite = true + case dNCCWriteSuccess: + errorWrite = false + case dNCCReadLock: + out = nil + lockRead = true + case dNCCReadUnlock: + lockRead = false + if buf.Len() > 0 && out == nil { + out = s.out + } + case dNCCReadError: + errorRead = true + case dNCCReadSuccess: + errorRead = false + default: + } + } + } + }() + return s +} + +// Shuts down dummy net.Conn server. Every blocking or future method calls will do nothing and result in error. +// Result will be dNCErrClosed if server was allready closed. +// Server can not be unclosed. +func (s *dNC) Close() error { + select { + case s.control <- nil: + <-s.done + return nil + case <-s.done: + } + return dNCErrClosed +} + +// Performs a write action to server. +// If not locked by (*dNC).WriteLock, it results in error or success. If locked, this method will block until unlocked, or closed. 
+// +// This method can be set to result in error or success, via (*dNC).WriteError() or (*dNC).WriteSuccess() methods. +// +// If setted to result in error, the 'reply' function will NOT be called and internal buffer will NOT increasethe. +// Result will be (0, dNCErrWrite). +// +// If setted to result in success, the 'reply' function will be called and its result will be writen to internal buffer. +// If there is something in the internal buffer, the (*dNC).Read([...]) will be unblocked (if not previously locked with (*dNC).ReadLock). +// Result will be (len(b), nil) +// +// If server was closed previously, result will be (0, dNCErrClosed). +func (s *dNC) Write(b []byte) (int, error) { + resChan := make(chan dNCIoResult) + select { + case s.in <- dNCIo{b, resChan}: + res := <-resChan + return res.n, res.err + case <-s.done: + } + return 0, dNCErrClosed +} + +// Performs a read action from server. +// If locked by (*dNC).ReadLock(), this method will block until unlocked with (*dNC).ReadUnlock(), or server closes. +// +// If not locked, this method can be setted to result imidiatly in error, will block if internal buffer is empty or will perform an read operation from internal buffer. +// +// If setted to result in error via (*dNC).ReadError(), the result will be (0, dNCErrWrite). +// +// If not locked and not setted to result in error via (*dNC).ReadSuccess(), this method will block until internall buffer is not empty, than it returns the result of the buffer read operation via (*bytes.Buffer).Read([...]). +// If the internal buffer is empty after this method, all follwing (*dNC).Read([...]), requests will block until internall buffer is filled after successful write requests. +// +// If server was closed previously, result will be (0, io.EOF). 
+func (s *dNC) Read(b []byte) (int, error) { + resChan := make(chan dNCIoResult) + select { + case s.out <- dNCIo{b, resChan}: + res := <-resChan + return res.n, res.err + case <-s.done: + } + return 0, io.EOF +} +func (s *dNC) LocalAddr() net.Addr { return s.addr } +func (s *dNC) RemoteAddr() net.Addr { return s.addr } +func (s *dNC) SetDeadline(t time.Time) error { return dNCErrNotImplemented } +func (s *dNC) SetReadDeadline(t time.Time) error { return dNCErrNotImplemented } +func (s *dNC) SetWriteDeadline(t time.Time) error { return dNCErrNotImplemented } + +func (s *dNC) Control(i interface{}) error { + select { + case s.control <- i: + return nil + case <-s.done: + } + return dNCErrClosed +} + +// Locks writing. All write requests will be blocked until write is unlocked with (*dNC).WriteUnlock, or server closes. +func (s *dNC) WriteLock() error { + return s.Control(dNCCWriteLock{}) +} + +// Unlocks writing. All blocked write requests until now will be accepted. +func (s *dNC) WriteUnlock() error { + return s.Control(dNCCWriteUnlock{}) +} + +// Unlocks writing and makes (*dNC).Write to result (0, dNCErrWrite). +func (s *dNC) WriteError() error { + if err := s.WriteUnlock(); err != nil { + return err + } + return s.Control(dNCCWriteError{}) +} + +// Unlocks writing and makes (*dNC).Write([...]) not result in error. See (*dNC).Write for details. +func (s *dNC) WriteSuccess() error { + if err := s.WriteUnlock(); err != nil { + return err + } + return s.Control(dNCCWriteSuccess{}) +} + +// Locks reading. All read requests will be blocked until read is unlocked with (*dNC).ReadUnlock, or server closes. +// (*dNC).Read([...]) wil block even after successful write. +func (s *dNC) ReadLock() error { + return s.Control(dNCCReadLock{}) +} + +// Unlocks reading. If the internall buffer is not empty, next read will not block. 
+func (s *dNC) ReadUnlock() error { + return s.Control(dNCCReadUnlock{}) +} + +// Unlocks read and makes every blocked and following (*dNC).Read([...]) imidiatly result in error. See (*dNC).Read for details. +func (s *dNC) ReadError() error { + if err := s.ReadUnlock(); err != nil { + return err + } + return s.Control(dNCCReadError{}) +} + +// Unlocks read and makes every blocked and following (*dNC).Read([...]) requests be handled, if according to internal buffer. See (*dNC).Read for details. +func (s *dNC) ReadSuccess() error { + if err := s.ReadUnlock(); err != nil { + return err + } + return s.Control(dNCCReadSuccess{}) +} diff --git a/dummyNetConn_test.go b/dummyNetConn_test.go new file mode 100644 index 0000000..94691be --- /dev/null +++ b/dummyNetConn_test.go @@ -0,0 +1,273 @@ +package xgb + +import ( + "errors" + "fmt" + "io" + "reflect" + "testing" + "time" +) + +func TestDummyNetConn(t *testing.T) { + ioStatesPairGenerator := func(writeStates, readStates []string) []func() (*dNC, error) { + writeSetters := map[string]func(*dNC) error{ + "lock": (*dNC).WriteLock, + "error": (*dNC).WriteError, + "success": (*dNC).WriteSuccess, + } + readSetters := map[string]func(*dNC) error{ + "lock": (*dNC).ReadLock, + "error": (*dNC).ReadError, + "success": (*dNC).ReadSuccess, + } + + res := []func() (*dNC, error){} + for _, writeState := range writeStates { + writeState, writeSetter := writeState, writeSetters[writeState] + if writeSetter == nil { + panic("unknown write state: " + writeState) + continue + } + for _, readState := range readStates { + readState, readSetter := readState, readSetters[readState] + if readSetter == nil { + panic("unknown read state: " + readState) + continue + } + res = append(res, func() (*dNC, error) { + + // loopback server + s := newDummyNetConn("w:"+writeState+";r:"+readState, func(b []byte) []byte { return b }) + + if err := readSetter(s); err != nil { + s.Close() + return nil, errors.New("set read " + readState + " error: " + 
err.Error()) + } + + if err := writeSetter(s); err != nil { + s.Close() + return nil, errors.New("set write " + writeState + " error: " + err.Error()) + } + + return s, nil + }) + } + } + return res + } + + timeout := time.Millisecond + wantResponse := func(action func(*dNC) error, want, block error) func(*dNC) error { + return func(s *dNC) error { + actionResult := make(chan error) + timedOut := make(chan struct{}) + go func() { + err := action(s) + select { + case <-timedOut: + if err != block { + t.Errorf("after unblocking, action result=%v, want %v", err, block) + } + case actionResult <- err: + } + }() + select { + case err := <-actionResult: + if err != want { + return errors.New(fmt.Sprintf("action result=%v, want %v", err, want)) + } + case <-time.After(timeout): + close(timedOut) + return errors.New(fmt.Sprintf("action did not respond for %v, result want %v", timeout, want)) + } + return nil + } + } + wantBlock := func(action func(*dNC) error, unblock error) func(*dNC) error { + return func(s *dNC) error { + actionResult := make(chan error) + timedOut := make(chan struct{}) + go func() { + err := action(s) + select { + case <-timedOut: + if err != unblock { + t.Errorf("after unblocking, action result=%v, want %v", err, unblock) + } + case actionResult <- err: + } + }() + select { + case err := <-actionResult: + return errors.New(fmt.Sprintf("action result=%v, want to be blocked", err)) + case <-time.After(timeout): + close(timedOut) + } + return nil + } + } + write := func(b string) func(*dNC) error { + return func(s *dNC) error { + n, err := s.Write([]byte(b)) + if err == nil && n != len(b) { + return errors.New("Write returned nil error, but not everything was written") + } + return err + } + } + read := func(b string) func(*dNC) error { + return func(s *dNC) error { + r := make([]byte, len(b)) + n, err := s.Read(r) + if err == nil { + if n != len(b) { + return errors.New("Read returned nil error, but not everything was read") + } + if 
!reflect.DeepEqual(r, []byte(b)) { + return errors.New("Read=\"" + string(r) + "\", want \"" + string(b) + "\"") + } + } + return err + } + } + + testCases := []struct { + description string + servers []func() (*dNC, error) + actions []func(*dNC) error // actions per server + }{ + {"close,close", + ioStatesPairGenerator( + []string{"lock", "error", "success"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantResponse((*dNC).Close, nil, dNCErrClosed), + wantResponse((*dNC).Close, dNCErrClosed, dNCErrClosed), + }, + }, + {"write,close,write", + ioStatesPairGenerator( + []string{"lock"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantBlock(write(""), dNCErrClosed), + wantResponse((*dNC).Close, nil, dNCErrClosed), + wantResponse(write(""), dNCErrClosed, dNCErrClosed), + }, + }, + {"write,close,write", + ioStatesPairGenerator( + []string{"error"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantResponse(write(""), dNCErrWrite, dNCErrClosed), + wantResponse((*dNC).Close, nil, dNCErrClosed), + wantResponse(write(""), dNCErrClosed, dNCErrClosed), + }, + }, + {"write,close,write", + ioStatesPairGenerator( + []string{"success"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantResponse(write(""), nil, dNCErrClosed), + wantResponse((*dNC).Close, nil, dNCErrClosed), + wantResponse(write(""), dNCErrClosed, dNCErrClosed), + }, + }, + {"read,close,read", + ioStatesPairGenerator( + []string{"lock", "error", "success"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantBlock(read(""), io.EOF), + wantResponse((*dNC).Close, nil, dNCErrClosed), + wantResponse(read(""), io.EOF, io.EOF), + }, + }, + {"write,read", + ioStatesPairGenerator( + []string{"lock"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantBlock(write("1"), dNCErrClosed), + wantBlock(read("1"), io.EOF), + }, + }, + {"write,read", + ioStatesPairGenerator( + []string{"error"}, + 
[]string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantResponse(write("1"), dNCErrWrite, dNCErrClosed), + wantBlock(read("1"), io.EOF), + }, + }, + {"write,read", + ioStatesPairGenerator( + []string{"success"}, + []string{"lock"}, + ), + []func(*dNC) error{ + wantResponse(write("1"), nil, dNCErrClosed), + wantBlock(read("1"), io.EOF), + }, + }, + {"write,read", + ioStatesPairGenerator( + []string{"success"}, + []string{"error"}, + ), + []func(*dNC) error{ + wantResponse(write("1"), nil, dNCErrClosed), + wantResponse(read("1"), dNCErrRead, io.EOF), + }, + }, + {"write,read", + ioStatesPairGenerator( + []string{"success"}, + []string{"success"}, + ), + []func(*dNC) error{ + wantResponse(write("1"), nil, dNCErrClosed), + wantResponse(read("1"), nil, io.EOF), + }, + }, + } + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + defer leaksMonitor(tc.description).checkTesting(t) + + for _, server := range tc.servers { + s, err := server() + if err != nil { + t.Error(err) + continue + } + if s == nil { + t.Error("nil server in testcase") + continue + } + + t.Run(s.LocalAddr().String(), func(t *testing.T) { + defer leaksMonitor(s.LocalAddr().String()).checkTesting(t) + for _, action := range tc.actions { + if err := action(s); err != nil { + t.Error(err) + break + } + } + s.Close() + }) + } + }) + } +} diff --git a/xgb_test.go b/xgb_test.go index 5540e5c..d6ddb13 100644 --- a/xgb_test.go +++ b/xgb_test.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "net" "regexp" "runtime" "strconv" @@ -14,320 +13,6 @@ import ( "time" ) -type addr struct { - s string -} - -func (_ addr) Network() string { return "dummy" } -func (a addr) String() string { return a.s } - -type errTimeout struct{ error } - -func (_ errTimeout) Timeout() bool { return true } - -var ( - dXErrNotImplemented = errors.New("command not implemented") - dXErrClosed = errors.New("server closed") - dXErrWrite = errors.New("server write failed") - dXErrRead = 
errors.New("server read failed") - dXErrResponse = errors.New("server response error") -) - -type dXIoResult struct { - n int - err error -} -type dXIo struct { - b []byte - result chan dXIoResult -} - -type dXCSendEvent struct{} -type dXCWriteLock struct{} -type dXCWriteUnlock struct{} -type dXCWriteError struct{} -type dXCWriteSuccess struct{ errorResponse bool } -type dXCReadLock struct{} -type dXCReadUnlock struct{} -type dXCReadError struct{} -type dXCReadSuccess struct{} - -type dXEvent struct{} - -func (_ dXEvent) Bytes() []byte { return nil } -func (_ dXEvent) String() string { return "dummy X server event" } - -type dXError struct { - seqId uint16 -} - -func (e dXError) SequenceId() uint16 { return e.seqId } -func (_ dXError) BadId() uint32 { return 0 } -func (_ dXError) Error() string { return "dummy X server error reply" } - -// dummy X server implementing net.Conn interface, -type dX struct { - addr addr - in, out chan dXIo - control chan interface{} - done chan struct{} -} - -// Results running dummy X server, satisfying net.Conn interface for test purposes. -// It is users responsibility to stop and clean up resources with (*dX).Close, if not needed anymore. -// By default, the read and write method are unlocked and will not result in error and read response will be a non error X reply (like with (*dX).WriteSuccess(false), (*dX).ReadSuccess()). -//TODO make (*dX).SetDeadline, (*dX).SetReadDeadline, (*dX).SetWriteDeadline work proprely. 
-func newDX(name string) *dX { - s := &dX{ - addr{name}, - make(chan dXIo), make(chan dXIo), - make(chan interface{}), - make(chan struct{}), - } - - seqId := uint16(1) - incrementSequenceId := func() { - // this has to be the same algorithm as in (*Conn).generateSeqIds - if seqId == uint16((1<<16)-1) { - seqId = 0 - } else { - seqId++ - } - } - - in, out := s.in, chan dXIo(nil) - buf := &bytes.Buffer{} - errorRead, errorWrite, errorResponse := false, false, false - lockRead := false - - NewErrorFuncs[255] = func(buf []byte) Error { - return dXError{Get16(buf[2:])} - } - NewEventFuncs[128&127] = func(buf []byte) Event { - return dXEvent{} - } - - go func() { - defer close(s.done) - for { - select { - case dxsio := <-in: - if errorWrite { - dxsio.result <- dXIoResult{0, dXErrWrite} - break - } - - response := make([]byte, 32) - if errorResponse { // response will be error - response[0] = 0 // error - response[1] = 255 // error function - } else { // response will by reply with no additional reply - response[0] = 1 // reply - } - Put16(response[2:], seqId) // sequence number - incrementSequenceId() - - buf.Write(response) - dxsio.result <- dXIoResult{len(dxsio.b), nil} - - if !lockRead && out == nil { - out = s.out - } - case dxsio := <-out: - if errorRead { - dxsio.result <- dXIoResult{0, dXErrRead} - break - } - - n, err := buf.Read(dxsio.b) - dxsio.result <- dXIoResult{n, err} - - if buf.Len() == 0 { - out = nil - } - case ci := <-s.control: - if ci == nil { - return - } - switch cs := ci.(type) { - case dXCSendEvent: - response := make([]byte, 32) - response[0] = 128 - buf.Write(response) - - if !lockRead && out == nil { - out = s.out - } - case dXCWriteLock: - in = nil - case dXCWriteUnlock: - in = s.in - case dXCWriteError: - errorWrite = true - case dXCWriteSuccess: - errorWrite = false - errorResponse = cs.errorResponse - case dXCReadLock: - out = nil - lockRead = true - case dXCReadUnlock: - lockRead = false - if buf.Len() > 0 && out == nil { - out = s.out - 
} - case dXCReadError: - errorRead = true - case dXCReadSuccess: - errorRead = false - default: - } - } - } - }() - return s -} - -// Shuts down dummy X server. Every blocking or future method calls will do nothing and result in error. -// Result will be dXErrClosed if server was allready closed. -// Server can not be unclosed. -func (s *dX) Close() error { - select { - case s.control <- nil: - <-s.done - return nil - case <-s.done: - } - return dXErrClosed -} - -// Imitates write action to X server. -// If not locked by (*dX).WriteLock, it results in error or success. -// -// If Write errors, the second result parameter will be an error {dXErrWrite|dXErrClosed}, the resulting first parameter will be 0, -// no response will be generated and the internal sequence number will not be incremented. -// -// If Write succeedes, it results in (len(b), nil), the (*dX).Read will be unblocked (if not locked with (*dX).ReadLock), -// an [32]byte response will be written to buffer from which (*dX).Read reads, -// with sequence number and proper X response type (error or reply) and the internal sequence number will be increased. -// -// If server was closed previously, result will be (0, dXErrClosed). -func (s *dX) Write(b []byte) (int, error) { - resChan := make(chan dXIoResult) - //fmt.Printf("(*dX).Write: got write request: %v\n", b) - select { - case s.in <- dXIo{b, resChan}: - //fmt.Printf("(*dX).Write: input channel has accepted request\n") - res := <-resChan - //fmt.Printf("(*dX).Write: got result: %v\n", res) - return res.n, res.err - case <-s.done: - } - //fmt.Printf("(*dX).Write: server was closed\n") - return 0, dXErrClosed -} - -// Imitates read action from X server. -// If locked by (*dX).ReadLock, read will block until unlocking with (*dX).ReadUnlock, or server closes. -// -// If not locked, Read will result in error, or block until internal read buffer is not empty, depending on internal state. 
-// The internal state can be modified via (*dX).ReadError, or (*dX).ReadSuccess -// ReadError makes it return (0, dXErrRead), with no changes to internal buffer or state. -// ReadSuccess makes it block until there are some write responses. After emtying the internal read buffer, all next Read requests will block untill another successful write requests. -// -// The resulting read success response type can be altered with (*dX).WriteSuccess method. -// -// If server was closed previously, result will be (0, io.EOF). -func (s *dX) Read(b []byte) (int, error) { - resChan := make(chan dXIoResult) - //fmt.Printf("(*dX).Read: got read request of length: %v\n", len(b)) - select { - case s.out <- dXIo{b, resChan}: - //fmt.Printf("(*dX).Read: output channel has accepted request\n") - res := <-resChan - //fmt.Printf("(*dX).Read: got result: %v\n", res) - //fmt.Printf("(*dX).Read: result bytes: %v\n", b) - return res.n, res.err - case <-s.done: - //fmt.Printf("(*dX).Read: server was closed\n") - } - return 0, io.EOF -} -func (s *dX) LocalAddr() net.Addr { return s.addr } -func (s *dX) RemoteAddr() net.Addr { return s.addr } -func (s *dX) SetDeadline(t time.Time) error { return dXErrNotImplemented } -func (s *dX) SetReadDeadline(t time.Time) error { return dXErrNotImplemented } -func (s *dX) SetWriteDeadline(t time.Time) error { return dXErrNotImplemented } - -func (s *dX) Control(i interface{}) error { - select { - case s.control <- i: - return nil - case <-s.done: - } - return dXErrClosed -} - -// Adds an Event into read buffer. -func (s *dX) SendEvent() error { - return s.Control(dXCSendEvent{}) -} - -// Locks writing. All write requests will be blocked until write is unlocked with (*dX).WriteUnlock, or server closes. -func (s *dX) WriteLock() error { - return s.Control(dXCWriteLock{}) -} - -// Unlocks writing. All blocked write requests until now will be accepted. 
-func (s *dX) WriteUnlock() error { - return s.Control(dXCWriteUnlock{}) -} - -// Unlocks writing and makes (*dX).Write to result (0, dXErrWrite). -func (s *dX) WriteError() error { - if err := s.WriteUnlock(); err != nil { - return err - } - return s.Control(dXCWriteError{}) -} - -// Unlocks writing and makes (*dX).Write to result (len(b), nil), with a proper X reply. -// If errorResult is true, the response will be an X error response, -// else an normal reply. See (*dX).Write for details. -func (s *dX) WriteSuccess(errorResult bool) error { - if err := s.WriteUnlock(); err != nil { - return err - } - return s.Control(dXCWriteSuccess{errorResult}) -} - -// Locks reading. All read requests will be blocked until read is unlocked with (*dX).ReadUnlock, or server closes. -// (*dX).Read wil block even after successful write. -func (s *dX) ReadLock() error { - return s.Control(dXCReadLock{}) -} - -// Unlocks reading. If there are any unresponded requests in reading buffer, read will be unblocked. -func (s *dX) ReadUnlock() error { - return s.Control(dXCReadUnlock{}) -} - -// Unlocks read and makes every blocked and following (*dX).Read requests fail. See (*dX).Read for details. -func (s *dX) ReadError() error { - if err := s.ReadUnlock(); err != nil { - return err - } - return s.Control(dXCReadError{}) -} - -// Unlocks read and makes every blocked and following (*dX).Read requests be handled, if there are any in read buffer. -// See (*dX).Read for details. 
-func (s *dX) ReadSuccess() error { - if err := s.ReadUnlock(); err != nil { - return err - } - return s.Control(dXCReadSuccess{}) -} - type goroutine struct { id int name string @@ -429,56 +114,23 @@ func (l *leaks) ignoreLeak(grs ...goroutine) { } } -func testDXCombinations(writeStates, readStates []string) []func() (*dX, error) { - writeSetters := map[string]func(*dX) error{ - "lock": (*dX).WriteLock, - "error": (*dX).WriteError, - "successReply": func(s *dX) error { return s.WriteSuccess(false) }, - "successError": func(s *dX) error { return s.WriteSuccess(true) }, - } - readSetters := map[string]func(*dX) error{ - "lock": (*dX).ReadLock, - "error": (*dX).ReadError, - "success": (*dX).ReadSuccess, - } +type dNCEvent struct{} - res := []func() (*dX, error){} - for _, writeState := range writeStates { - writeState, writeSetter := writeState, writeSetters[writeState] - if writeSetter == nil { - panic("unknown write state: " + writeState) - continue - } - for _, readState := range readStates { - readState, readSetter := readState, readSetters[readState] - if readSetter == nil { - panic("unknown read state: " + readState) - continue - } - res = append(res, func() (*dX, error) { - s := newDX("write=" + writeState + ",read=" + readState) - - if err := readSetter(s); err != nil { - s.Close() - return nil, errors.New("set read " + readState + " error: " + err.Error()) - } - - if err := writeSetter(s); err != nil { - s.Close() - return nil, errors.New("set write " + writeState + " error: " + err.Error()) - } +func (_ dNCEvent) Bytes() []byte { return nil } +func (_ dNCEvent) String() string { return "dummy X server event" } - return s, nil - }) - } - } - return res +type dNCError struct { + seqId uint16 } -func TestDummyXServer(t *testing.T) { +func (e dNCError) SequenceId() uint16 { return e.seqId } +func (_ dNCError) BadId() uint32 { return 0 } +func (_ dNCError) Error() string { return "dummy X server error reply" } + +func TestConnOnNonBlockingDummyXServer(t 
*testing.T) { timeout := time.Millisecond - wantResponse := func(action func(*dX) error, want, block error) func(*dX) error { - return func(s *dX) error { + wantResponse := func(action func(*Conn) error, want, block error) func(*Conn) error { + return func(s *Conn) error { actionResult := make(chan error) timedOut := make(chan struct{}) go func() { @@ -503,286 +155,138 @@ func TestDummyXServer(t *testing.T) { return nil } } - wantBlock := func(action func(*dX) error, unblock error) func(*dX) error { - return func(s *dX) error { - actionResult := make(chan error) - timedOut := make(chan struct{}) - go func() { - err := action(s) - select { - case <-timedOut: - if err != unblock { - t.Errorf("after unblocking, action result=%v, want %v", err, unblock) - } - case actionResult <- err: - } - }() - select { - case err := <-actionResult: - return errors.New(fmt.Sprintf("action result=%v, want to be blocked", err)) - case <-time.After(timeout): - close(timedOut) - } - return nil - } + NewErrorFuncs[255] = func(buf []byte) Error { + return dNCError{Get16(buf[2:])} } - write := func() func(*dX) error { - return func(s *dX) error { - _, err := s.Write([]byte{1}) - return err - } + NewEventFuncs[128&127] = func(buf []byte) Event { + return dNCEvent{} } - read := func() func(*dX) error { - return func(s *dX) error { - b := make([]byte, 32) - _, err := s.Read(b) - return err + checkedReply := func(wantError bool) func(*Conn) error { + request := "reply" + if wantError { + request = "error" } - } - readSuccess := func(seqId uint16, errorResponse bool) func(*dX) error { - return func(s *dX) error { - b := make([]byte, 32) - _, err := s.Read(b) - if err != nil { - return err + return func(c *Conn) error { + cookie := c.NewCookie(true, true) + c.NewRequest([]byte(request), cookie) + _, err := cookie.Reply() + if wantError && err == nil { + return errors.New(fmt.Sprintf("checked request \"%v\" with reply resulted in nil error, want some error", request)) } - if seqId != Get16(b[2:]) 
{ - return errors.New(fmt.Sprintf("got read sequence number %d, want %d", Get16(b[2:]), seqId)) + if !wantError && err != nil { + return errors.New(fmt.Sprintf("checked request \"%v\" with reply resulted in error %v, want nil error", request, err)) } - b0, desc := 0, "error" - if !errorResponse { - b0, desc = 1, "valid" + return nil + } + } + checkedNoreply := func(wantError bool) func(*Conn) error { + request := "noreply" + if wantError { + request = "error" + } + return func(c *Conn) error { + cookie := c.NewCookie(true, false) + c.NewRequest([]byte(request), cookie) + err := cookie.Check() + if wantError && err == nil { + return errors.New(fmt.Sprintf("checked request \"%v\" with no reply resulted in nil error, want some error", request)) } - if int(b[0]) != b0 { - return errors.New(fmt.Sprintf("response is not an %s reply: %v", desc, b)) + if !wantError && err != nil { + return errors.New(fmt.Sprintf("checked request \"%v\" with no reply resulted in error %v, want nil error", request, err)) } return nil } } - - testCases := []struct { - description string - servers []func() (*dX, error) - actions []func(*dX) error // actions per server - }{ - {"empty", - []func() (*dX, error){ - func() (*dX, error) { return newDX("server"), nil }, - }, - []func(*dX) error{ - func(s *dX) error { return nil }, - }, - }, - {"close,close", - testDXCombinations( - []string{"lock", "error", "successError", "successReply"}, - []string{"lock", "error", "success"}, - ), - []func(*dX) error{ - wantResponse((*dX).Close, nil, dXErrClosed), - wantResponse((*dX).Close, dXErrClosed, dXErrClosed), - }, - }, - {"write,close,write", - testDXCombinations( - []string{"lock"}, - []string{"lock", "error", "success"}, - ), - []func(*dX) error{ - wantBlock(write(), dXErrClosed), - wantResponse((*dX).Close, nil, dXErrClosed), - wantResponse(write(), dXErrClosed, dXErrClosed), - }, - }, - {"write,close,write", - testDXCombinations( - []string{"error"}, - []string{"lock", "error", "success"}, - ), - 
[]func(*dX) error{ - wantResponse(write(), dXErrWrite, dXErrClosed), - wantResponse((*dX).Close, nil, dXErrClosed), - wantResponse(write(), dXErrClosed, dXErrClosed), - }, - }, - {"write,close,write", - testDXCombinations( - []string{"successError", "successReply"}, - []string{"lock", "error", "success"}, - ), - []func(*dX) error{ - wantResponse(write(), nil, dXErrClosed), - wantResponse((*dX).Close, nil, dXErrClosed), - wantResponse(write(), dXErrClosed, dXErrClosed), - }, - }, - {"read,close,read", - testDXCombinations( - []string{"lock", "error", "successError", "successReply"}, - []string{"lock", "error", "success"}, - ), - []func(*dX) error{ - wantBlock(read(), io.EOF), - wantResponse((*dX).Close, nil, dXErrClosed), - wantResponse(read(), io.EOF, io.EOF), - }, - }, - {"write,read", - testDXCombinations( - []string{"lock"}, - []string{"lock", "error", "success"}, - ), - []func(*dX) error{ - wantBlock(write(), dXErrClosed), - wantBlock(read(), io.EOF), - }, - }, - {"write,read", - testDXCombinations( - []string{"error"}, - []string{"lock", "error", "success"}, - ), - []func(*dX) error{ - wantResponse(write(), dXErrWrite, dXErrClosed), - wantBlock(read(), io.EOF), - }, - }, - {"write,read", - testDXCombinations( - []string{"successError"}, - []string{"lock"}, - ), - []func(*dX) error{ - wantResponse(write(), nil, dXErrClosed), - wantBlock(read(), io.EOF), - }, - }, - {"write,read", - testDXCombinations( - []string{"successError"}, - []string{"error"}, - ), - []func(*dX) error{ - wantResponse(write(), nil, dXErrClosed), - wantResponse(read(), dXErrRead, io.EOF), - }, - }, - {"write,read", - testDXCombinations( - []string{"successError"}, - []string{"success"}, - ), - []func(*dX) error{ - wantResponse(write(), nil, dXErrClosed), - wantResponse(readSuccess(1, true), nil, io.EOF), - }, - }, - {"write,read", - testDXCombinations( - []string{"successReply"}, - []string{"success"}, - ), - []func(*dX) error{ - wantResponse(write(), nil, dXErrClosed), - 
wantResponse(readSuccess(1, false), nil, io.EOF), - }, - }, - } - for _, tc := range testCases { - t.Run(tc.description, func(t *testing.T) { - defer leaksMonitor(tc.description).checkTesting(t) - - for _, server := range tc.servers { - s, err := server() - if err != nil { - t.Error(err) - continue - } - if s == nil { - t.Error("nil server in testcase") - continue - } - - t.Run(s.LocalAddr().String(), func(t *testing.T) { - defer leaksMonitor(s.LocalAddr().String()).checkTesting(t) - for _, action := range tc.actions { - if err := action(s); err != nil { - t.Error(err) - break - } - } - s.Close() - }) + uncheckedReply := func(wantError bool) func(*Conn) error { + request := "reply" + if wantError { + request = "error" + } + return func(c *Conn) error { + cookie := c.NewCookie(false, true) + c.NewRequest([]byte(request), cookie) + _, err := cookie.Reply() + if err != nil { + return errors.New(fmt.Sprintf("unchecked request \"%v\" with reply resulted in %v, want nil", request, err)) } - }) + return nil + } } -} - -func TestConnOnNonBlockingDummyXServer(t *testing.T) { - timeout := time.Millisecond - wantResponse := func(action func(*Conn) error, want, block error) func(*Conn) error { + uncheckedNoreply := func(wantError bool) func(*Conn) error { + request := "noreply" + if wantError { + request = "error" + } return func(c *Conn) error { - actionResult := make(chan error) - timedOut := make(chan struct{}) - go func() { - err := action(c) - select { - case <-timedOut: - if err != block { - t.Errorf("after unblocking, action result=%v, want %v", err, block) - } - case actionResult <- err: - } - }() - select { - case err := <-actionResult: - if err != want { - return errors.New(fmt.Sprintf("action result=%v, want %v", err, want)) - } - case <-time.After(timeout): - close(timedOut) - return errors.New(fmt.Sprintf("action did not respond for %v, result want %v", timeout, want)) - } + cookie := c.NewCookie(false, false) + c.NewRequest([]byte(request), cookie) return nil } } 
- crequest := func(checked, reply bool) func(*Conn) error { + event := func() func(*Conn) error { return func(c *Conn) error { - cookie := c.NewCookie(checked, reply) - c.NewRequest([]byte("crequest"), cookie) - _, err := cookie.Reply() + _, err := c.conn.Write([]byte("event")) + if err != nil { + return errors.New(fmt.Sprintf("asked dummy server to send event, but resulted in error: %v\n", err)) + } return err } } + waitEvent := func(wantError bool) func(*Conn) error { + return func(c *Conn) error { + _, err := c.WaitForEvent() + if wantError && err == nil { + return errors.New(fmt.Sprintf("wait for event resulted in nil error, want some error")) + } + if !wantError && err != nil { + return errors.New(fmt.Sprintf("wait for event resulted in error %v, want nil error", err)) + } + return nil + } + } testCases := []struct { description string - servers []func() (*dX, error) actions []func(*Conn) error }{ - {"cclose", - testDXCombinations( - []string{"successError", "successReply"}, - []string{"success"}, - ), + {"close", []func(*Conn) error{}, }, - {"crequest", - testDXCombinations([]string{"successError"}, []string{"success"}), + {"checked requests with reply", []func(*Conn) error{ - wantResponse(crequest(true, true), dXError{1}, io.ErrShortWrite), + checkedReply(false), + checkedReply(true), + checkedReply(false), + checkedReply(true), }, }, - {"crequest", - testDXCombinations([]string{"successReply"}, []string{"success"}), + {"checked requests no reply", []func(*Conn) error{ - wantResponse(crequest(true, true), nil, io.ErrShortWrite), + checkedNoreply(false), + checkedNoreply(true), + checkedNoreply(false), + checkedNoreply(true), }, }, - // sometimes panic on unfixed branch - close of closed channel - {"cclose", - testDXCombinations([]string{"error"}, []string{"error", "success"}), - []func(*Conn) error{}, + {"unchecked requests with reply", + []func(*Conn) error{ + uncheckedReply(false), + uncheckedReply(true), + waitEvent(true), + uncheckedReply(false), + 
event(), + waitEvent(false), + }, + }, + {"unchecked requests no reply", + []func(*Conn) error{ + uncheckedNoreply(false), + uncheckedNoreply(true), + waitEvent(true), + uncheckedNoreply(false), + event(), + waitEvent(false), + }, }, } for _, tc := range testCases { @@ -790,52 +294,72 @@ func TestConnOnNonBlockingDummyXServer(t *testing.T) { tclm := leaksMonitor("test case " + tc.description) defer tclm.checkTesting(t) - for _, server := range tc.servers { - s, err := server() - if err != nil { - t.Error(err) - continue + seqId := uint16(1) + incrementSequenceId := func() { + // this has to be the same algorithm as in (*Conn).generateSeqIds + if seqId == uint16((1<<16)-1) { + seqId = 0 + } else { + seqId++ } - if s == nil { - t.Error("nil *dX in testcase") - continue + } + dummyXreplyer := func(request []byte) []byte { + //fmt.Printf("dummyXreplyer got request: %s\n", string(request)) + res := make([]byte, 32) + switch string(request) { + case "event": + res[0] = 128 + //fmt.Printf("dummyXreplyer sent response: %v\n", res) + return res + case "error": + res[0] = 0 // error + res[1] = 255 // error function + default: + res[0] = 1 // reply + } + Put16(res[2:], seqId) // sequence number + incrementSequenceId() + if string(request) == "noreply" { + //fmt.Printf("dummyXreplyer no response sent\n") + return nil } + //fmt.Printf("dummyXreplyer sent response: %v\n", res) + return res + } - t.Run(s.LocalAddr().String(), func(t *testing.T) { - sclm := leaksMonitor(s.LocalAddr().String()+" after sever close", tclm) - defer sclm.checkTesting(t) + sclm := leaksMonitor("after server close", tclm) + defer sclm.checkTesting(t) + s := newDummyNetConn("dummX", dummyXreplyer) + defer s.Close() - c, err := postNewConn(&Conn{conn: s}) - if err != nil { - t.Errorf("connect to dummy server error: %v", err) - return - } + c, err := postNewConn(&Conn{conn: s}) + if err != nil { + t.Errorf("connect to dummy server error: %v", err) + return + } - rlm := 
leaksMonitor(c.conn.LocalAddr().String() + " after actions end") - for _, action := range tc.actions { - if err := action(c); err != nil { - t.Error(err) - break - } - } - c.Close() - if err := wantResponse( - func(c *Conn) error { - if ev, err := c.WaitForEvent(); ev != nil || err != nil { - return fmt.Errorf("after (*Conn).Close, (*Conn).WaitForEvent() = (%v,%v), want (nil,nil)", ev, err) - } - return nil - }, - nil, - io.ErrShortWrite, - )(c); err != nil { - t.Error(err) + rlm := leaksMonitor("after actions end") + for _, action := range tc.actions { + if err := action(c); err != nil { + t.Error(err) + break + } + } + c.Close() + if err := wantResponse( + func(c *Conn) error { + if ev, err := c.WaitForEvent(); ev != nil || err != nil { + return fmt.Errorf("after (*Conn).Close, (*Conn).WaitForEvent() = (%v,%v), want (nil,nil)", ev, err) } - rlm.checkTesting(t) - }) - - s.Close() + return nil + }, + nil, + io.ErrShortWrite, + )(c); err != nil { + t.Error(err) } + rlm.checkTesting(t) + }) } } -- cgit v1.2.3 From e7026d385e6e9bba05fe9748151963275563cc0e Mon Sep 17 00:00:00 2001 From: jEzEk Date: Wed, 24 Oct 2018 19:20:53 +0200 Subject: testing unexpected (*Conn).conn close --- xgb_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/xgb_test.go b/xgb_test.go index d6ddb13..799990b 100644 --- a/xgb_test.go +++ b/xgb_test.go @@ -288,6 +288,17 @@ func TestConnOnNonBlockingDummyXServer(t *testing.T) { waitEvent(false), }, }, + {"unexpected conn close", + []func(*Conn) error{ + func(c *Conn) error { + c.conn.Close() + if ev, err := c.WaitForEvent(); ev != nil || err != nil { + return fmt.Errorf("after conn close WaitForEvent() = (%v, %v), want (nil, nil)", ev, err) + } + return nil + }, + }, + }, } for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { -- cgit v1.2.3 From 10df72a1f6ebc4703bef26e015c364ee155dc51f Mon Sep 17 00:00:00 2001 From: jEzEk Date: Thu, 25 Oct 2018 18:05:12 +0200 Subject: tests after (*Conn) close channel is 
closed --- xgb_test.go | 55 +++++++++++++++++-------------------------------------- 1 file changed, 17 insertions(+), 38 deletions(-) diff --git a/xgb_test.go b/xgb_test.go index 799990b..b10f4bf 100644 --- a/xgb_test.go +++ b/xgb_test.go @@ -4,7 +4,6 @@ import ( "bytes" "errors" "fmt" - "io" "regexp" "runtime" "strconv" @@ -129,32 +128,6 @@ func (_ dNCError) Error() string { return "dummy X server error reply" } func TestConnOnNonBlockingDummyXServer(t *testing.T) { timeout := time.Millisecond - wantResponse := func(action func(*Conn) error, want, block error) func(*Conn) error { - return func(s *Conn) error { - actionResult := make(chan error) - timedOut := make(chan struct{}) - go func() { - err := action(s) - select { - case <-timedOut: - if err != block { - t.Errorf("after unblocking, action result=%v, want %v", err, block) - } - case actionResult <- err: - } - }() - select { - case err := <-actionResult: - if err != want { - return errors.New(fmt.Sprintf("action result=%v, want %v", err, want)) - } - case <-time.After(timeout): - close(timedOut) - return errors.New(fmt.Sprintf("action did not respond for %v, result want %v", timeout, want)) - } - return nil - } - } NewErrorFuncs[255] = func(buf []byte) Error { return dNCError{Get16(buf[2:])} } @@ -295,6 +268,14 @@ func TestConnOnNonBlockingDummyXServer(t *testing.T) { if ev, err := c.WaitForEvent(); ev != nil || err != nil { return fmt.Errorf("after conn close WaitForEvent() = (%v, %v), want (nil, nil)", ev, err) } + select { + case eoe, ok := <-c.eventChan: + if ok { + return fmt.Errorf("(*Conn).eventChan should be closed by now, but is not and returns %v", eoe) + } + case <-time.After(timeout): + return fmt.Errorf("(*Conn).eventChan should be closed by now, but is not and was blocking for %v", timeout) + } return nil }, }, @@ -356,19 +337,17 @@ func TestConnOnNonBlockingDummyXServer(t *testing.T) { break } } + c.Close() - if err := wantResponse( - func(c *Conn) error { - if ev, err := c.WaitForEvent(); ev 
!= nil || err != nil { - return fmt.Errorf("after (*Conn).Close, (*Conn).WaitForEvent() = (%v,%v), want (nil,nil)", ev, err) - } - return nil - }, - nil, - io.ErrShortWrite, - )(c); err != nil { - t.Error(err) + select { + case eoe, ok := <-c.eventChan: + if ok { + t.Errorf("(*Conn).eventChan should be closed after (*Conn),Close(), but is not and returned %v", eoe) + } + case <-time.After(timeout): + t.Errorf("(*Conn).eventChan should be closed after (*Conn),Close(), but is not and was blocking for %v", timeout) } + rlm.checkTesting(t) }) -- cgit v1.2.3 From 788010f11d65572ca0cea2caf65ade6db4a274de Mon Sep 17 00:00:00 2001 From: jEzEk Date: Fri, 26 Oct 2018 03:45:55 +0200 Subject: tests double close, close with pending requests (*Conn).Close panic recover in tests --- xgb_test.go | 62 +++++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/xgb_test.go b/xgb_test.go index b10f4bf..6931c3d 100644 --- a/xgb_test.go +++ b/xgb_test.go @@ -217,6 +217,17 @@ func TestConnOnNonBlockingDummyXServer(t *testing.T) { return nil } } + checkClosed := func(c *Conn) error { + select { + case eoe, ok := <-c.eventChan: + if ok { + return fmt.Errorf("(*Conn).eventChan should be closed, but is not and returns %v", eoe) + } + case <-time.After(timeout): + return fmt.Errorf("(*Conn).eventChan should be closed, but is not and was blocking for %v", timeout) + } + return nil + } testCases := []struct { description string @@ -225,6 +236,14 @@ func TestConnOnNonBlockingDummyXServer(t *testing.T) { {"close", []func(*Conn) error{}, }, + {"double close", + []func(*Conn) error{ + func(c *Conn) error { + c.Close() + return nil + }, + }, + }, {"checked requests with reply", []func(*Conn) error{ checkedReply(false), @@ -261,23 +280,28 @@ func TestConnOnNonBlockingDummyXServer(t *testing.T) { waitEvent(false), }, }, + {"close with pending requests", + []func(*Conn) error{ + func(c *Conn) error { + c.conn.(*dNC).ReadLock() + defer 
c.conn.(*dNC).ReadUnlock() + c.NewRequest([]byte("reply"), c.NewCookie(false, true)) + c.Close() + return nil + }, + checkClosed, + }, + }, {"unexpected conn close", []func(*Conn) error{ func(c *Conn) error { c.conn.Close() if ev, err := c.WaitForEvent(); ev != nil || err != nil { - return fmt.Errorf("after conn close WaitForEvent() = (%v, %v), want (nil, nil)", ev, err) - } - select { - case eoe, ok := <-c.eventChan: - if ok { - return fmt.Errorf("(*Conn).eventChan should be closed by now, but is not and returns %v", eoe) - } - case <-time.After(timeout): - return fmt.Errorf("(*Conn).eventChan should be closed by now, but is not and was blocking for %v", timeout) + return fmt.Errorf("WaitForEvent() = (%v, %v), want (nil, nil)", ev, err) } return nil }, + checkClosed, }, }, } @@ -338,14 +362,20 @@ func TestConnOnNonBlockingDummyXServer(t *testing.T) { } } - c.Close() - select { - case eoe, ok := <-c.eventChan: - if ok { - t.Errorf("(*Conn).eventChan should be closed after (*Conn),Close(), but is not and returned %v", eoe) + recovered := false + func() { + defer func() { + if err := recover(); err != nil { + t.Errorf("(*Conn).Close() panic recover: %v", err) + recovered = true + } + }() + c.Close() + }() + if !recovered { + if err := checkClosed(c); err != nil { + t.Error(err) } - case <-time.After(timeout): - t.Errorf("(*Conn).eventChan should be closed after (*Conn),Close(), but is not and was blocking for %v", timeout) } rlm.checkTesting(t) -- cgit v1.2.3 From 01e3ef92338ac79a57aa6542633770366db1ff52 Mon Sep 17 00:00:00 2001 From: jEzEk Date: Mon, 29 Oct 2018 19:41:24 +0100 Subject: refactor to testingTools.go with more tests leak testing added dummy X server replier fo dummy net.Conn tests --- dummyNetConn.go | 261 ------------------------------- dummyNetConn_test.go | 273 --------------------------------- testingTools.go | 426 +++++++++++++++++++++++++++++++++++++++++++++++++++ testingTools_test.go | 350 ++++++++++++++++++++++++++++++++++++++++++ xgb_test.go | 
174 +-------------------- 5 files changed, 783 insertions(+), 701 deletions(-) delete mode 100644 dummyNetConn.go delete mode 100644 dummyNetConn_test.go create mode 100644 testingTools.go create mode 100644 testingTools_test.go diff --git a/dummyNetConn.go b/dummyNetConn.go deleted file mode 100644 index 91bae4f..0000000 --- a/dummyNetConn.go +++ /dev/null @@ -1,261 +0,0 @@ -package xgb - -import ( - "bytes" - "errors" - "io" - "net" - "time" -) - -type dAddr struct { - s string -} - -func (_ dAddr) Network() string { return "dummy" } -func (a dAddr) String() string { return a.s } - -var ( - dNCErrNotImplemented = errors.New("command not implemented") - dNCErrClosed = errors.New("server closed") - dNCErrWrite = errors.New("server write failed") - dNCErrRead = errors.New("server read failed") - dNCErrResponse = errors.New("server response error") -) - -type dNCIoResult struct { - n int - err error -} -type dNCIo struct { - b []byte - result chan dNCIoResult -} - -type dNCCWriteLock struct{} -type dNCCWriteUnlock struct{} -type dNCCWriteError struct{} -type dNCCWriteSuccess struct{} -type dNCCReadLock struct{} -type dNCCReadUnlock struct{} -type dNCCReadError struct{} -type dNCCReadSuccess struct{} - -// dummy net.Conn interface. Needs to be constructed via newDummyNetConn([...]) function. -type dNC struct { - reply func([]byte) []byte - addr dAddr - in, out chan dNCIo - control chan interface{} - done chan struct{} -} - -// Results running dummy server, satisfying net.Conn interface for test purposes. -// 'name' parameter will be returned via (*dNC).Local/RemoteAddr().String() -// 'reply' parameter function will be runned only on successful (*dNC).Write(b) with 'b' as parameter to 'reply'. The result will be stored in internal buffer and can be retrieved later via (*dNC).Read([...]) method. -// It is users responsibility to stop and clean up resources with (*dNC).Close, if not needed anymore. 
-// By default, the (*dNC).Write([...]) and (*dNC).Read([...]) methods are unlocked and will not result in error. -//TODO make (*dNC).SetDeadline, (*dNC).SetReadDeadline, (*dNC).SetWriteDeadline work proprely. -func newDummyNetConn(name string, reply func([]byte) []byte) *dNC { - - s := &dNC{ - reply, - dAddr{name}, - make(chan dNCIo), make(chan dNCIo), - make(chan interface{}), - make(chan struct{}), - } - - in, out := s.in, chan dNCIo(nil) - buf := &bytes.Buffer{} - errorRead, errorWrite := false, false - lockRead := false - - go func() { - defer close(s.done) - for { - select { - case dxsio := <-in: - if errorWrite { - dxsio.result <- dNCIoResult{0, dNCErrWrite} - break - } - - response := s.reply(dxsio.b) - - buf.Write(response) - dxsio.result <- dNCIoResult{len(dxsio.b), nil} - - if !lockRead && buf.Len() > 0 && out == nil { - out = s.out - } - case dxsio := <-out: - if errorRead { - dxsio.result <- dNCIoResult{0, dNCErrRead} - break - } - - n, err := buf.Read(dxsio.b) - dxsio.result <- dNCIoResult{n, err} - - if buf.Len() == 0 { - out = nil - } - case ci := <-s.control: - if ci == nil { - return - } - switch ci.(type) { - case dNCCWriteLock: - in = nil - case dNCCWriteUnlock: - in = s.in - case dNCCWriteError: - errorWrite = true - case dNCCWriteSuccess: - errorWrite = false - case dNCCReadLock: - out = nil - lockRead = true - case dNCCReadUnlock: - lockRead = false - if buf.Len() > 0 && out == nil { - out = s.out - } - case dNCCReadError: - errorRead = true - case dNCCReadSuccess: - errorRead = false - default: - } - } - } - }() - return s -} - -// Shuts down dummy net.Conn server. Every blocking or future method calls will do nothing and result in error. -// Result will be dNCErrClosed if server was allready closed. -// Server can not be unclosed. -func (s *dNC) Close() error { - select { - case s.control <- nil: - <-s.done - return nil - case <-s.done: - } - return dNCErrClosed -} - -// Performs a write action to server. 
-// If not locked by (*dNC).WriteLock, it results in error or success. If locked, this method will block until unlocked, or closed. -// -// This method can be set to result in error or success, via (*dNC).WriteError() or (*dNC).WriteSuccess() methods. -// -// If setted to result in error, the 'reply' function will NOT be called and internal buffer will NOT increasethe. -// Result will be (0, dNCErrWrite). -// -// If setted to result in success, the 'reply' function will be called and its result will be writen to internal buffer. -// If there is something in the internal buffer, the (*dNC).Read([...]) will be unblocked (if not previously locked with (*dNC).ReadLock). -// Result will be (len(b), nil) -// -// If server was closed previously, result will be (0, dNCErrClosed). -func (s *dNC) Write(b []byte) (int, error) { - resChan := make(chan dNCIoResult) - select { - case s.in <- dNCIo{b, resChan}: - res := <-resChan - return res.n, res.err - case <-s.done: - } - return 0, dNCErrClosed -} - -// Performs a read action from server. -// If locked by (*dNC).ReadLock(), this method will block until unlocked with (*dNC).ReadUnlock(), or server closes. -// -// If not locked, this method can be setted to result imidiatly in error, will block if internal buffer is empty or will perform an read operation from internal buffer. -// -// If setted to result in error via (*dNC).ReadError(), the result will be (0, dNCErrWrite). -// -// If not locked and not setted to result in error via (*dNC).ReadSuccess(), this method will block until internall buffer is not empty, than it returns the result of the buffer read operation via (*bytes.Buffer).Read([...]). -// If the internal buffer is empty after this method, all follwing (*dNC).Read([...]), requests will block until internall buffer is filled after successful write requests. -// -// If server was closed previously, result will be (0, io.EOF). 
-func (s *dNC) Read(b []byte) (int, error) { - resChan := make(chan dNCIoResult) - select { - case s.out <- dNCIo{b, resChan}: - res := <-resChan - return res.n, res.err - case <-s.done: - } - return 0, io.EOF -} -func (s *dNC) LocalAddr() net.Addr { return s.addr } -func (s *dNC) RemoteAddr() net.Addr { return s.addr } -func (s *dNC) SetDeadline(t time.Time) error { return dNCErrNotImplemented } -func (s *dNC) SetReadDeadline(t time.Time) error { return dNCErrNotImplemented } -func (s *dNC) SetWriteDeadline(t time.Time) error { return dNCErrNotImplemented } - -func (s *dNC) Control(i interface{}) error { - select { - case s.control <- i: - return nil - case <-s.done: - } - return dNCErrClosed -} - -// Locks writing. All write requests will be blocked until write is unlocked with (*dNC).WriteUnlock, or server closes. -func (s *dNC) WriteLock() error { - return s.Control(dNCCWriteLock{}) -} - -// Unlocks writing. All blocked write requests until now will be accepted. -func (s *dNC) WriteUnlock() error { - return s.Control(dNCCWriteUnlock{}) -} - -// Unlocks writing and makes (*dNC).Write to result (0, dNCErrWrite). -func (s *dNC) WriteError() error { - if err := s.WriteUnlock(); err != nil { - return err - } - return s.Control(dNCCWriteError{}) -} - -// Unlocks writing and makes (*dNC).Write([...]) not result in error. See (*dNC).Write for details. -func (s *dNC) WriteSuccess() error { - if err := s.WriteUnlock(); err != nil { - return err - } - return s.Control(dNCCWriteSuccess{}) -} - -// Locks reading. All read requests will be blocked until read is unlocked with (*dNC).ReadUnlock, or server closes. -// (*dNC).Read([...]) wil block even after successful write. -func (s *dNC) ReadLock() error { - return s.Control(dNCCReadLock{}) -} - -// Unlocks reading. If the internall buffer is not empty, next read will not block. 
-func (s *dNC) ReadUnlock() error { - return s.Control(dNCCReadUnlock{}) -} - -// Unlocks read and makes every blocked and following (*dNC).Read([...]) imidiatly result in error. See (*dNC).Read for details. -func (s *dNC) ReadError() error { - if err := s.ReadUnlock(); err != nil { - return err - } - return s.Control(dNCCReadError{}) -} - -// Unlocks read and makes every blocked and following (*dNC).Read([...]) requests be handled, if according to internal buffer. See (*dNC).Read for details. -func (s *dNC) ReadSuccess() error { - if err := s.ReadUnlock(); err != nil { - return err - } - return s.Control(dNCCReadSuccess{}) -} diff --git a/dummyNetConn_test.go b/dummyNetConn_test.go deleted file mode 100644 index 94691be..0000000 --- a/dummyNetConn_test.go +++ /dev/null @@ -1,273 +0,0 @@ -package xgb - -import ( - "errors" - "fmt" - "io" - "reflect" - "testing" - "time" -) - -func TestDummyNetConn(t *testing.T) { - ioStatesPairGenerator := func(writeStates, readStates []string) []func() (*dNC, error) { - writeSetters := map[string]func(*dNC) error{ - "lock": (*dNC).WriteLock, - "error": (*dNC).WriteError, - "success": (*dNC).WriteSuccess, - } - readSetters := map[string]func(*dNC) error{ - "lock": (*dNC).ReadLock, - "error": (*dNC).ReadError, - "success": (*dNC).ReadSuccess, - } - - res := []func() (*dNC, error){} - for _, writeState := range writeStates { - writeState, writeSetter := writeState, writeSetters[writeState] - if writeSetter == nil { - panic("unknown write state: " + writeState) - continue - } - for _, readState := range readStates { - readState, readSetter := readState, readSetters[readState] - if readSetter == nil { - panic("unknown read state: " + readState) - continue - } - res = append(res, func() (*dNC, error) { - - // loopback server - s := newDummyNetConn("w:"+writeState+";r:"+readState, func(b []byte) []byte { return b }) - - if err := readSetter(s); err != nil { - s.Close() - return nil, errors.New("set read " + readState + " error: " + 
err.Error()) - } - - if err := writeSetter(s); err != nil { - s.Close() - return nil, errors.New("set write " + writeState + " error: " + err.Error()) - } - - return s, nil - }) - } - } - return res - } - - timeout := time.Millisecond - wantResponse := func(action func(*dNC) error, want, block error) func(*dNC) error { - return func(s *dNC) error { - actionResult := make(chan error) - timedOut := make(chan struct{}) - go func() { - err := action(s) - select { - case <-timedOut: - if err != block { - t.Errorf("after unblocking, action result=%v, want %v", err, block) - } - case actionResult <- err: - } - }() - select { - case err := <-actionResult: - if err != want { - return errors.New(fmt.Sprintf("action result=%v, want %v", err, want)) - } - case <-time.After(timeout): - close(timedOut) - return errors.New(fmt.Sprintf("action did not respond for %v, result want %v", timeout, want)) - } - return nil - } - } - wantBlock := func(action func(*dNC) error, unblock error) func(*dNC) error { - return func(s *dNC) error { - actionResult := make(chan error) - timedOut := make(chan struct{}) - go func() { - err := action(s) - select { - case <-timedOut: - if err != unblock { - t.Errorf("after unblocking, action result=%v, want %v", err, unblock) - } - case actionResult <- err: - } - }() - select { - case err := <-actionResult: - return errors.New(fmt.Sprintf("action result=%v, want to be blocked", err)) - case <-time.After(timeout): - close(timedOut) - } - return nil - } - } - write := func(b string) func(*dNC) error { - return func(s *dNC) error { - n, err := s.Write([]byte(b)) - if err == nil && n != len(b) { - return errors.New("Write returned nil error, but not everything was written") - } - return err - } - } - read := func(b string) func(*dNC) error { - return func(s *dNC) error { - r := make([]byte, len(b)) - n, err := s.Read(r) - if err == nil { - if n != len(b) { - return errors.New("Read returned nil error, but not everything was read") - } - if 
!reflect.DeepEqual(r, []byte(b)) { - return errors.New("Read=\"" + string(r) + "\", want \"" + string(b) + "\"") - } - } - return err - } - } - - testCases := []struct { - description string - servers []func() (*dNC, error) - actions []func(*dNC) error // actions per server - }{ - {"close,close", - ioStatesPairGenerator( - []string{"lock", "error", "success"}, - []string{"lock", "error", "success"}, - ), - []func(*dNC) error{ - wantResponse((*dNC).Close, nil, dNCErrClosed), - wantResponse((*dNC).Close, dNCErrClosed, dNCErrClosed), - }, - }, - {"write,close,write", - ioStatesPairGenerator( - []string{"lock"}, - []string{"lock", "error", "success"}, - ), - []func(*dNC) error{ - wantBlock(write(""), dNCErrClosed), - wantResponse((*dNC).Close, nil, dNCErrClosed), - wantResponse(write(""), dNCErrClosed, dNCErrClosed), - }, - }, - {"write,close,write", - ioStatesPairGenerator( - []string{"error"}, - []string{"lock", "error", "success"}, - ), - []func(*dNC) error{ - wantResponse(write(""), dNCErrWrite, dNCErrClosed), - wantResponse((*dNC).Close, nil, dNCErrClosed), - wantResponse(write(""), dNCErrClosed, dNCErrClosed), - }, - }, - {"write,close,write", - ioStatesPairGenerator( - []string{"success"}, - []string{"lock", "error", "success"}, - ), - []func(*dNC) error{ - wantResponse(write(""), nil, dNCErrClosed), - wantResponse((*dNC).Close, nil, dNCErrClosed), - wantResponse(write(""), dNCErrClosed, dNCErrClosed), - }, - }, - {"read,close,read", - ioStatesPairGenerator( - []string{"lock", "error", "success"}, - []string{"lock", "error", "success"}, - ), - []func(*dNC) error{ - wantBlock(read(""), io.EOF), - wantResponse((*dNC).Close, nil, dNCErrClosed), - wantResponse(read(""), io.EOF, io.EOF), - }, - }, - {"write,read", - ioStatesPairGenerator( - []string{"lock"}, - []string{"lock", "error", "success"}, - ), - []func(*dNC) error{ - wantBlock(write("1"), dNCErrClosed), - wantBlock(read("1"), io.EOF), - }, - }, - {"write,read", - ioStatesPairGenerator( - []string{"error"}, - 
[]string{"lock", "error", "success"}, - ), - []func(*dNC) error{ - wantResponse(write("1"), dNCErrWrite, dNCErrClosed), - wantBlock(read("1"), io.EOF), - }, - }, - {"write,read", - ioStatesPairGenerator( - []string{"success"}, - []string{"lock"}, - ), - []func(*dNC) error{ - wantResponse(write("1"), nil, dNCErrClosed), - wantBlock(read("1"), io.EOF), - }, - }, - {"write,read", - ioStatesPairGenerator( - []string{"success"}, - []string{"error"}, - ), - []func(*dNC) error{ - wantResponse(write("1"), nil, dNCErrClosed), - wantResponse(read("1"), dNCErrRead, io.EOF), - }, - }, - {"write,read", - ioStatesPairGenerator( - []string{"success"}, - []string{"success"}, - ), - []func(*dNC) error{ - wantResponse(write("1"), nil, dNCErrClosed), - wantResponse(read("1"), nil, io.EOF), - }, - }, - } - for _, tc := range testCases { - t.Run(tc.description, func(t *testing.T) { - defer leaksMonitor(tc.description).checkTesting(t) - - for _, server := range tc.servers { - s, err := server() - if err != nil { - t.Error(err) - continue - } - if s == nil { - t.Error("nil server in testcase") - continue - } - - t.Run(s.LocalAddr().String(), func(t *testing.T) { - defer leaksMonitor(s.LocalAddr().String()).checkTesting(t) - for _, action := range tc.actions { - if err := action(s); err != nil { - t.Error(err) - break - } - } - s.Close() - }) - } - }) - } -} diff --git a/testingTools.go b/testingTools.go new file mode 100644 index 0000000..2f73031 --- /dev/null +++ b/testingTools.go @@ -0,0 +1,426 @@ +package xgb + +import ( + "bytes" + "errors" + "io" + "net" + "regexp" + "runtime" + "strconv" + "strings" + "testing" + "time" +) + +// Leaks monitor + +type goroutine struct { + id int + name string + stack []byte +} + +type leaks struct { + name string + goroutines map[int]goroutine + report []*leaks +} + +func leaksMonitor(name string, monitors ...*leaks) *leaks { + return &leaks{ + name, + leaks{}.collectGoroutines(), + monitors, + } +} + +// ispired by 
https://golang.org/src/runtime/debug/stack.go?s=587:606#L21 +// stack returns a formatted stack trace of all goroutines. +// It calls runtime.Stack with a large enough buffer to capture the entire trace. +func (_ leaks) stack() []byte { + buf := make([]byte, 1024) + for { + n := runtime.Stack(buf, true) + if n < len(buf) { + return buf[:n] + } + buf = make([]byte, 2*len(buf)) + } +} + +func (l leaks) collectGoroutines() map[int]goroutine { + res := make(map[int]goroutine) + stacks := bytes.Split(l.stack(), []byte{'\n', '\n'}) + + regexpId := regexp.MustCompile(`^\s*goroutine\s*(\d+)`) + for _, st := range stacks { + lines := bytes.Split(st, []byte{'\n'}) + if len(lines) < 2 { + panic("routine stach has less tnan two lines: " + string(st)) + } + + idMatches := regexpId.FindSubmatch(lines[0]) + if len(idMatches) < 2 { + panic("no id found in goroutine stack's first line: " + string(lines[0])) + } + id, err := strconv.Atoi(string(idMatches[1])) + if err != nil { + panic("converting goroutine id to number error: " + err.Error()) + } + if _, ok := res[id]; ok { + panic("2 goroutines with same id: " + strconv.Itoa(id)) + } + name := strings.TrimSpace(string(lines[1])) + + //filter out our stack routine + if strings.Contains(name, "xgb.leaks.stack") { + continue + } + + res[id] = goroutine{id, name, st} + } + return res +} + +func (l leaks) leakingGoroutines() []goroutine { + goroutines := l.collectGoroutines() + res := []goroutine{} + for id, gr := range goroutines { + if _, ok := l.goroutines[id]; ok { + continue + } + res = append(res, gr) + } + return res +} +func (l leaks) checkTesting(t *testing.T) { + if len(l.leakingGoroutines()) == 0 { + return + } + leakTimeout := 10 * time.Millisecond + time.Sleep(leakTimeout) + //t.Logf("possible goroutine leakage, waiting %v", leakTimeout) + grs := l.leakingGoroutines() + for _, gr := range grs { + t.Errorf("%s: %s is leaking", l.name, gr.name) + //t.Errorf("%s: %s is leaking\n%v", l.name, gr.name, string(gr.stack)) + } + for 
_, rl := range l.report { + rl.ignoreLeak(grs...) + } +} +func (l *leaks) ignoreLeak(grs ...goroutine) { + for _, gr := range grs { + l.goroutines[gr.id] = gr + } +} + +// dummy net.Conn + +type dAddr struct { + s string +} + +func (_ dAddr) Network() string { return "dummy" } +func (a dAddr) String() string { return a.s } + +var ( + dNCErrNotImplemented = errors.New("command not implemented") + dNCErrClosed = errors.New("server closed") + dNCErrWrite = errors.New("server write failed") + dNCErrRead = errors.New("server read failed") + dNCErrResponse = errors.New("server response error") +) + +type dNCIoResult struct { + n int + err error +} +type dNCIo struct { + b []byte + result chan dNCIoResult +} + +type dNCCWriteLock struct{} +type dNCCWriteUnlock struct{} +type dNCCWriteError struct{} +type dNCCWriteSuccess struct{} +type dNCCReadLock struct{} +type dNCCReadUnlock struct{} +type dNCCReadError struct{} +type dNCCReadSuccess struct{} + +// dummy net.Conn interface. Needs to be constructed via newDummyNetConn([...]) function. +type dNC struct { + reply func([]byte) []byte + addr dAddr + in, out chan dNCIo + control chan interface{} + done chan struct{} +} + +// Results running dummy server, satisfying net.Conn interface for test purposes. +// 'name' parameter will be returned via (*dNC).Local/RemoteAddr().String() +// 'reply' parameter function will be runned only on successful (*dNC).Write(b) with 'b' as parameter to 'reply'. The result will be stored in internal buffer and can be retrieved later via (*dNC).Read([...]) method. +// It is users responsibility to stop and clean up resources with (*dNC).Close, if not needed anymore. +// By default, the (*dNC).Write([...]) and (*dNC).Read([...]) methods are unlocked and will not result in error. +//TODO make (*dNC).SetDeadline, (*dNC).SetReadDeadline, (*dNC).SetWriteDeadline work proprely. 
+func newDummyNetConn(name string, reply func([]byte) []byte) *dNC { + + s := &dNC{ + reply, + dAddr{name}, + make(chan dNCIo), make(chan dNCIo), + make(chan interface{}), + make(chan struct{}), + } + + in, out := s.in, chan dNCIo(nil) + buf := &bytes.Buffer{} + errorRead, errorWrite := false, false + lockRead := false + + go func() { + defer close(s.done) + for { + select { + case dxsio := <-in: + if errorWrite { + dxsio.result <- dNCIoResult{0, dNCErrWrite} + break + } + + response := s.reply(dxsio.b) + + buf.Write(response) + dxsio.result <- dNCIoResult{len(dxsio.b), nil} + + if !lockRead && buf.Len() > 0 && out == nil { + out = s.out + } + case dxsio := <-out: + if errorRead { + dxsio.result <- dNCIoResult{0, dNCErrRead} + break + } + + n, err := buf.Read(dxsio.b) + dxsio.result <- dNCIoResult{n, err} + + if buf.Len() == 0 { + out = nil + } + case ci := <-s.control: + if ci == nil { + return + } + switch ci.(type) { + case dNCCWriteLock: + in = nil + case dNCCWriteUnlock: + in = s.in + case dNCCWriteError: + errorWrite = true + case dNCCWriteSuccess: + errorWrite = false + case dNCCReadLock: + out = nil + lockRead = true + case dNCCReadUnlock: + lockRead = false + if buf.Len() > 0 && out == nil { + out = s.out + } + case dNCCReadError: + errorRead = true + case dNCCReadSuccess: + errorRead = false + default: + } + } + } + }() + return s +} + +// Shuts down dummy net.Conn server. Every blocking or future method calls will do nothing and result in error. +// Result will be dNCErrClosed if server was allready closed. +// Server can not be unclosed. +func (s *dNC) Close() error { + select { + case s.control <- nil: + <-s.done + return nil + case <-s.done: + } + return dNCErrClosed +} + +// Performs a write action to server. +// If not locked by (*dNC).WriteLock, it results in error or success. If locked, this method will block until unlocked, or closed. 
+// +// This method can be set to result in error or success, via (*dNC).WriteError() or (*dNC).WriteSuccess() methods. +// +// If set to result in error, the 'reply' function will NOT be called and internal buffer will NOT increase. +// Result will be (0, dNCErrWrite). +// +// If set to result in success, the 'reply' function will be called and its result will be written to internal buffer. +// If there is something in the internal buffer, the (*dNC).Read([...]) will be unblocked (if not previously locked with (*dNC).ReadLock). +// Result will be (len(b), nil) +// +// If server was closed previously, result will be (0, dNCErrClosed). +func (s *dNC) Write(b []byte) (int, error) { + resChan := make(chan dNCIoResult) + select { + case s.in <- dNCIo{b, resChan}: + res := <-resChan + return res.n, res.err + case <-s.done: + } + return 0, dNCErrClosed +} + +// Performs a read action from server. +// If locked by (*dNC).ReadLock(), this method will block until unlocked with (*dNC).ReadUnlock(), or server closes. +// +// If not locked, this method can be set to result immediately in an error, will block if internal buffer is empty or will perform a read operation from internal buffer. +// +// If set to result in error via (*dNC).ReadError(), the result will be (0, dNCErrRead). +// +// If not locked and not set to result in error via (*dNC).ReadSuccess(), this method will block until the internal buffer is not empty, then it returns the result of the buffer read operation via (*bytes.Buffer).Read([...]). +// If the internal buffer is empty after this method, all following (*dNC).Read([...]) requests will block until the internal buffer is filled after successful write requests. +// +// If server was closed previously, result will be (0, io.EOF). 
+func (s *dNC) Read(b []byte) (int, error) { + resChan := make(chan dNCIoResult) + select { + case s.out <- dNCIo{b, resChan}: + res := <-resChan + return res.n, res.err + case <-s.done: + } + return 0, io.EOF +} +func (s *dNC) LocalAddr() net.Addr { return s.addr } +func (s *dNC) RemoteAddr() net.Addr { return s.addr } +func (s *dNC) SetDeadline(t time.Time) error { return dNCErrNotImplemented } +func (s *dNC) SetReadDeadline(t time.Time) error { return dNCErrNotImplemented } +func (s *dNC) SetWriteDeadline(t time.Time) error { return dNCErrNotImplemented } + +func (s *dNC) Control(i interface{}) error { + select { + case s.control <- i: + return nil + case <-s.done: + } + return dNCErrClosed +} + +// Locks writing. All write requests will be blocked until write is unlocked with (*dNC).WriteUnlock, or server closes. +func (s *dNC) WriteLock() error { + return s.Control(dNCCWriteLock{}) +} + +// Unlocks writing. All blocked write requests until now will be accepted. +func (s *dNC) WriteUnlock() error { + return s.Control(dNCCWriteUnlock{}) +} + +// Unlocks writing and makes (*dNC).Write to result (0, dNCErrWrite). +func (s *dNC) WriteError() error { + if err := s.WriteUnlock(); err != nil { + return err + } + return s.Control(dNCCWriteError{}) +} + +// Unlocks writing and makes (*dNC).Write([...]) not result in error. See (*dNC).Write for details. +func (s *dNC) WriteSuccess() error { + if err := s.WriteUnlock(); err != nil { + return err + } + return s.Control(dNCCWriteSuccess{}) +} + +// Locks reading. All read requests will be blocked until read is unlocked with (*dNC).ReadUnlock, or server closes. +// (*dNC).Read([...]) will block even after successful write. +func (s *dNC) ReadLock() error { + return s.Control(dNCCReadLock{}) +} + +// Unlocks reading. If the internal buffer is not empty, next read will not block. 
+func (s *dNC) ReadUnlock() error { + return s.Control(dNCCReadUnlock{}) +} + +// Unlocks read and makes every blocked and following (*dNC).Read([...]) immediately result in an error. See (*dNC).Read for details. +func (s *dNC) ReadError() error { + if err := s.ReadUnlock(); err != nil { + return err + } + return s.Control(dNCCReadError{}) +} + +// Unlocks read and makes every blocked and following (*dNC).Read([...]) requests be handled according to the internal buffer. See (*dNC).Read for details. +func (s *dNC) ReadSuccess() error { + if err := s.ReadUnlock(); err != nil { + return err + } + return s.Control(dNCCReadSuccess{}) +} + +// dummy X server replier for dummy net.Conn + +type dXSEvent struct{} + +func (_ dXSEvent) Bytes() []byte { return nil } +func (_ dXSEvent) String() string { return "dummy X server event" } + +type dXSError struct { + seqId uint16 +} + +func (e dXSError) SequenceId() uint16 { return e.seqId } +func (_ dXSError) BadId() uint32 { return 0 } +func (_ dXSError) Error() string { return "dummy X server error reply" } + +func newDummyXServerReplier() func([]byte) []byte { + // register xgb error & event replies + NewErrorFuncs[255] = func(buf []byte) Error { + return dXSError{Get16(buf[2:])} + } + NewEventFuncs[128&127] = func(buf []byte) Event { + return dXSEvent{} + } + + // sequence number generator + seqId := uint16(1) + incrementSequenceId := func() { + // this has to be the same algorithm as in (*Conn).generateSeqIds + if seqId == uint16((1<<16)-1) { + seqId = 0 + } else { + seqId++ + } + } + return func(request []byte) []byte { + res := make([]byte, 32) + switch string(request) { + case "event": + res[0] = 128 + return res + case "error": + res[0] = 0 // error + res[1] = 255 // error function + default: + res[0] = 1 // reply + } + Put16(res[2:], seqId) // sequence number + incrementSequenceId() + if string(request) == "noreply" { + return nil + } + return res + } +} diff --git a/testingTools_test.go b/testingTools_test.go new file mode 
100644 index 0000000..518b326 --- /dev/null +++ b/testingTools_test.go @@ -0,0 +1,350 @@ +package xgb + +import ( + "bytes" + "errors" + "fmt" + "io" + "reflect" + "sync" + "testing" + "time" +) + +func TestLeaks(t *testing.T) { + lm := leaksMonitor("lm") + if lgrs := lm.leakingGoroutines(); len(lgrs) != 0 { + t.Errorf("leakingGoroutines returned %d leaking goroutines, want 0", len(lgrs)) + } + + done := make(chan struct{}) + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { + <-done + wg.Done() + }() + + if lgrs := lm.leakingGoroutines(); len(lgrs) != 1 { + t.Errorf("leakingGoroutines returned %d leaking goroutines, want 1", len(lgrs)) + } + + wg.Add(1) + go func() { + <-done + wg.Done() + }() + + if lgrs := lm.leakingGoroutines(); len(lgrs) != 2 { + t.Errorf("leakingGoroutines returned %d leaking goroutines, want 2", len(lgrs)) + } + + close(done) + wg.Wait() + + if lgrs := lm.leakingGoroutines(); len(lgrs) != 0 { + t.Errorf("leakingGoroutines returned %d leaking goroutines, want 0", len(lgrs)) + } + + lm.checkTesting(t) + //TODO multiple leak monitors with report ignore tests +} + +func TestDummyNetConn(t *testing.T) { + ioStatesPairGenerator := func(writeStates, readStates []string) []func() (*dNC, error) { + writeSetters := map[string]func(*dNC) error{ + "lock": (*dNC).WriteLock, + "error": (*dNC).WriteError, + "success": (*dNC).WriteSuccess, + } + readSetters := map[string]func(*dNC) error{ + "lock": (*dNC).ReadLock, + "error": (*dNC).ReadError, + "success": (*dNC).ReadSuccess, + } + + res := []func() (*dNC, error){} + for _, writeState := range writeStates { + writeState, writeSetter := writeState, writeSetters[writeState] + if writeSetter == nil { + panic("unknown write state: " + writeState) + continue + } + for _, readState := range readStates { + readState, readSetter := readState, readSetters[readState] + if readSetter == nil { + panic("unknown read state: " + readState) + continue + } + res = append(res, func() (*dNC, error) { + + // loopback server 
+ s := newDummyNetConn("w:"+writeState+";r:"+readState, func(b []byte) []byte { return b }) + + if err := readSetter(s); err != nil { + s.Close() + return nil, errors.New("set read " + readState + " error: " + err.Error()) + } + + if err := writeSetter(s); err != nil { + s.Close() + return nil, errors.New("set write " + writeState + " error: " + err.Error()) + } + + return s, nil + }) + } + } + return res + } + + timeout := 10 * time.Millisecond + wantResponse := func(action func(*dNC) error, want, block error) func(*dNC) error { + return func(s *dNC) error { + actionResult := make(chan error) + timedOut := make(chan struct{}) + go func() { + err := action(s) + select { + case <-timedOut: + if err != block { + t.Errorf("after unblocking, action result=%v, want %v", err, block) + } + case actionResult <- err: + } + }() + select { + case err := <-actionResult: + if err != want { + return errors.New(fmt.Sprintf("action result=%v, want %v", err, want)) + } + case <-time.After(timeout): + close(timedOut) + return errors.New(fmt.Sprintf("action did not respond for %v, result want %v", timeout, want)) + } + return nil + } + } + wantBlock := func(action func(*dNC) error, unblock error) func(*dNC) error { + return func(s *dNC) error { + actionResult := make(chan error) + timedOut := make(chan struct{}) + go func() { + err := action(s) + select { + case <-timedOut: + if err != unblock { + t.Errorf("after unblocking, action result=%v, want %v", err, unblock) + } + case actionResult <- err: + } + }() + select { + case err := <-actionResult: + return errors.New(fmt.Sprintf("action result=%v, want to be blocked", err)) + case <-time.After(timeout): + close(timedOut) + } + return nil + } + } + write := func(b string) func(*dNC) error { + return func(s *dNC) error { + n, err := s.Write([]byte(b)) + if err == nil && n != len(b) { + return errors.New("Write returned nil error, but not everything was written") + } + return err + } + } + read := func(b string) func(*dNC) error { + 
return func(s *dNC) error { + r := make([]byte, len(b)) + n, err := s.Read(r) + if err == nil { + if n != len(b) { + return errors.New("Read returned nil error, but not everything was read") + } + if !reflect.DeepEqual(r, []byte(b)) { + return errors.New("Read=\"" + string(r) + "\", want \"" + string(b) + "\"") + } + } + return err + } + } + + testCases := []struct { + description string + servers []func() (*dNC, error) + actions []func(*dNC) error // actions per server + }{ + {"close,close", + ioStatesPairGenerator( + []string{"lock", "error", "success"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantResponse((*dNC).Close, nil, dNCErrClosed), + wantResponse((*dNC).Close, dNCErrClosed, dNCErrClosed), + }, + }, + {"write,close,write", + ioStatesPairGenerator( + []string{"lock"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantBlock(write(""), dNCErrClosed), + wantResponse((*dNC).Close, nil, dNCErrClosed), + wantResponse(write(""), dNCErrClosed, dNCErrClosed), + }, + }, + {"write,close,write", + ioStatesPairGenerator( + []string{"error"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantResponse(write(""), dNCErrWrite, dNCErrClosed), + wantResponse((*dNC).Close, nil, dNCErrClosed), + wantResponse(write(""), dNCErrClosed, dNCErrClosed), + }, + }, + {"write,close,write", + ioStatesPairGenerator( + []string{"success"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantResponse(write(""), nil, dNCErrClosed), + wantResponse((*dNC).Close, nil, dNCErrClosed), + wantResponse(write(""), dNCErrClosed, dNCErrClosed), + }, + }, + {"read,close,read", + ioStatesPairGenerator( + []string{"lock", "error", "success"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantBlock(read(""), io.EOF), + wantResponse((*dNC).Close, nil, dNCErrClosed), + wantResponse(read(""), io.EOF, io.EOF), + }, + }, + {"write,read", + ioStatesPairGenerator( + []string{"lock"}, + 
[]string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantBlock(write("1"), dNCErrClosed), + wantBlock(read("1"), io.EOF), + }, + }, + {"write,read", + ioStatesPairGenerator( + []string{"error"}, + []string{"lock", "error", "success"}, + ), + []func(*dNC) error{ + wantResponse(write("1"), dNCErrWrite, dNCErrClosed), + wantBlock(read("1"), io.EOF), + }, + }, + {"write,read", + ioStatesPairGenerator( + []string{"success"}, + []string{"lock"}, + ), + []func(*dNC) error{ + wantResponse(write("1"), nil, dNCErrClosed), + wantBlock(read("1"), io.EOF), + }, + }, + {"write,read", + ioStatesPairGenerator( + []string{"success"}, + []string{"error"}, + ), + []func(*dNC) error{ + wantResponse(write("1"), nil, dNCErrClosed), + wantResponse(read("1"), dNCErrRead, io.EOF), + }, + }, + {"write,read", + ioStatesPairGenerator( + []string{"success"}, + []string{"success"}, + ), + []func(*dNC) error{ + wantResponse(write("1"), nil, dNCErrClosed), + wantResponse(read("1"), nil, io.EOF), + }, + }, + } + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + defer leaksMonitor(tc.description).checkTesting(t) + + for _, server := range tc.servers { + s, err := server() + if err != nil { + t.Error(err) + continue + } + if s == nil { + t.Error("nil server in testcase") + continue + } + + t.Run(s.LocalAddr().String(), func(t *testing.T) { + defer leaksMonitor(s.LocalAddr().String()).checkTesting(t) + for _, action := range tc.actions { + if err := action(s); err != nil { + t.Error(err) + break + } + } + s.Close() + }) + } + }) + } +} + +func TestDummyXServerReplier(t *testing.T) { + testCases := [][][2][]byte{ + { + [2][]byte{[]byte("reply"), []byte{1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + [2][]byte{[]byte("eply"), []byte{1, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + [2][]byte{[]byte("ply"), []byte{1, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + [2][]byte{[]byte("event"), []byte{128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + [2][]byte{[]byte("ly"), []byte{1, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + [2][]byte{[]byte("y"), []byte{1, 0, 5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + [2][]byte{[]byte(""), []byte{1, 0, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + [2][]byte{[]byte("event"), []byte{128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + [2][]byte{[]byte("reply"), []byte{1, 0, 7, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + [2][]byte{[]byte("error"), []byte{0, 255, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + [2][]byte{[]byte("ply"), []byte{1, 0, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + [2][]byte{[]byte("event"), []byte{128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + [2][]byte{[]byte("ly"), []byte{1, 0, 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + [2][]byte{[]byte("noreply"), nil}, + [2][]byte{[]byte("error"), []byte{0, 255, 12, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + [2][]byte{[]byte("noreply"), nil}, + [2][]byte{[]byte(""), []byte{1, 0, 14, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}, + }, + } + + for tci, tc := range testCases { + replier := newDummyXServerReplier() + for ai, ioPair := range tc { + in, want := ioPair[0], ioPair[1] + if out := replier(in); !bytes.Equal(out, want) { + t.Errorf("testCase %d, action %d, replier(%s) = %v, want %v", tci, ai, 
string(in), out, want) + break + } + } + } +} diff --git a/xgb_test.go b/xgb_test.go index 6931c3d..19ed307 100644 --- a/xgb_test.go +++ b/xgb_test.go @@ -1,139 +1,14 @@ package xgb import ( - "bytes" "errors" "fmt" - "regexp" - "runtime" - "strconv" - "strings" "testing" "time" ) -type goroutine struct { - id int - name string - stack []byte -} - -type leaks struct { - name string - goroutines map[int]goroutine - report []*leaks -} - -func leaksMonitor(name string, monitors ...*leaks) *leaks { - return &leaks{ - name, - leaks{}.collectGoroutines(), - monitors, - } -} - -// ispired by https://golang.org/src/runtime/debug/stack.go?s=587:606#L21 -// stack returns a formatted stack trace of all goroutines. -// It calls runtime.Stack with a large enough buffer to capture the entire trace. -func (_ leaks) stack() []byte { - buf := make([]byte, 1024) - for { - n := runtime.Stack(buf, true) - if n < len(buf) { - return buf[:n] - } - buf = make([]byte, 2*len(buf)) - } -} - -func (l leaks) collectGoroutines() map[int]goroutine { - res := make(map[int]goroutine) - stacks := bytes.Split(l.stack(), []byte{'\n', '\n'}) - - regexpId := regexp.MustCompile(`^\s*goroutine\s*(\d+)`) - for _, st := range stacks { - lines := bytes.Split(st, []byte{'\n'}) - if len(lines) < 2 { - panic("routine stach has less tnan two lines: " + string(st)) - } - - idMatches := regexpId.FindSubmatch(lines[0]) - if len(idMatches) < 2 { - panic("no id found in goroutine stack's first line: " + string(lines[0])) - } - id, err := strconv.Atoi(string(idMatches[1])) - if err != nil { - panic("converting goroutine id to number error: " + err.Error()) - } - if _, ok := res[id]; ok { - panic("2 goroutines with same id: " + strconv.Itoa(id)) - } - name := strings.TrimSpace(string(lines[1])) - - //filter out our stack routine - if strings.Contains(name, "xgb.leaks.stack") { - continue - } - - res[id] = goroutine{id, name, st} - } - return res -} - -func (l leaks) leakingGoroutines() []goroutine { - goroutines := 
l.collectGoroutines() - res := []goroutine{} - for id, gr := range goroutines { - if _, ok := l.goroutines[id]; ok { - continue - } - res = append(res, gr) - } - return res -} -func (l leaks) checkTesting(t *testing.T) { - if len(l.leakingGoroutines()) == 0 { - return - } - leakTimeout := time.Second - time.Sleep(leakTimeout) - //t.Logf("possible goroutine leakage, waiting %v", leakTimeout) - grs := l.leakingGoroutines() - for _, gr := range grs { - t.Errorf("%s: %s is leaking", l.name, gr.name) - //t.Errorf("%s: %s is leaking\n%v", l.name, gr.name, string(gr.stack)) - } - for _, rl := range l.report { - rl.ignoreLeak(grs...) - } -} -func (l *leaks) ignoreLeak(grs ...goroutine) { - for _, gr := range grs { - l.goroutines[gr.id] = gr - } -} - -type dNCEvent struct{} - -func (_ dNCEvent) Bytes() []byte { return nil } -func (_ dNCEvent) String() string { return "dummy X server event" } - -type dNCError struct { - seqId uint16 -} - -func (e dNCError) SequenceId() uint16 { return e.seqId } -func (_ dNCError) BadId() uint32 { return 0 } -func (_ dNCError) Error() string { return "dummy X server error reply" } - func TestConnOnNonBlockingDummyXServer(t *testing.T) { - timeout := time.Millisecond - NewErrorFuncs[255] = func(buf []byte) Error { - return dNCError{Get16(buf[2:])} - } - NewEventFuncs[128&127] = func(buf []byte) Event { - return dNCEvent{} - } + timeout := 10 * time.Millisecond checkedReply := func(wantError bool) func(*Conn) error { request := "reply" if wantError { @@ -307,45 +182,10 @@ func TestConnOnNonBlockingDummyXServer(t *testing.T) { } for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { - tclm := leaksMonitor("test case " + tc.description) - defer tclm.checkTesting(t) - - seqId := uint16(1) - incrementSequenceId := func() { - // this has to be the same algorithm as in (*Conn).generateSeqIds - if seqId == uint16((1<<16)-1) { - seqId = 0 - } else { - seqId++ - } - } - dummyXreplyer := func(request []byte) []byte { - 
//fmt.Printf("dummyXreplyer got request: %s\n", string(request)) - res := make([]byte, 32) - switch string(request) { - case "event": - res[0] = 128 - //fmt.Printf("dummyXreplyer sent response: %v\n", res) - return res - case "error": - res[0] = 0 // error - res[1] = 255 // error function - default: - res[0] = 1 // reply - } - Put16(res[2:], seqId) // sequence number - incrementSequenceId() - if string(request) == "noreply" { - //fmt.Printf("dummyXreplyer no response sent\n") - return nil - } - //fmt.Printf("dummyXreplyer sent response: %v\n", res) - return res - } - - sclm := leaksMonitor("after server close", tclm) + sclm := leaksMonitor("after server close, before testcase exit") defer sclm.checkTesting(t) - s := newDummyNetConn("dummX", dummyXreplyer) + + s := newDummyNetConn("dummyX", newDummyXServerReplier()) defer s.Close() c, err := postNewConn(&Conn{conn: s}) @@ -354,7 +194,8 @@ func TestConnOnNonBlockingDummyXServer(t *testing.T) { return } - rlm := leaksMonitor("after actions end") + defer leaksMonitor("after actions end", sclm).checkTesting(t) + for _, action := range tc.actions { if err := action(c); err != nil { t.Error(err) @@ -370,6 +211,7 @@ func TestConnOnNonBlockingDummyXServer(t *testing.T) { recovered = true } }() + c.Close() }() if !recovered { @@ -378,8 +220,6 @@ func TestConnOnNonBlockingDummyXServer(t *testing.T) { } } - rlm.checkTesting(t) - }) } } -- cgit v1.2.3