summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjEzEk <[email protected]>2018-10-02 16:52:41 +0200
committerjEzEk <[email protected]>2018-10-27 19:45:25 +0200
commit67a5ab1e1a19c2cfc42fd9f539194dd17aafeacb (patch)
tree1b2a4099db71c94225de7f226b1f0a72ef7f6d87
parenta898cd2db36d1b9556fe79c35f67d0eb63540517 (diff)
Handle Conn's spawned goroutines upon close, ...
fix double close panic, fix occaional panic on sudden connection to server close handle all channel waitings in cookies and requests properly
-rw-r--r--cookie.go13
-rw-r--r--xgb.go162
2 files changed, 121 insertions, 54 deletions
diff --git a/cookie.go b/cookie.go
index d5cdb29..c012cfd 100644
--- a/cookie.go
+++ b/cookie.go
@@ -2,6 +2,7 @@ package xgb
import (
"errors"
+ "io"
)
// Cookie is the internal representation of a cookie, where one is generated
@@ -80,6 +81,7 @@ func (c Cookie) Reply() ([]byte, error) {
// channels. If the former arrives, the bytes are returned with a nil error.
// If the latter arrives, no bytes are returned (nil) and the error received
// is returned.
+// Returns (nil, io.EOF) when the connection is closed.
//
// Unless you're building requests from bytes by hand, this method should
// not be used.
@@ -98,6 +100,9 @@ func (c Cookie) replyChecked() ([]byte, error) {
return reply, nil
case err := <-c.errorChan:
return nil, err
+ case <-c.conn.doneRead:
+ // c.conn.readResponses is no more, there will be no replys or errors
+ return nil, io.EOF
}
}
@@ -106,6 +111,7 @@ func (c Cookie) replyChecked() ([]byte, error) {
// If the latter arrives, no bytes are returned (nil) and a nil error
// is returned. (In the latter case, the corresponding error can be retrieved
// from (Wait|Poll)ForEvent asynchronously.)
+// Returns (nil, io.EOF) when the connection is closed.
// In all honesty, you *probably* don't want to use this method.
//
// Unless you're building requests from bytes by hand, this method should
@@ -121,6 +127,9 @@ func (c Cookie) replyUnchecked() ([]byte, error) {
return reply, nil
case <-c.pingChan:
return nil, nil
+ case <-c.conn.doneRead:
+ // c.conn.readResponses is no more, there will be no replys or pings
+ return nil, io.EOF
}
}
@@ -132,6 +141,7 @@ func (c Cookie) replyUnchecked() ([]byte, error) {
// Thus, pingChan is sent a value when the *next* reply is read.
// If no more replies are being processed, we force a round trip request with
// GetInputFocus.
+// Returns io.EOF error when the connection is closed.
//
// Unless you're building requests from bytes by hand, this method should
// not be used.
@@ -161,5 +171,8 @@ func (c Cookie) Check() error {
return err
case <-c.pingChan:
return nil
+ case <-c.conn.doneRead:
+ // c.conn.readResponses is no more, there will be no errors or pings
+ return io.EOF
}
}
diff --git a/xgb.go b/xgb.go
index c3eded5..d605da6 100644
--- a/xgb.go
+++ b/xgb.go
@@ -60,7 +60,8 @@ type Conn struct {
xidChan chan xid
seqChan chan uint16
reqChan chan *request
- closing chan chan struct{}
+ doneSend chan struct{}
+ doneRead chan struct{}
// ExtLock is a lock used whenever new extensions are initialized.
// It should not be used. It is exported for use in the extension
@@ -75,11 +76,13 @@ type Conn struct {
// NewConn creates a new connection instance. It initializes locks, data
// structures, and performs the initial handshake. (The code for the handshake
// has been relegated to conn.go.)
+// It is up to user to close connection with Close() method to finish all unfinished requests and clean up spawned goroutines.
+// If the connection unexpectedly closes itself and WaitForEvent() returns "nil, nil", everything is cleaned by that moment, but nothing bad happens if you call Close() after.
func NewConn() (*Conn, error) {
return NewConnDisplay("")
}
-// NewConnDisplay is just like NewConn, but allows a specific DISPLAY
+// NewConnDisplay is just like NewConn (see closing instructions), but allows a specific DISPLAY
// string to be used.
// If 'display' is empty it will be taken from os.Getenv("DISPLAY").
//
@@ -101,7 +104,7 @@ func NewConnDisplay(display string) (*Conn, error) {
return postNewConn(c)
}
-// NewConnNet is just like NewConn, but allows a specific net.Conn
+// NewConnNet is just like NewConn (see closing instructions), but allows a specific net.Conn
// to be used.
func NewConnNet(netConn net.Conn) (*Conn, error) {
c := &Conn{}
@@ -125,7 +128,8 @@ func postNewConn(c *Conn) (*Conn, error) {
c.seqChan = make(chan uint16, seqBuffer)
c.reqChan = make(chan *request, reqBuffer)
c.eventChan = make(chan eventOrError, eventBuffer)
- c.closing = make(chan chan struct{}, 1)
+ c.doneSend = make(chan struct{})
+ c.doneRead = make(chan struct{})
go c.generateXIds()
go c.generateSeqIds()
@@ -136,8 +140,12 @@ func postNewConn(c *Conn) (*Conn, error) {
}
// Close gracefully closes the connection to the X server.
+// When everything is cleaned up, the WaitForEvent method will return (nil, nil)
func (c *Conn) Close() {
- close(c.reqChan)
+ select {
+ case c.reqChan <- nil:
+ case <-c.doneSend:
+ }
}
// Event is an interface that can contain any of the events returned by the
@@ -196,8 +204,12 @@ type eventOrError interface{}
// If you need identifiers, use the appropriate constructor.
// e.g., For a window id, use xproto.NewWindowId. For
// a new pixmap id, use xproto.NewPixmapId. And so on.
+// Returns (0, io.EOF) when the connection is closed.
func (c *Conn) NewId() (uint32, error) {
- xid := <-c.xidChan
+ xid, ok := <-c.xidChan
+ if !ok {
+ return 0, io.EOF
+ }
if xid.err != nil {
return 0, xid.err
}
@@ -240,19 +252,26 @@ func (c *Conn) generateXIds() {
max := c.setupResourceIdMask
last := uint32(0)
for {
- // TODO: Use the XC Misc extension to look for released ids.
+ id := xid{}
if last > 0 && last >= max-inc+1 {
- c.xidChan <- xid{
- id: 0,
- err: errors.New("There are no more available resource" +
- "identifiers."),
+ // TODO: Use the XC Misc extension to look for released ids.
+ id = xid{
+ id: 0,
+ err: errors.New("There are no more available resource identifiers."),
+ }
+ } else {
+ last += inc
+ id = xid{
+ id: last | c.setupResourceIdBase,
+ err: nil,
}
}
- last += inc
- c.xidChan <- xid{
- id: last | c.setupResourceIdBase,
- err: nil,
+ select {
+ case c.xidChan <- id:
+ case <-c.doneSend:
+ // c.sendRequests is down and since this id is used by requests, we don't need this goroutine running anymore.
+ return
}
}
}
@@ -275,11 +294,16 @@ func (c *Conn) generateSeqIds() {
seqid := uint16(1)
for {
- c.seqChan <- seqid
- if seqid == uint16((1<<16)-1) {
- seqid = 0
- } else {
- seqid++
+ select {
+ case c.seqChan <- seqid:
+ if seqid == uint16((1<<16)-1) {
+ seqid = 0
+ } else {
+ seqid++
+ }
+ case <-c.doneSend:
+ // c.sendRequests is down and since only that function uses sequence ids (via newSequenceId method), we don't need this goroutine running anymore.
+ return
}
}
}
@@ -315,8 +339,19 @@ type request struct {
// edits the generated code for the request you want to issue.
func (c *Conn) NewRequest(buf []byte, cookie *Cookie) {
seq := make(chan struct{})
- c.reqChan <- &request{buf: buf, cookie: cookie, seq: seq}
- <-seq
+ select {
+ case c.reqChan <- &request{buf: buf, cookie: cookie, seq: seq}:
+ // request is in buffer
+ // wait until request is processed or connection is closed
+ select {
+ case <-seq:
+ // request was successfully sent to X server
+ case <-c.doneSend:
+ // c.sendRequests is down, your request was not handled
+ }
+ case <-c.doneSend:
+ // c.sendRequests is down, nobody is listening to your requests
+ }
}
// sendRequests is run as a single goroutine that takes requests and writes
@@ -324,28 +359,45 @@ func (c *Conn) NewRequest(buf []byte, cookie *Cookie) {
// It is meant to be run as its own goroutine.
func (c *Conn) sendRequests() {
defer close(c.cookieChan)
+ defer c.conn.Close()
+ defer close(c.doneSend)
- for req := range c.reqChan {
- // ho there! if the cookie channel is nearly full, force a round
- // trip to clear out the cookie buffer.
- // Note that we circumvent the request channel, because we're *in*
- // the request channel.
- if len(c.cookieChan) == cookieBuffer-1 {
- if err := c.noop(); err != nil {
- // Shut everything down.
- break
+ for {
+ select {
+ case req := <-c.reqChan:
+ if req == nil {
+ // a request by c.Close() to gracefully exit
+ // Flush the response reading goroutine.
+ if err := c.noop(); err != nil {
+ c.conn.Close()
+ <-c.doneRead
+ }
+ return
+ }
+ // ho there! if the cookie channel is nearly full, force a round
+ // trip to clear out the cookie buffer.
+ // Note that we circumvent the request channel, because we're *in*
+ // the request channel.
+ if len(c.cookieChan) == cookieBuffer-1 {
+ if err := c.noop(); err != nil {
+ // Shut everything down.
+ c.conn.Close()
+ <-c.doneRead
+ return
+ }
+ }
+ req.cookie.Sequence = c.newSequenceId()
+ c.cookieChan <- req.cookie
+ if err := c.writeBuffer(req.buf); err != nil {
+ c.conn.Close()
+ <-c.doneRead
+ return
}
+ close(req.seq)
+ case <-c.doneRead:
+ return
}
- req.cookie.Sequence = c.newSequenceId()
- c.cookieChan <- req.cookie
- c.writeBuffer(req.buf)
- close(req.seq)
}
- response := make(chan struct{})
- c.closing <- response
- c.noop() // Flush the response reading goroutine, ignore error.
- <-response
- c.conn.Close()
}
// noop circumvents the usual request sending goroutines and forces a round
@@ -366,9 +418,8 @@ func (c *Conn) writeBuffer(buf []byte) error {
if _, err := c.conn.Write(buf); err != nil {
Logger.Printf("A write error is unrecoverable: %s", err)
return err
- } else {
- return nil
}
+ return nil
}
// readResponses is a goroutine that reads events, errors and
@@ -382,6 +433,8 @@ func (c *Conn) writeBuffer(buf []byte) error {
// Finally, cookies that came "before" this reply are always cleaned up.
func (c *Conn) readResponses() {
defer close(c.eventChan)
+ defer c.conn.Close()
+ defer close(c.doneRead)
var (
err Error
@@ -390,20 +443,18 @@ func (c *Conn) readResponses() {
)
for {
- select {
- case respond := <-c.closing:
- respond <- struct{}{}
- return
- default:
- }
-
buf := make([]byte, 32)
err, seq = nil, 0
if _, err := io.ReadFull(c.conn, buf); err != nil {
+ select {
+ case <-c.doneSend:
+ // gracefully closing
+ return
+ default:
+ }
Logger.Printf("A read error is unrecoverable: %s", err)
c.eventChan <- err
- c.Close()
- continue
+ return
}
switch buf[0] {
case 0: // This is an error
@@ -432,8 +483,7 @@ func (c *Conn) readResponses() {
if _, err := io.ReadFull(c.conn, biggerBuf[32:]); err != nil {
Logger.Printf("A read error is unrecoverable: %s", err)
c.eventChan <- err
- c.Close()
- continue
+ return
}
replyBytes = biggerBuf
} else {
@@ -522,10 +572,14 @@ func processEventOrError(everr eventOrError) (Event, Error) {
return ee, nil
case Error:
return nil, ee
+ case error:
+ // c.conn read error
+ case nil:
+ // c.eventChan is closed
default:
Logger.Printf("Invalid event/error type: %T", everr)
- return nil, nil
}
+ return nil, nil
}
// WaitForEvent returns the next event from the server.