diff options
| author | jEzEk <[email protected]> | 2018-10-02 16:52:41 +0200 |
|---|---|---|
| committer | jEzEk <[email protected]> | 2018-10-27 19:45:25 +0200 |
| commit | 67a5ab1e1a19c2cfc42fd9f539194dd17aafeacb (patch) | |
| tree | 1b2a4099db71c94225de7f226b1f0a72ef7f6d87 /xgb.go | |
| parent | a898cd2db36d1b9556fe79c35f67d0eb63540517 (diff) | |
Handle Conn's spawned goroutines upon close, ...
fix double close panic,
fix occaional panic on sudden connection to server close
handle all channel waitings in cookies and requests properly
Diffstat (limited to 'xgb.go')
| -rw-r--r-- | xgb.go | 162 |
1 files changed, 108 insertions, 54 deletions
@@ -60,7 +60,8 @@ type Conn struct { xidChan chan xid seqChan chan uint16 reqChan chan *request - closing chan chan struct{} + doneSend chan struct{} + doneRead chan struct{} // ExtLock is a lock used whenever new extensions are initialized. // It should not be used. It is exported for use in the extension @@ -75,11 +76,13 @@ type Conn struct { // NewConn creates a new connection instance. It initializes locks, data // structures, and performs the initial handshake. (The code for the handshake // has been relegated to conn.go.) +// It is up to user to close connection with Close() method to finish all unfinished requests and clean up spawned goroutines. +// If the connection unexpectedly closes itself and WaitForEvent() returns "nil, nil", everything is cleaned by that moment, but nothing bad happens if you call Close() after. func NewConn() (*Conn, error) { return NewConnDisplay("") } -// NewConnDisplay is just like NewConn, but allows a specific DISPLAY +// NewConnDisplay is just like NewConn (see closing instructions), but allows a specific DISPLAY // string to be used. // If 'display' is empty it will be taken from os.Getenv("DISPLAY"). // @@ -101,7 +104,7 @@ func NewConnDisplay(display string) (*Conn, error) { return postNewConn(c) } -// NewConnNet is just like NewConn, but allows a specific net.Conn +// NewConnNet is just like NewConn (see closing instructions), but allows a specific net.Conn // to be used. func NewConnNet(netConn net.Conn) (*Conn, error) { c := &Conn{} @@ -125,7 +128,8 @@ func postNewConn(c *Conn) (*Conn, error) { c.seqChan = make(chan uint16, seqBuffer) c.reqChan = make(chan *request, reqBuffer) c.eventChan = make(chan eventOrError, eventBuffer) - c.closing = make(chan chan struct{}, 1) + c.doneSend = make(chan struct{}) + c.doneRead = make(chan struct{}) go c.generateXIds() go c.generateSeqIds() @@ -136,8 +140,12 @@ func postNewConn(c *Conn) (*Conn, error) { } // Close gracefully closes the connection to the X server. +// When everything is cleaned up, the WaitForEvent method will return (nil, nil) func (c *Conn) Close() { - close(c.reqChan) + select { + case c.reqChan <- nil: + case <-c.doneSend: + } } // Event is an interface that can contain any of the events returned by the @@ -196,8 +204,12 @@ type eventOrError interface{} // If you need identifiers, use the appropriate constructor. // e.g., For a window id, use xproto.NewWindowId. For // a new pixmap id, use xproto.NewPixmapId. And so on. +// Returns (0, io.EOF) when the connection is closed. func (c *Conn) NewId() (uint32, error) { - xid := <-c.xidChan + xid, ok := <-c.xidChan + if !ok { + return 0, io.EOF + } if xid.err != nil { return 0, xid.err } @@ -240,19 +252,26 @@ func (c *Conn) generateXIds() { max := c.setupResourceIdMask last := uint32(0) for { - // TODO: Use the XC Misc extension to look for released ids. + id := xid{} if last > 0 && last >= max-inc+1 { - c.xidChan <- xid{ - id: 0, - err: errors.New("There are no more available resource" + - "identifiers."), + // TODO: Use the XC Misc extension to look for released ids. + id = xid{ + id: 0, + err: errors.New("There are no more available resource identifiers."), + } + } else { + last += inc + id = xid{ + id: last | c.setupResourceIdBase, + err: nil, } } - last += inc - c.xidChan <- xid{ - id: last | c.setupResourceIdBase, - err: nil, + select { + case c.xidChan <- id: + case <-c.doneSend: + // c.sendRequests is down and since this id is used by requests, we don't need this goroutine running anymore. + return } } } @@ -275,11 +294,16 @@ func (c *Conn) generateSeqIds() { seqid := uint16(1) for { - c.seqChan <- seqid - if seqid == uint16((1<<16)-1) { - seqid = 0 - } else { - seqid++ + select { + case c.seqChan <- seqid: + if seqid == uint16((1<<16)-1) { + seqid = 0 + } else { + seqid++ + } + case <-c.doneSend: + // c.sendRequests is down and since only that function uses sequence ids (via newSequenceId method), we don't need this goroutine running anymore. + return } } } @@ -315,8 +339,19 @@ type request struct { // edits the generated code for the request you want to issue. func (c *Conn) NewRequest(buf []byte, cookie *Cookie) { seq := make(chan struct{}) - c.reqChan <- &request{buf: buf, cookie: cookie, seq: seq} - <-seq + select { + case c.reqChan <- &request{buf: buf, cookie: cookie, seq: seq}: + // request is in buffer + // wait until request is processed or connection is closed + select { + case <-seq: + // request was successfully sent to X server + case <-c.doneSend: + // c.sendRequests is down, your request was not handled + } + case <-c.doneSend: + // c.sendRequests is down, nobody is listening to your requests + } } // sendRequests is run as a single goroutine that takes requests and writes @@ -324,28 +359,45 @@ func (c *Conn) NewRequest(buf []byte, cookie *Cookie) { // It is meant to be run as its own goroutine. func (c *Conn) sendRequests() { defer close(c.cookieChan) + defer c.conn.Close() + defer close(c.doneSend) - for req := range c.reqChan { - // ho there! if the cookie channel is nearly full, force a round - // trip to clear out the cookie buffer. - // Note that we circumvent the request channel, because we're *in* - // the request channel. - if len(c.cookieChan) == cookieBuffer-1 { - if err := c.noop(); err != nil { - // Shut everything down. - break + for { + select { + case req := <-c.reqChan: + if req == nil { + // a request by c.Close() to gracefully exit + // Flush the response reading goroutine. + if err := c.noop(); err != nil { + c.conn.Close() + <-c.doneRead + } + return + } + // ho there! if the cookie channel is nearly full, force a round + // trip to clear out the cookie buffer. + // Note that we circumvent the request channel, because we're *in* + // the request channel. + if len(c.cookieChan) == cookieBuffer-1 { + if err := c.noop(); err != nil { + // Shut everything down. + c.conn.Close() + <-c.doneRead + return + } + } + req.cookie.Sequence = c.newSequenceId() + c.cookieChan <- req.cookie + if err := c.writeBuffer(req.buf); err != nil { + c.conn.Close() + <-c.doneRead + return } + close(req.seq) + case <-c.doneRead: + return } - req.cookie.Sequence = c.newSequenceId() - c.cookieChan <- req.cookie - c.writeBuffer(req.buf) - close(req.seq) } - response := make(chan struct{}) - c.closing <- response - c.noop() // Flush the response reading goroutine, ignore error. - <-response - c.conn.Close() } // noop circumvents the usual request sending goroutines and forces a round @@ -366,9 +418,8 @@ func (c *Conn) writeBuffer(buf []byte) error { if _, err := c.conn.Write(buf); err != nil { Logger.Printf("A write error is unrecoverable: %s", err) return err - } else { - return nil } + return nil } // readResponses is a goroutine that reads events, errors and @@ -382,6 +433,8 @@ func (c *Conn) writeBuffer(buf []byte) error { // Finally, cookies that came "before" this reply are always cleaned up. func (c *Conn) readResponses() { defer close(c.eventChan) + defer c.conn.Close() + defer close(c.doneRead) var ( err Error @@ -390,20 +443,18 @@ func (c *Conn) readResponses() { ) for { - select { - case respond := <-c.closing: - respond <- struct{}{} - return - default: - } - buf := make([]byte, 32) err, seq = nil, 0 if _, err := io.ReadFull(c.conn, buf); err != nil { + select { + case <-c.doneSend: + // gracefully closing + return + default: + } Logger.Printf("A read error is unrecoverable: %s", err) c.eventChan <- err - c.Close() - continue + return } switch buf[0] { case 0: // This is an error @@ -432,8 +483,7 @@ func (c *Conn) readResponses() { if _, err := io.ReadFull(c.conn, biggerBuf[32:]); err != nil { Logger.Printf("A read error is unrecoverable: %s", err) c.eventChan <- err - c.Close() - continue + return } replyBytes = biggerBuf } else { @@ -522,10 +572,14 @@ func processEventOrError(everr eventOrError) (Event, Error) { return ee, nil case Error: return nil, ee + case error: + // c.conn read error + case nil: + // c.eventChan is closed default: Logger.Printf("Invalid event/error type: %T", everr) - return nil, nil } + return nil, nil } // WaitForEvent returns the next event from the server. |
