mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Merge pull request #1298 from nats-io/fix_nobuf_reuse_on_partial
[FIXED] Incorrect buffer reuse in case of partial connection write
This commit is contained in:
@@ -1035,6 +1035,10 @@ func (c *client) flushOutbound() bool {
|
||||
|
||||
// For selecting primary replacement.
|
||||
cnb := nb
|
||||
var lfs int
|
||||
if len(cnb) > 0 {
|
||||
lfs = len(cnb[0])
|
||||
}
|
||||
|
||||
// In case it goes away after releasing the lock.
|
||||
nc := c.nc
|
||||
@@ -1112,7 +1116,7 @@ func (c *client) flushOutbound() bool {
|
||||
}
|
||||
|
||||
// Check to see if we can reuse buffers.
|
||||
if len(cnb) > 0 {
|
||||
if lfs != 0 && n >= int64(lfs) {
|
||||
oldp := cnb[0][:0]
|
||||
if cap(oldp) >= int(c.out.sz) {
|
||||
// Replace primary or secondary if they are nil, reusing same buffer.
|
||||
@@ -1562,6 +1566,9 @@ func (c *client) queueOutbound(data []byte) bool {
|
||||
// Check for slow consumer via pending bytes limit.
|
||||
// ok to return here, client is going away.
|
||||
if c.kind == CLIENT && c.out.pb > c.out.mp {
|
||||
// Perf wise, it looks like it is faster to optimistically add than
|
||||
// checking current pb+len(data) and then add to pb.
|
||||
c.out.pb -= int64(len(data))
|
||||
atomic.AddInt64(&c.srv.slowConsumers, 1)
|
||||
c.Noticef("Slow Consumer Detected: MaxPending of %d Exceeded", c.out.mp)
|
||||
c.markConnAsClosed(SlowConsumerPendingBytes, true)
|
||||
|
||||
@@ -1916,3 +1916,77 @@ func TestClientIPv6Address(t *testing.T) {
|
||||
t.Fatalf("Wrong string representation of an IPv6 address: %q", ncs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPBNotIncreasedOnMaxPending(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.MaxPending = 100
|
||||
s := &Server{opts: opts}
|
||||
c := &client{srv: s}
|
||||
c.initClient()
|
||||
|
||||
c.mu.Lock()
|
||||
c.queueOutbound(make([]byte, 200))
|
||||
pb := c.out.pb
|
||||
c.mu.Unlock()
|
||||
|
||||
if pb != 0 {
|
||||
t.Fatalf("c.out.pb should be 0, got %v", pb)
|
||||
}
|
||||
}
|
||||
|
||||
// testConnWritePartial is a net.Conn stub that records everything written
// into an in-memory buffer. While partial is true, each Write consumes at
// most 15 bytes, simulating a short (partial) write on a real socket.
type testConnWritePartial struct {
	net.Conn
	// partial, when true, caps each Write at 15 bytes.
	partial bool
	// buf accumulates every byte accepted by Write, in order.
	buf bytes.Buffer
}

// Write records p into buf. When partial is set, only the first 15 bytes
// are consumed and the caller is expected to retry with the remainder,
// just as it would after a short write on a real connection.
func (c *testConnWritePartial) Write(p []byte) (int, error) {
	n := len(p)
	// Cap the short write at len(p) so payloads smaller than 15 bytes
	// don't panic on the p[:n] slice below.
	if c.partial && n > 15 {
		n = 15
	}
	return c.buf.Write(p[:n])
}

// SetWriteDeadline is a no-op so flushOutbound's deadline handling succeeds.
func (c *testConnWritePartial) SetWriteDeadline(_ time.Time) error {
	return nil
}
|
||||
|
||||
func TestFlushOutboundNoSliceReuseIfPartial(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
opts.MaxPending = 1024
|
||||
s := &Server{opts: opts}
|
||||
|
||||
fakeConn := &testConnWritePartial{partial: true}
|
||||
c := &client{srv: s, nc: fakeConn}
|
||||
c.initClient()
|
||||
|
||||
bufs := [][]byte{
|
||||
[]byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ"),
|
||||
[]byte("------"),
|
||||
[]byte("0123456789"),
|
||||
}
|
||||
expected := bytes.Buffer{}
|
||||
for _, buf := range bufs {
|
||||
expected.Write(buf)
|
||||
c.mu.Lock()
|
||||
c.queueOutbound(buf)
|
||||
c.out.sz = 10
|
||||
c.flushOutbound()
|
||||
fakeConn.partial = false
|
||||
c.mu.Unlock()
|
||||
}
|
||||
// Ensure everything is flushed.
|
||||
for done := false; !done; {
|
||||
c.mu.Lock()
|
||||
if c.out.pb > 0 {
|
||||
c.flushOutbound()
|
||||
} else {
|
||||
done = true
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
if !bytes.Equal(expected.Bytes(), fakeConn.buf.Bytes()) {
|
||||
t.Fatalf("Expected\n%q\ngot\n%q", expected.String(), fakeConn.buf.String())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user