Merge pull request #2684 from nats-io/fix_2679

[FIXED] A slow consumer could cause the publisher to block
This commit is contained in:
Ivan Kozlovic
2021-11-10 09:33:25 -07:00
committed by GitHub
2 changed files with 11 additions and 34 deletions

View File

@@ -1026,17 +1026,6 @@ func (c *client) writeLoop() {
// sent to during processing. We pass in a budget as a time.Duration
// for how much time to spend in place flushing for this client.
func (c *client) flushClients(budget time.Duration) time.Time {
return c.flushClientsWithCheck(budget, false)
}
// flushClientsWithCheck will make sure to flush any clients we may have
// sent to during processing. We pass in a budget as a time.Duration
// for how much time to spend in place flushing for this client.
// The 'clientsKindOnly' boolean indicates whether to check kind of client
// and pending client to run flushOutbound in flushClientsWithCheck.
// flushOutbound() could block the caller up to the write deadline when
// the receiving client cannot drain data from the socket fast enough.
func (c *client) flushClientsWithCheck(budget time.Duration, clientsKindOnly bool) time.Time {
last := time.Now().UTC()
// Check pending clients for flush.
@@ -1057,7 +1046,7 @@ func (c *client) flushClientsWithCheck(budget time.Duration, clientsKindOnly boo
continue
}
if budget > 0 && (!clientsKindOnly || c.kind == CLIENT && cp.kind == CLIENT) && cp.out.lft < 2*budget && cp.flushOutbound() {
if budget > 0 && cp.out.lft < 2*budget && cp.flushOutbound() {
budget -= cp.out.lft
} else {
cp.flushSignal()
@@ -1199,17 +1188,8 @@ func (c *client) readLoop(pre []byte) {
atomic.AddInt64(&s.inBytes, int64(c.in.bytes))
}
// Budget to spend in place flushing outbound data.
// Client will be checked on several fronts to see
// if applicable. Routes and Gateways will never
// spend time flushing outbound in place.
var budget time.Duration
if c.kind == CLIENT {
budget = time.Millisecond
}
// Flush, or signal to writeLoop to flush to socket.
last := c.flushClientsWithCheck(budget, true)
// Signal to writeLoop to flush to socket.
last := c.flushClients(0)
// Update activity, check read buffer size.
c.mu.Lock()

View File

@@ -1726,15 +1726,6 @@ func (l *captureWarnLogger) Warnf(format string, v ...interface{}) {
}
}
type pauseWriteConn struct {
net.Conn
}
func (swc *pauseWriteConn) Write(b []byte) (int, error) {
time.Sleep(250 * time.Millisecond)
return swc.Conn.Write(b)
}
func TestReadloopWarning(t *testing.T) {
readLoopReportThreshold = 100 * time.Millisecond
defer func() { readLoopReportThreshold = readLoopReport }()
@@ -1756,10 +1747,15 @@ func TestReadloopWarning(t *testing.T) {
sender := natsConnect(t, url)
defer sender.Close()
wg := sync.WaitGroup{}
wg.Add(1)
c := s.getClient(cid)
c.mu.Lock()
c.nc = &pauseWriteConn{Conn: c.nc}
c.mu.Unlock()
go func() {
defer wg.Done()
time.Sleep(250 * time.Millisecond)
c.mu.Unlock()
}()
natsPub(t, sender, "foo", make([]byte, 100))
natsFlush(t, sender)
@@ -1772,6 +1768,7 @@ func TestReadloopWarning(t *testing.T) {
case <-time.After(2 * time.Second):
t.Fatalf("No warning printed")
}
wg.Wait()
}
func TestTraceMsg(t *testing.T) {