diff --git a/server/client.go b/server/client.go index 456bc438..802b5d37 100644 --- a/server/client.go +++ b/server/client.go @@ -1008,10 +1008,19 @@ func (c *client) writeLoop() { // flushClients will make sure to flush any clients we may have // sent to during processing. We pass in a budget as a time.Duration -// for how much time to spend in place flushing for this client. This -// will normally be called in the readLoop of the client who sent the -// message that now is being delivered. +// for how much time to spend in place flushing for this client. func (c *client) flushClients(budget time.Duration) time.Time { + return c.flushClientsWithCheck(budget, false) +} + +// flushClientsWithCheck will make sure to flush any clients we may have +// sent to during processing. We pass in a budget as a time.Duration +// for how much time to spend in place flushing for this client. +// The 'clientsKindOnly' boolean indicates whether to check kind of client +// and pending client to run flushOutbound in flushClientsWithCheck. +// flushOutbound() could block the caller up to the write deadline when +// the receiving client cannot drain data from the socket fast enough. +func (c *client) flushClientsWithCheck(budget time.Duration, clientsKindOnly bool) time.Time { last := time.Now().UTC() // Check pending clients for flush. @@ -1032,7 +1041,7 @@ func (c *client) flushClients(budget time.Duration) time.Time { continue } - if budget > 0 && cp.out.lft < 2*budget && cp.flushOutbound() { + if budget > 0 && (!clientsKindOnly || c.kind == CLIENT && cp.kind == CLIENT) && cp.out.lft < 2*budget && cp.flushOutbound() { budget -= cp.out.lft } else { cp.flushSignal() @@ -1184,7 +1193,7 @@ func (c *client) readLoop(pre []byte) { } // Flush, or signal to writeLoop to flush to socket. - last := c.flushClients(budget) + last := c.flushClientsWithCheck(budget, true) // Update activity, check read buffer size. c.mu.Lock()