mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
add 'flushClientsWithCheck' to prevent block in readLoop
This commit is contained in:
@@ -981,10 +981,18 @@ func (c *client) writeLoop() {
|
||||
|
||||
// flushClients will make sure to flush any clients we may have
|
||||
// sent to during processing. We pass in a budget as a time.Duration
|
||||
// for how much time to spend in place flushing for this client. This
|
||||
// will normally be called in the readLoop of the client who sent the
|
||||
// message that now is being delivered.
|
||||
// for how much time to spend in place flushing for this client.
|
||||
func (c *client) flushClients(budget time.Duration) time.Time {
|
||||
return c.flushClientsWithCheck(budget, false)
|
||||
}
|
||||
|
||||
// flushClientsWithCheck will make sure to flush any clients we may have
|
||||
// sent to during processing. We pass in a budget as a time.Duration
|
||||
// for how much time to spend in place flushing for this client.
|
||||
// The 'clientsKindOnly' boolean indicates whether to check kind of client
|
||||
// and pending client to run flushOutbound in flushClientsWithCheck.
|
||||
// flushOutbound could be block caller when pending client is 'stuck'
|
||||
func (c *client) flushClientsWithCheck(budget time.Duration, clientsKindOnly bool) time.Time {
|
||||
last := time.Now().UTC()
|
||||
|
||||
// Check pending clients for flush.
|
||||
@@ -1005,8 +1013,7 @@ func (c *client) flushClients(budget time.Duration) time.Time {
|
||||
continue
|
||||
}
|
||||
|
||||
// Try to flush in place, if producer and consumer are client.
|
||||
if c.kind == CLIENT && cp.kind == CLIENT && budget > 0 && cp.out.lft < 2*budget && cp.flushOutbound() {
|
||||
if budget > 0 && (!clientsKindOnly || c.kind == CLIENT && cp.kind == CLIENT) && cp.out.lft < 2*budget && cp.flushOutbound() {
|
||||
budget -= cp.out.lft
|
||||
} else {
|
||||
cp.flushSignal()
|
||||
@@ -1158,7 +1165,7 @@ func (c *client) readLoop(pre []byte) {
|
||||
}
|
||||
|
||||
// Flush, or signal to writeLoop to flush to socket.
|
||||
last := c.flushClients(budget)
|
||||
last := c.flushClientsWithCheck(budget, true)
|
||||
|
||||
// Update activity, check read buffer size.
|
||||
c.mu.Lock()
|
||||
|
||||
Reference in New Issue
Block a user