diff --git a/server/client.go b/server/client.go index 812b3b73..c5a7b694 100644 --- a/server/client.go +++ b/server/client.go @@ -1026,17 +1026,6 @@ func (c *client) writeLoop() { // sent to during processing. We pass in a budget as a time.Duration // for how much time to spend in place flushing for this client. func (c *client) flushClients(budget time.Duration) time.Time { - return c.flushClientsWithCheck(budget, false) -} - -// flushClientsWithCheck will make sure to flush any clients we may have -// sent to during processing. We pass in a budget as a time.Duration -// for how much time to spend in place flushing for this client. -// The 'clientsKindOnly' boolean indicates whether to check kind of client -// and pending client to run flushOutbound in flushClientsWithCheck. -// flushOutbound() could block the caller up to the write deadline when -// the receiving client cannot drain data from the socket fast enough. -func (c *client) flushClientsWithCheck(budget time.Duration, clientsKindOnly bool) time.Time { last := time.Now().UTC() // Check pending clients for flush. @@ -1057,7 +1046,7 @@ func (c *client) flushClientsWithCheck(budget time.Duration, clientsKindOnly boo continue } - if budget > 0 && (!clientsKindOnly || c.kind == CLIENT && cp.kind == CLIENT) && cp.out.lft < 2*budget && cp.flushOutbound() { + if budget > 0 && cp.out.lft < 2*budget && cp.flushOutbound() { budget -= cp.out.lft } else { cp.flushSignal() @@ -1199,17 +1188,8 @@ func (c *client) readLoop(pre []byte) { atomic.AddInt64(&s.inBytes, int64(c.in.bytes)) } - // Budget to spend in place flushing outbound data. - // Client will be checked on several fronts to see - // if applicable. Routes and Gateways will never - // spend time flushing outbound in place. - var budget time.Duration - if c.kind == CLIENT { - budget = time.Millisecond - } - - // Flush, or signal to writeLoop to flush to socket. - last := c.flushClientsWithCheck(budget, true) + // Signal to writeLoop to flush to socket. + last := c.flushClients(0) // Update activity, check read buffer size. c.mu.Lock() diff --git a/server/client_test.go b/server/client_test.go index dfe03843..8a5aa6af 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -1726,15 +1726,6 @@ func (l *captureWarnLogger) Warnf(format string, v ...interface{}) { } } -type pauseWriteConn struct { - net.Conn -} - -func (swc *pauseWriteConn) Write(b []byte) (int, error) { - time.Sleep(250 * time.Millisecond) - return swc.Conn.Write(b) -} - func TestReadloopWarning(t *testing.T) { readLoopReportThreshold = 100 * time.Millisecond defer func() { readLoopReportThreshold = readLoopReport }() @@ -1756,10 +1747,15 @@ func TestReadloopWarning(t *testing.T) { sender := natsConnect(t, url) defer sender.Close() + wg := sync.WaitGroup{} + wg.Add(1) c := s.getClient(cid) c.mu.Lock() - c.nc = &pauseWriteConn{Conn: c.nc} - c.mu.Unlock() + go func() { + defer wg.Done() + time.Sleep(250 * time.Millisecond) + c.mu.Unlock() + }() natsPub(t, sender, "foo", make([]byte, 100)) natsFlush(t, sender) @@ -1772,6 +1768,7 @@ func TestReadloopWarning(t *testing.T) { case <-time.After(2 * time.Second): t.Fatalf("No warning printed") } + wg.Wait() } func TestTraceMsg(t *testing.T) {