From 27d1a50b359f50ddbdb60696b6dd01f2f5fa9854 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 9 Nov 2021 17:22:15 -0700 Subject: [PATCH] [FIXED] A slow consumer could cause the publisher to block The server reads data from a client from a go routine. When receiving messages, it checks for matching subscriptions, and if found, would send those messages from the producer's readLoop. A notion of "budget" was used to make sure the server does not spend too much time sending to clients from the producer's readLoop, however, regardless of how small the budget was, if one of the subscription's connection TCP buffer was full, a TCP write would block for as long as the defined write_deadline (which is now 10 seconds). We are removing this behavior and therefore clients (like it was the case for other type of connections) will now always notify the subscriber's writeLoop that data is ready to be sent, but the send will not occur in the producer's writeLoop. Resolves #2679 Signed-off-by: Ivan Kozlovic --- server/client.go | 26 +++----------------------- server/client_test.go | 19 ++++++++----------- 2 files changed, 11 insertions(+), 34 deletions(-) diff --git a/server/client.go b/server/client.go index 812b3b73..c5a7b694 100644 --- a/server/client.go +++ b/server/client.go @@ -1026,17 +1026,6 @@ func (c *client) writeLoop() { // sent to during processing. We pass in a budget as a time.Duration // for how much time to spend in place flushing for this client. func (c *client) flushClients(budget time.Duration) time.Time { - return c.flushClientsWithCheck(budget, false) -} - -// flushClientsWithCheck will make sure to flush any clients we may have -// sent to during processing. We pass in a budget as a time.Duration -// for how much time to spend in place flushing for this client. -// The 'clientsKindOnly' boolean indicates whether to check kind of client -// and pending client to run flushOutbound in flushClientsWithCheck. -// flushOutbound() could block the caller up to the write deadline when -// the receiving client cannot drain data from the socket fast enough. -func (c *client) flushClientsWithCheck(budget time.Duration, clientsKindOnly bool) time.Time { last := time.Now().UTC() // Check pending clients for flush. @@ -1057,7 +1046,7 @@ func (c *client) flushClientsWithCheck(budget time.Duration, clientsKindOnly boo continue } - if budget > 0 && (!clientsKindOnly || c.kind == CLIENT && cp.kind == CLIENT) && cp.out.lft < 2*budget && cp.flushOutbound() { + if budget > 0 && cp.out.lft < 2*budget && cp.flushOutbound() { budget -= cp.out.lft } else { cp.flushSignal() @@ -1199,17 +1188,8 @@ func (c *client) readLoop(pre []byte) { atomic.AddInt64(&s.inBytes, int64(c.in.bytes)) } - // Budget to spend in place flushing outbound data. - // Client will be checked on several fronts to see - // if applicable. Routes and Gateways will never - // spend time flushing outbound in place. - var budget time.Duration - if c.kind == CLIENT { - budget = time.Millisecond - } - - // Flush, or signal to writeLoop to flush to socket. - last := c.flushClientsWithCheck(budget, true) + // Signal to writeLoop to flush to socket. + last := c.flushClients(0) // Update activity, check read buffer size. c.mu.Lock() diff --git a/server/client_test.go b/server/client_test.go index dfe03843..8a5aa6af 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -1726,15 +1726,6 @@ func (l *captureWarnLogger) Warnf(format string, v ...interface{}) { } } -type pauseWriteConn struct { - net.Conn -} - -func (swc *pauseWriteConn) Write(b []byte) (int, error) { - time.Sleep(250 * time.Millisecond) - return swc.Conn.Write(b) -} - func TestReadloopWarning(t *testing.T) { readLoopReportThreshold = 100 * time.Millisecond defer func() { readLoopReportThreshold = readLoopReport }() @@ -1756,10 +1747,15 @@ func TestReadloopWarning(t *testing.T) { sender := natsConnect(t, url) defer sender.Close() + wg := sync.WaitGroup{} + wg.Add(1) c := s.getClient(cid) c.mu.Lock() - c.nc = &pauseWriteConn{Conn: c.nc} - c.mu.Unlock() + go func() { + defer wg.Done() + time.Sleep(250 * time.Millisecond) + c.mu.Unlock() + }() natsPub(t, sender, "foo", make([]byte, 100)) natsFlush(t, sender) @@ -1772,6 +1768,7 @@ func TestReadloopWarning(t *testing.T) { case <-time.After(2 * time.Second): t.Fatalf("No warning printed") } + wg.Wait() } func TestTraceMsg(t *testing.T) {