diff options
| author | jEzEk <[email protected]> | 2018-10-02 16:52:41 +0200 |
|---|---|---|
| committer | jEzEk <[email protected]> | 2018-10-27 19:45:25 +0200 |
| commit | 67a5ab1e1a19c2cfc42fd9f539194dd17aafeacb (patch) | |
| tree | 1b2a4099db71c94225de7f226b1f0a72ef7f6d87 | |
| parent | a898cd2db36d1b9556fe79c35f67d0eb63540517 (diff) | |
Handle Conn's spawned goroutines upon close, ...
fix double close panic,
fix occaional panic on sudden connection to server close
handle all channel waitings in cookies and requests properly
| -rw-r--r-- | cookie.go | 13 | ||||
| -rw-r--r-- | xgb.go | 162 |
2 files changed, 121 insertions, 54 deletions
@@ -2,6 +2,7 @@ package xgb import ( "errors" + "io" ) // Cookie is the internal representation of a cookie, where one is generated @@ -80,6 +81,7 @@ func (c Cookie) Reply() ([]byte, error) { // channels. If the former arrives, the bytes are returned with a nil error. // If the latter arrives, no bytes are returned (nil) and the error received // is returned. +// Returns (nil, io.EOF) when the connection is closed. // // Unless you're building requests from bytes by hand, this method should // not be used. @@ -98,6 +100,9 @@ func (c Cookie) replyChecked() ([]byte, error) { return reply, nil case err := <-c.errorChan: return nil, err + case <-c.conn.doneRead: + // c.conn.readResponses is no more, there will be no replys or errors + return nil, io.EOF } } @@ -106,6 +111,7 @@ func (c Cookie) replyChecked() ([]byte, error) { // If the latter arrives, no bytes are returned (nil) and a nil error // is returned. (In the latter case, the corresponding error can be retrieved // from (Wait|Poll)ForEvent asynchronously.) +// Returns (nil, io.EOF) when the connection is closed. // In all honesty, you *probably* don't want to use this method. // // Unless you're building requests from bytes by hand, this method should @@ -121,6 +127,9 @@ func (c Cookie) replyUnchecked() ([]byte, error) { return reply, nil case <-c.pingChan: return nil, nil + case <-c.conn.doneRead: + // c.conn.readResponses is no more, there will be no replys or pings + return nil, io.EOF } } @@ -132,6 +141,7 @@ func (c Cookie) replyUnchecked() ([]byte, error) { // Thus, pingChan is sent a value when the *next* reply is read. // If no more replies are being processed, we force a round trip request with // GetInputFocus. +// Returns io.EOF error when the connection is closed. // // Unless you're building requests from bytes by hand, this method should // not be used. @@ -161,5 +171,8 @@ func (c Cookie) Check() error { return err case <-c.pingChan: return nil + case <-c.conn.doneRead: + // c.conn.readResponses is no more, there will be no errors or pings + return io.EOF } } @@ -60,7 +60,8 @@ type Conn struct { xidChan chan xid seqChan chan uint16 reqChan chan *request - closing chan chan struct{} + doneSend chan struct{} + doneRead chan struct{} // ExtLock is a lock used whenever new extensions are initialized. // It should not be used. It is exported for use in the extension @@ -75,11 +76,13 @@ type Conn struct { // NewConn creates a new connection instance. It initializes locks, data // structures, and performs the initial handshake. (The code for the handshake // has been relegated to conn.go.) +// It is up to user to close connection with Close() method to finish all unfinished requests and clean up spawned goroutines. +// If the connection unexpectedly closes itself and WaitForEvent() returns "nil, nil", everything is cleaned by that moment, but nothing bad happens if you call Close() after. func NewConn() (*Conn, error) { return NewConnDisplay("") } -// NewConnDisplay is just like NewConn, but allows a specific DISPLAY +// NewConnDisplay is just like NewConn (see closing instructions), but allows a specific DISPLAY // string to be used. // If 'display' is empty it will be taken from os.Getenv("DISPLAY"). // @@ -101,7 +104,7 @@ func NewConnDisplay(display string) (*Conn, error) { return postNewConn(c) } -// NewConnNet is just like NewConn, but allows a specific net.Conn +// NewConnNet is just like NewConn (see closing instructions), but allows a specific net.Conn // to be used. func NewConnNet(netConn net.Conn) (*Conn, error) { c := &Conn{} @@ -125,7 +128,8 @@ func postNewConn(c *Conn) (*Conn, error) { c.seqChan = make(chan uint16, seqBuffer) c.reqChan = make(chan *request, reqBuffer) c.eventChan = make(chan eventOrError, eventBuffer) - c.closing = make(chan chan struct{}, 1) + c.doneSend = make(chan struct{}) + c.doneRead = make(chan struct{}) go c.generateXIds() go c.generateSeqIds() @@ -136,8 +140,12 @@ func postNewConn(c *Conn) (*Conn, error) { } // Close gracefully closes the connection to the X server. +// When everything is cleaned up, the WaitForEvent method will return (nil, nil) func (c *Conn) Close() { - close(c.reqChan) + select { + case c.reqChan <- nil: + case <-c.doneSend: + } } // Event is an interface that can contain any of the events returned by the @@ -196,8 +204,12 @@ type eventOrError interface{} // If you need identifiers, use the appropriate constructor. // e.g., For a window id, use xproto.NewWindowId. For // a new pixmap id, use xproto.NewPixmapId. And so on. +// Returns (0, io.EOF) when the connection is closed. func (c *Conn) NewId() (uint32, error) { - xid := <-c.xidChan + xid, ok := <-c.xidChan + if !ok { + return 0, io.EOF + } if xid.err != nil { return 0, xid.err } @@ -240,19 +252,26 @@ func (c *Conn) generateXIds() { max := c.setupResourceIdMask last := uint32(0) for { - // TODO: Use the XC Misc extension to look for released ids. + id := xid{} if last > 0 && last >= max-inc+1 { - c.xidChan <- xid{ - id: 0, - err: errors.New("There are no more available resource" + - "identifiers."), + // TODO: Use the XC Misc extension to look for released ids. + id = xid{ + id: 0, + err: errors.New("There are no more available resource identifiers."), + } + } else { + last += inc + id = xid{ + id: last | c.setupResourceIdBase, + err: nil, } } - last += inc - c.xidChan <- xid{ - id: last | c.setupResourceIdBase, - err: nil, + select { + case c.xidChan <- id: + case <-c.doneSend: + // c.sendRequests is down and since this id is used by requests, we don't need this goroutine running anymore. + return } } } @@ -275,11 +294,16 @@ func (c *Conn) generateSeqIds() { seqid := uint16(1) for { - c.seqChan <- seqid - if seqid == uint16((1<<16)-1) { - seqid = 0 - } else { - seqid++ + select { + case c.seqChan <- seqid: + if seqid == uint16((1<<16)-1) { + seqid = 0 + } else { + seqid++ + } + case <-c.doneSend: + // c.sendRequests is down and since only that function uses sequence ids (via newSequenceId method), we don't need this goroutine running anymore. + return } } } @@ -315,8 +339,19 @@ type request struct { // edits the generated code for the request you want to issue. func (c *Conn) NewRequest(buf []byte, cookie *Cookie) { seq := make(chan struct{}) - c.reqChan <- &request{buf: buf, cookie: cookie, seq: seq} - <-seq + select { + case c.reqChan <- &request{buf: buf, cookie: cookie, seq: seq}: + // request is in buffer + // wait until request is processed or connection is closed + select { + case <-seq: + // request was successfully sent to X server + case <-c.doneSend: + // c.sendRequests is down, your request was not handled + } + case <-c.doneSend: + // c.sendRequests is down, nobody is listening to your requests + } } // sendRequests is run as a single goroutine that takes requests and writes @@ -324,28 +359,45 @@ func (c *Conn) NewRequest(buf []byte, cookie *Cookie) { // It is meant to be run as its own goroutine. func (c *Conn) sendRequests() { defer close(c.cookieChan) + defer c.conn.Close() + defer close(c.doneSend) - for req := range c.reqChan { - // ho there! if the cookie channel is nearly full, force a round - // trip to clear out the cookie buffer. - // Note that we circumvent the request channel, because we're *in* - // the request channel. - if len(c.cookieChan) == cookieBuffer-1 { - if err := c.noop(); err != nil { - // Shut everything down. - break + for { + select { + case req := <-c.reqChan: + if req == nil { + // a request by c.Close() to gracefully exit + // Flush the response reading goroutine. + if err := c.noop(); err != nil { + c.conn.Close() + <-c.doneRead + } + return + } + // ho there! if the cookie channel is nearly full, force a round + // trip to clear out the cookie buffer. + // Note that we circumvent the request channel, because we're *in* + // the request channel. + if len(c.cookieChan) == cookieBuffer-1 { + if err := c.noop(); err != nil { + // Shut everything down. + c.conn.Close() + <-c.doneRead + return + } + } + req.cookie.Sequence = c.newSequenceId() + c.cookieChan <- req.cookie + if err := c.writeBuffer(req.buf); err != nil { + c.conn.Close() + <-c.doneRead + return } + close(req.seq) + case <-c.doneRead: + return } - req.cookie.Sequence = c.newSequenceId() - c.cookieChan <- req.cookie - c.writeBuffer(req.buf) - close(req.seq) } - response := make(chan struct{}) - c.closing <- response - c.noop() // Flush the response reading goroutine, ignore error. - <-response - c.conn.Close() } // noop circumvents the usual request sending goroutines and forces a round @@ -366,9 +418,8 @@ func (c *Conn) writeBuffer(buf []byte) error { if _, err := c.conn.Write(buf); err != nil { Logger.Printf("A write error is unrecoverable: %s", err) return err - } else { - return nil } + return nil } // readResponses is a goroutine that reads events, errors and @@ -382,6 +433,8 @@ func (c *Conn) writeBuffer(buf []byte) error { // Finally, cookies that came "before" this reply are always cleaned up. func (c *Conn) readResponses() { defer close(c.eventChan) + defer c.conn.Close() + defer close(c.doneRead) var ( err Error @@ -390,20 +443,18 @@ func (c *Conn) readResponses() { ) for { - select { - case respond := <-c.closing: - respond <- struct{}{} - return - default: - } - buf := make([]byte, 32) err, seq = nil, 0 if _, err := io.ReadFull(c.conn, buf); err != nil { + select { + case <-c.doneSend: + // gracefully closing + return + default: + } Logger.Printf("A read error is unrecoverable: %s", err) c.eventChan <- err - c.Close() - continue + return } switch buf[0] { case 0: // This is an error @@ -432,8 +483,7 @@ func (c *Conn) readResponses() { if _, err := io.ReadFull(c.conn, biggerBuf[32:]); err != nil { Logger.Printf("A read error is unrecoverable: %s", err) c.eventChan <- err - c.Close() - continue + return } replyBytes = biggerBuf } else { @@ -522,10 +572,14 @@ func processEventOrError(everr eventOrError) (Event, Error) { return ee, nil case Error: return nil, ee + case error: + // c.conn read error + case nil: + // c.eventChan is closed default: Logger.Printf("Invalid event/error type: %T", everr) - return nil, nil } + return nil, nil } // WaitForEvent returns the next event from the server. |
