diff --git a/server/client.go b/server/client.go index 1e884472..aad516ef 100644 --- a/server/client.go +++ b/server/client.go @@ -88,10 +88,14 @@ func (c *client) readLoop() { cp.mu.Unlock() if err != nil { // FIXME, close connection? - Logf("Error flushing client connection: %v\n", err) + Debugf("Error flushing: %v", err) } delete(c.pcd, cp) } + // Check to see if we got closed, e.g. slow consumer + if c.conn == nil { + return + } } } @@ -353,22 +357,55 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { client.unsubscribe(sub) return } + + // Check to see if our writes will cause a flush + // in the underlying bufio. If so limit time we + // will wait for flush to complete. + + deadlineSet := false + if client.bw.Available() < (len(mh) + len(msg) + len(CR_LF)) { + client.conn.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE)) + deadlineSet = true + } + // Deliver to the client. _, err := client.bw.Write(mh) if err != nil { - Logf("Error writing msg header: %v\n", err) + goto writeErr } + _, err = client.bw.Write(msg) if err != nil { - Logf("Error writing msg: %v\n", err) + goto writeErr } + // FIXME, this is already attached to original message _, err = client.bw.WriteString(CR_LF) if err != nil { - Logf("Error writing CRLF: %v\n", err) + goto writeErr + } + + if deadlineSet { + client.conn.SetWriteDeadline(time.Time{}) + } + + client.mu.Unlock() + c.pcd[client] = needFlush + return + +writeErr: + if deadlineSet { + client.conn.SetWriteDeadline(time.Time{}) } client.mu.Unlock() - c.pcd[sub.client] = needFlush + + if ne, ok := err.(net.Error); ok && ne.Timeout() { + // FIXME: SlowConsumer logic + Log("Slow Consumer Detected", clientConnStr(client.conn), client.cid) + client.closeConnection() + } else { + Debugf("Error writing msg: %v", err) + } } func (c *client) processMsg(msg []byte) {