mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
[FIXED] A slow consumer could cause the publisher to block
The server reads data from a client from a go routine. When receiving messages, it checks for matching subscriptions, and if found, would send those messages from the producer's readLoop. A notion of "budget" was used to make sure the server does not spend too much time sending to clients from the producer's readLoop, however, regardless of how small the budget was, if one of the subscription's connection TCP buffer was full, a TCP write would block for as long as the defined write_deadline (which is now 10 seconds). We are removing this behavior and therefore clients (like it was the case for other type of connections) will now always notify the subscriber's writeLoop that data is ready to be sent, but the send will not occur in the producer's writeLoop. Resolves #2679 Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -1026,17 +1026,6 @@ func (c *client) writeLoop() {
|
||||
// sent to during processing. We pass in a budget as a time.Duration
|
||||
// for how much time to spend in place flushing for this client.
|
||||
func (c *client) flushClients(budget time.Duration) time.Time {
|
||||
return c.flushClientsWithCheck(budget, false)
|
||||
}
|
||||
|
||||
// flushClientsWithCheck will make sure to flush any clients we may have
|
||||
// sent to during processing. We pass in a budget as a time.Duration
|
||||
// for how much time to spend in place flushing for this client.
|
||||
// The 'clientsKindOnly' boolean indicates whether to check kind of client
|
||||
// and pending client to run flushOutbound in flushClientsWithCheck.
|
||||
// flushOutbound() could block the caller up to the write deadline when
|
||||
// the receiving client cannot drain data from the socket fast enough.
|
||||
func (c *client) flushClientsWithCheck(budget time.Duration, clientsKindOnly bool) time.Time {
|
||||
last := time.Now().UTC()
|
||||
|
||||
// Check pending clients for flush.
|
||||
@@ -1057,7 +1046,7 @@ func (c *client) flushClientsWithCheck(budget time.Duration, clientsKindOnly boo
|
||||
continue
|
||||
}
|
||||
|
||||
if budget > 0 && (!clientsKindOnly || c.kind == CLIENT && cp.kind == CLIENT) && cp.out.lft < 2*budget && cp.flushOutbound() {
|
||||
if budget > 0 && cp.out.lft < 2*budget && cp.flushOutbound() {
|
||||
budget -= cp.out.lft
|
||||
} else {
|
||||
cp.flushSignal()
|
||||
@@ -1199,17 +1188,8 @@ func (c *client) readLoop(pre []byte) {
|
||||
atomic.AddInt64(&s.inBytes, int64(c.in.bytes))
|
||||
}
|
||||
|
||||
// Budget to spend in place flushing outbound data.
|
||||
// Client will be checked on several fronts to see
|
||||
// if applicable. Routes and Gateways will never
|
||||
// spend time flushing outbound in place.
|
||||
var budget time.Duration
|
||||
if c.kind == CLIENT {
|
||||
budget = time.Millisecond
|
||||
}
|
||||
|
||||
// Flush, or signal to writeLoop to flush to socket.
|
||||
last := c.flushClientsWithCheck(budget, true)
|
||||
// Signal to writeLoop to flush to socket.
|
||||
last := c.flushClients(0)
|
||||
|
||||
// Update activity, check read buffer size.
|
||||
c.mu.Lock()
|
||||
|
||||
@@ -1726,15 +1726,6 @@ func (l *captureWarnLogger) Warnf(format string, v ...interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
type pauseWriteConn struct {
|
||||
net.Conn
|
||||
}
|
||||
|
||||
func (swc *pauseWriteConn) Write(b []byte) (int, error) {
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
return swc.Conn.Write(b)
|
||||
}
|
||||
|
||||
func TestReadloopWarning(t *testing.T) {
|
||||
readLoopReportThreshold = 100 * time.Millisecond
|
||||
defer func() { readLoopReportThreshold = readLoopReport }()
|
||||
@@ -1756,10 +1747,15 @@ func TestReadloopWarning(t *testing.T) {
|
||||
sender := natsConnect(t, url)
|
||||
defer sender.Close()
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
c := s.getClient(cid)
|
||||
c.mu.Lock()
|
||||
c.nc = &pauseWriteConn{Conn: c.nc}
|
||||
c.mu.Unlock()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
c.mu.Unlock()
|
||||
}()
|
||||
|
||||
natsPub(t, sender, "foo", make([]byte, 100))
|
||||
natsFlush(t, sender)
|
||||
@@ -1772,6 +1768,7 @@ func TestReadloopWarning(t *testing.T) {
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("No warning printed")
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestTraceMsg(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user