mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #2093 from shkim-will/flush_client_only
[CHANGED] Flush in place only if producer and consumer are client connections
This commit is contained in:
@@ -1008,10 +1008,19 @@ func (c *client) writeLoop() {
|
||||
|
||||
// flushClients will make sure to flush any clients we may have
|
||||
// sent to during processing. We pass in a budget as a time.Duration
|
||||
// for how much time to spend in place flushing for this client. This
|
||||
// will normally be called in the readLoop of the client who sent the
|
||||
// message that now is being delivered.
|
||||
// for how much time to spend in place flushing for this client.
|
||||
func (c *client) flushClients(budget time.Duration) time.Time {
|
||||
return c.flushClientsWithCheck(budget, false)
|
||||
}
|
||||
|
||||
// flushClientsWithCheck will make sure to flush any clients we may have
|
||||
// sent to during processing. We pass in a budget as a time.Duration
|
||||
// for how much time to spend in place flushing for this client.
|
||||
// The 'clientsKindOnly' boolean indicates whether to check kind of client
|
||||
// and pending client to run flushOutbound in flushClientsWithCheck.
|
||||
// flushOutbound() could block the caller up to the write deadline when
|
||||
// the receiving client cannot drain data from the socket fast enough.
|
||||
func (c *client) flushClientsWithCheck(budget time.Duration, clientsKindOnly bool) time.Time {
|
||||
last := time.Now().UTC()
|
||||
|
||||
// Check pending clients for flush.
|
||||
@@ -1032,7 +1041,7 @@ func (c *client) flushClients(budget time.Duration) time.Time {
|
||||
continue
|
||||
}
|
||||
|
||||
if budget > 0 && cp.out.lft < 2*budget && cp.flushOutbound() {
|
||||
if budget > 0 && (!clientsKindOnly || c.kind == CLIENT && cp.kind == CLIENT) && cp.out.lft < 2*budget && cp.flushOutbound() {
|
||||
budget -= cp.out.lft
|
||||
} else {
|
||||
cp.flushSignal()
|
||||
@@ -1184,7 +1193,7 @@ func (c *client) readLoop(pre []byte) {
|
||||
}
|
||||
|
||||
// Flush, or signal to writeLoop to flush to socket.
|
||||
last := c.flushClients(budget)
|
||||
last := c.flushClientsWithCheck(budget, true)
|
||||
|
||||
// Update activity, check read buffer size.
|
||||
c.mu.Lock()
|
||||
|
||||
Reference in New Issue
Block a user