Added in write deadline logic

This commit is contained in:
Derek Collison
2012-12-13 17:06:24 -08:00
parent 7a06c81599
commit 4f78da4c6a

View File

@@ -88,10 +88,14 @@ func (c *client) readLoop() {
cp.mu.Unlock()
if err != nil {
// FIXME, close connection?
Logf("Error flushing client connection: %v\n", err)
Debugf("Error flushing: %v", err)
}
delete(c.pcd, cp)
}
// Check to see if we got closed, e.g. slow consumer
if c.conn == nil {
return
}
}
}
@@ -353,22 +357,55 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
client.unsubscribe(sub)
return
}
// Check to see if our writes will cause a flush
// in the underlying bufio. If so limit time we
// will wait for flush to complete.
deadlineSet := false
if client.bw.Available() < (len(mh) + len(msg) + len(CR_LF)) {
client.conn.SetWriteDeadline(time.Now().Add(DEFAULT_FLUSH_DEADLINE))
deadlineSet = true
}
// Deliver to the client.
_, err := client.bw.Write(mh)
if err != nil {
Logf("Error writing msg header: %v\n", err)
goto writeErr
}
_, err = client.bw.Write(msg)
if err != nil {
Logf("Error writing msg: %v\n", err)
goto writeErr
}
// FIXME, this is already attached to original message
_, err = client.bw.WriteString(CR_LF)
if err != nil {
Logf("Error writing CRLF: %v\n", err)
goto writeErr
}
if deadlineSet {
client.conn.SetWriteDeadline(time.Time{})
}
client.mu.Unlock()
c.pcd[client] = needFlush
return
writeErr:
if deadlineSet {
client.conn.SetWriteDeadline(time.Time{})
}
client.mu.Unlock()
c.pcd[sub.client] = needFlush
if ne, ok := err.(net.Error); ok && ne.Timeout() {
// FIXME: SlowConsumer logic
Log("Slow Consumer Detected", clientConnStr(client.conn), client.cid)
client.closeConnection()
} else {
Debugf("Error writing msg: %v", err)
}
}
func (c *client) processMsg(msg []byte) {