Refactor outbound queues, remove dynamic sizing, add buffer reuse

Also try to reduce flakiness of `TestClusterQueueSubs` and `TestCrossAccountServiceResponseTypes`
Neil Twigg
2022-12-20 16:47:31 +00:00
parent c0784bc363
commit f2bffec366
5 changed files with 263 additions and 258 deletions


@@ -2099,12 +2099,17 @@ func TestCrossAccountServiceResponseTypes(t *testing.T) {
 	cfoo.parseAsync(string(mReply))
-	var b [256]byte
-	n, err := crBar.Read(b[:])
-	if err != nil {
-		t.Fatalf("Error reading response: %v", err)
+	var buf []byte
+	for i := 0; i < 20; i++ {
+		b, err := crBar.ReadBytes('\n')
+		if err != nil {
+			t.Fatalf("Error reading response: %v", err)
+		}
+		buf = append(buf[:], b...)
+		if mraw = msgPat.FindAllStringSubmatch(string(buf), -1); len(mraw) == 10 {
+			break
+		}
 	}
-	mraw = msgPat.FindAllStringSubmatch(string(b[:n]), -1)
 	if len(mraw) != 10 {
 		t.Fatalf("Expected a response but got %d", len(mraw))
 	}
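
The flakiness here came from a single fixed-size Read racing TCP delivery: the ten expected responses may arrive split across several reads, so one Read can see only a prefix. The new loop accumulates whole lines until the expected number of matches appears. Below is a standalone sketch of that pattern; the names (readUntil, conn, pat) are inventions of the sketch, not identifiers from the real test.

package main

import (
	"bufio"
	"fmt"
	"regexp"
	"strings"
)

// readUntil keeps appending line-delimited reads until "pat" matches
// "expected" times, instead of hoping a single Read returns everything.
func readUntil(conn *bufio.Reader, pat *regexp.Regexp, expected, maxTries int) ([][]string, error) {
	var buf []byte
	for i := 0; i < maxTries; i++ {
		line, err := conn.ReadBytes('\n')
		if err != nil {
			return nil, err
		}
		buf = append(buf, line...)
		// Re-scan everything received so far, not just the newest line.
		if m := pat.FindAllStringSubmatch(string(buf), -1); len(m) >= expected {
			return m, nil
		}
	}
	return nil, fmt.Errorf("expected %d matches, got fewer after %d reads", expected, maxTries)
}

func main() {
	r := bufio.NewReader(strings.NewReader("MSG a\r\nMSG b\r\n"))
	m, err := readUntil(r, regexp.MustCompile(`MSG \S+`), 2, 10)
	fmt.Println(len(m), err) // 2 <nil>
}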


@@ -293,13 +293,9 @@ type pinfo struct {
 // outbound holds pending data for a socket.
 type outbound struct {
-	p   []byte        // Primary write buffer
-	s   []byte        // Secondary for use post flush
-	nb  net.Buffers   // net.Buffers for writev IO
-	sz  int32         // limit size per []byte, uses variable BufSize constants, start, min, max.
-	sws int32         // Number of short writes, used for dynamic resizing.
+	nb  net.Buffers   // Pending buffers for send, each has fixed capacity as per nbPool below.
+	wnb net.Buffers   // Working copy of "nb", reused on each flushOutbound call, partial writes may leave entries here for next iteration.
 	pb  int64         // Total pending/queued bytes.
 	pm  int32         // Total pending/queued messages.
 	fsp int32         // Flush signals that are pending per producer from readLoop's pcd.
 	sg  *sync.Cond    // To signal writeLoop that there is data to flush.
 	wdl time.Duration // Snapshot of write deadline.
@@ -308,6 +304,37 @@ type outbound struct {
 	stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in.
 }

+const nbPoolSizeSmall = 4096  // Underlying array size of small buffer
+const nbPoolSizeLarge = 65536 // Underlying array size of large buffer
+
+var nbPoolSmall = &sync.Pool{
+	New: func() any {
+		b := [nbPoolSizeSmall]byte{}
+		return &b
+	},
+}
+
+var nbPoolLarge = &sync.Pool{
+	New: func() any {
+		b := [nbPoolSizeLarge]byte{}
+		return &b
+	},
+}
+
+func nbPoolPut(b []byte) {
+	switch cap(b) {
+	case nbPoolSizeSmall:
+		b := (*[nbPoolSizeSmall]byte)(b[0:nbPoolSizeSmall])
+		nbPoolSmall.Put(b)
+	case nbPoolSizeLarge:
+		b := (*[nbPoolSizeLarge]byte)(b[0:nbPoolSizeLarge])
+		nbPoolLarge.Put(b)
+	default:
+		// Ignore frames that are the wrong size, this might happen
+		// with WebSocket/MQTT messages as they are framed
+	}
+}
+
 type perm struct {
 	allow *Sublist
 	deny  *Sublist
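
Two details of the pool code above are worth calling out. The pools store pointers to fixed-size arrays (*[N]byte) rather than slices, so a Get/Put cycle moves a single pointer in and out of the sync.Pool without the allocation that boxing a slice header in an interface would cost, and nbPoolPut's switch on cap(b) guarantees only exactly-sized buffers are recycled. A minimal, self-contained sketch of the same pattern; poolSize and the other names here are stand-ins, not the server's identifiers.

package main

import (
	"fmt"
	"sync"
)

const poolSize = 4096 // mirrors nbPoolSizeSmall above

// Pooling *[N]byte instead of []byte keeps the pooled value a single
// pointer, so returning it to the sync.Pool does not allocate.
var pool = &sync.Pool{
	New: func() any {
		b := [poolSize]byte{}
		return &b
	},
}

func main() {
	// Get a zero-length slice backed by a pooled array, append into it...
	buf := pool.Get().(*[poolSize]byte)[:0]
	buf = append(buf, "PING\r\n"...)
	fmt.Println(len(buf), cap(buf)) // 6 4096

	// ...and hand the array back once done with it, as nbPoolPut does,
	// recycling only buffers whose capacity matches the pool exactly.
	if cap(buf) == poolSize {
		pool.Put((*[poolSize]byte)(buf[0:poolSize]))
	}
}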
@@ -584,7 +611,6 @@ func (c *client) initClient() {
 	c.cid = atomic.AddUint64(&s.gcid, 1)

 	// Outbound data structure setup
-	c.out.sz = startBufSize
 	c.out.sg = sync.NewCond(&(c.mu))
 	opts := s.getOpts()
 	// Snapshots to avoid mutex access in fast paths.
@@ -1344,11 +1370,6 @@ func (c *client) collapsePtoNB() (net.Buffers, int64) {
 	if c.isWebsocket() {
 		return c.wsCollapsePtoNB()
 	}
-	if c.out.p != nil {
-		p := c.out.p
-		c.out.p = nil
-		return append(c.out.nb, p), c.out.pb
-	}
 	return c.out.nb, c.out.pb
 }
@@ -1359,9 +1380,6 @@ func (c *client) handlePartialWrite(pnb net.Buffers) {
 		c.ws.frames = append(pnb, c.ws.frames...)
 		return
 	}
-	nb, _ := c.collapsePtoNB()
-	// The partial needs to be first, so append nb to pnb
-	c.out.nb = append(pnb, nb...)
 }

 // flushOutbound will flush outbound buffer to a client.
@@ -1385,26 +1403,37 @@ func (c *client) flushOutbound() bool {
 		return true // true because no need to queue a signal.
 	}

-	// Place primary on nb, assign primary to secondary, nil out nb and secondary.
-	nb, attempted := c.collapsePtoNB()
-	c.out.p, c.out.nb, c.out.s = c.out.s, nil, nil
-	if nb == nil {
-		return true
-	}
+	// In the case of a normal socket connection, "collapsed" is just a ref
+	// to "nb". In the case of WebSockets, additional framing is added to
+	// anything that is waiting in "nb". Also keep a note of how many bytes
+	// were queued before we release the mutex.
+	collapsed, attempted := c.collapsePtoNB()

-	// For selecting primary replacement.
-	cnb := nb
-	var lfs int
-	if len(cnb) > 0 {
-		lfs = len(cnb[0])
-	}
+	// Frustratingly, (net.Buffers).WriteTo() modifies the receiver so we
+	// can't work on "nb" directly — while the mutex is unlocked during IO,
+	// something else might call queueOutbound and modify it. So instead we
+	// need a working copy — we'll operate on "wnb" instead. Note that in
+	// the case of a partial write, "wnb" may have remaining data from the
+	// previous write, and in the case of WebSockets, that data may already
+	// be framed, so we are careful not to re-frame "wnb" here. Instead we
+	// will just frame up "nb" and append it onto whatever is left on "wnb".
+	// "nb" will be reset back to its starting position so it can be modified
+	// safely by queueOutbound calls.
+	c.out.wnb = append(c.out.wnb, collapsed...)
+	orig := append(net.Buffers{}, c.out.wnb...)
+	c.out.nb = c.out.nb[:0]
+
+	// Since WriteTo is lopping things off the beginning, we need to remember
+	// the start position of the underlying array so that we can get back to it.
+	// Otherwise we'll always "slide forward" and that will result in reallocs.
+	startOfWnb := c.out.wnb[0:]

 	// In case it goes away after releasing the lock.
 	nc := c.nc
-	apm := c.out.pm

 	// Capture this (we change the value in some tests)
 	wdl := c.out.wdl

 	// Do NOT hold lock during actual IO.
 	c.mu.Unlock()
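
The comment about (net.Buffers).WriteTo modifying the receiver is the crux of the "wnb" working copy: WriteTo drains consumed buffers off the front of the slice as they are written, so a slice shared with "nb" could not safely be handed to it while the lock is released. A small demonstration against an in-memory writer; every name here is local to the sketch.

package main

import (
	"bytes"
	"fmt"
	"net"
)

func main() {
	nb := net.Buffers{[]byte("hello "), []byte("world")}
	var sink bytes.Buffer

	// WriteTo mutates the receiver: fully consumed []byte entries are
	// dropped from the front of the slice as they are written.
	n, err := nb.WriteTo(&sink)
	fmt.Println(n, err)        // 11 <nil>
	fmt.Println(len(nb))       // 0: every buffer was consumed
	fmt.Println(sink.String()) // "hello world"
}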
@@ -1416,7 +1445,7 @@ func (c *client) flushOutbound() bool {
 	nc.SetWriteDeadline(start.Add(wdl))

 	// Actual write to the socket.
-	n, err := nb.WriteTo(nc)
+	n, err := c.out.wnb.WriteTo(nc)
 	nc.SetWriteDeadline(time.Time{})

 	lft := time.Since(start)
@@ -1424,11 +1453,35 @@ func (c *client) flushOutbound() bool {
 	// Re-acquire client lock.
 	c.mu.Lock()

+	// At this point, "wnb" has been mutated by WriteTo and any consumed
+	// buffers have been lopped off the beginning, so in order to return
+	// them to the pool, we need to look at the difference between "orig"
+	// and "wnb".
+	for i := 0; i < len(orig)-len(c.out.wnb); i++ {
+		nbPoolPut(orig[i])
+	}
+
+	// At this point it's possible that "nb" has been modified by another
+	// call to queueOutbound while the lock was released, so we'll leave
+	// those for the next iteration. Meanwhile it's possible that we only
+	// managed a partial write of "wnb", so we'll shift anything that
+	// remains up to the beginning of the array to prevent reallocating.
+	// Anything left in "wnb" has already been framed for WebSocket conns
+	// so leave them alone for the next call to flushOutbound.
+	c.out.wnb = append(startOfWnb[:0], c.out.wnb...)
+
+	// If we've written everything but the underlying array of our working
+	// buffer has grown excessively then free it — the GC will tidy it up
+	// and we can allocate a new one next time.
+	if len(c.out.wnb) == 0 && cap(c.out.wnb) > nbPoolSizeLarge*8 {
+		c.out.wnb = nil
+	}
+
 	// Ignore ErrShortWrite errors, they will be handled as partials.
 	if err != nil && err != io.ErrShortWrite {
 		// Handle timeout error (slow consumer) differently
 		if ne, ok := err.(net.Error); ok && ne.Timeout() {
-			if closed := c.handleWriteTimeout(n, attempted, len(cnb)); closed {
+			if closed := c.handleWriteTimeout(n, attempted, len(c.out.nb)); closed {
 				return true
 			}
 		} else {
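
A worked example of the recycling arithmetic above: if "orig" snapshotted three buffers before the write and a short write left one unwritten in "wnb", the length difference identifies exactly the fully-consumed prefix that is safe to return to the pool. The buffer names below are hypothetical.

package main

import (
	"fmt"
	"net"
)

func main() {
	bufA, bufB, bufC := []byte("aaa"), []byte("bbb"), []byte("ccc")
	orig := net.Buffers{bufA, bufB, bufC} // snapshot taken before WriteTo
	wnb := net.Buffers{bufC}              // a partial write consumed bufA and bufB

	// Only the fully-written prefix may be recycled; the length
	// difference counts exactly those buffers.
	consumed := len(orig) - len(wnb)
	for i := 0; i < consumed; i++ {
		fmt.Printf("recycling buffer %d: %s\n", i, orig[i])
	}
}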
@@ -1452,43 +1505,11 @@ func (c *client) flushOutbound() bool {
 	if c.isWebsocket() {
 		c.ws.fs -= n
 	}
-	c.out.pm -= apm // FIXME(dlc) - this will not be totally accurate on partials.

 	// Check for partial writes
 	// TODO(dlc) - zero write with no error will cause lost message and the writeloop to spin.
 	if n != attempted && n > 0 {
-		c.handlePartialWrite(nb)
-	} else if int32(n) >= c.out.sz {
-		c.out.sws = 0
-	}
-
-	// Adjust based on what we wrote plus any pending.
-	pt := n + c.out.pb
-
-	// Adjust sz as needed downward, keeping power of 2.
-	// We do this at a slower rate.
-	if pt < int64(c.out.sz) && c.out.sz > minBufSize {
-		c.out.sws++
-		if c.out.sws > shortsToShrink {
-			c.out.sz >>= 1
-		}
-	}
-	// Adjust sz as needed upward, keeping power of 2.
-	if pt > int64(c.out.sz) && c.out.sz < maxBufSize {
-		c.out.sz <<= 1
-	}
-
-	// Check to see if we can reuse buffers.
-	if lfs != 0 && n >= int64(lfs) {
-		oldp := cnb[0][:0]
-		if cap(oldp) >= int(c.out.sz) {
-			// Replace primary or secondary if they are nil, reusing same buffer.
-			if c.out.p == nil {
-				c.out.p = oldp
-			} else if c.out.s == nil || cap(c.out.s) < int(c.out.sz) {
-				c.out.s = oldp
-			}
-		}
+		c.handlePartialWrite(c.out.nb)
 	}

 	// Check that if there is still data to send and writeLoop is in wait,
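
For context on what "remove dynamic sizing" takes away: the deleted block above grew and shrank a per-client write buffer size in powers of two, shrinking only after a run of short writes. The sketch below is a paraphrase of that heuristic, not the server's code; the constant values are stand-ins for the server's min/max/start BufSize constants.

package main

import "fmt"

const (
	minBufSize     = 64
	maxBufSize     = 65536
	shortsToShrink = 2 // stand-in value for the example
)

// adjust paraphrases the removed heuristic: "sws" counts short writes,
// "wrote" is bytes flushed this pass, "pending" is bytes still queued.
func adjust(sz, sws int32, wrote, pending int64) (int32, int32) {
	pt := wrote + pending
	// Too little traffic for the current buffer: shrink, but only after
	// a streak of short writes, staying a power of two.
	if pt < int64(sz) && sz > minBufSize {
		if sws++; sws > shortsToShrink {
			sz >>= 1
		}
	}
	// More traffic than the buffer holds: grow immediately.
	if pt > int64(sz) && sz < maxBufSize {
		sz <<= 1
	}
	return sz, sws
}

func main() {
	sz, sws := int32(512), int32(0)
	for i := 0; i < 3; i++ {
		sz, sws = adjust(sz, sws, 100, 0) // persistent short writes
	}
	fmt.Println(sz) // 256: halved once the short-write streak passed the limit
}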
@@ -1989,6 +2010,49 @@ func (c *client) queueOutbound(data []byte) {
 	// Add to pending bytes total.
 	c.out.pb += int64(len(data))

+	// Take a copy of the slice ref so that we can chop bits off the beginning
+	// without affecting the original "data" slice.
+	toBuffer := data
+
+	// All of the queued []byte have a fixed capacity, so if there's a []byte
+	// at the tail of the buffer list that isn't full yet, we should top that
+	// up first. This helps to ensure we aren't pulling more []bytes from the
+	// pool than we need to.
+	if len(c.out.nb) > 0 {
+		last := &c.out.nb[len(c.out.nb)-1]
+		if free := cap(*last) - len(*last); free > 0 {
+			if l := len(toBuffer); l < free {
+				free = l
+			}
+			*last = append(*last, toBuffer[:free]...)
+			toBuffer = toBuffer[free:]
+		}
+	}
+
+	// Now we can push the rest of the data into new []bytes from the pool
+	// in fixed size chunks. This ensures we don't go over the capacity of any
+	// of the buffers and end up reallocating.
+	for len(toBuffer) > 0 {
+		var new []byte
+		if len(c.out.nb) == 0 && len(toBuffer) <= nbPoolSizeSmall {
+			// If the buffer is empty, try to allocate a small buffer if the
+			// message will fit in it. This will help for cases like pings.
+			new = nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)[:0]
+		} else {
+			// If "nb" isn't empty, default to large buffers in all cases as
+			// this means we are always coalescing future messages into
+			// larger buffers. Reduces the number of buffers into writev.
+			new = nbPoolLarge.Get().(*[nbPoolSizeLarge]byte)[:0]
+		}
+		l := len(toBuffer)
+		if c := cap(new); l > c {
+			l = c
+		}
+		new = append(new, toBuffer[:l]...)
+		c.out.nb = append(c.out.nb, new)
+		toBuffer = toBuffer[l:]
+	}
+
 	// Check for slow consumer via pending bytes limit.
 	// ok to return here, client is going away.
 	if c.kind == CLIENT && c.out.pb > c.out.mp {
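
The effect of the two steps above (top up the tail buffer, then fill fixed-size chunks) is easiest to see with concrete numbers. This standalone model reproduces the fill order under the same assumptions; the buf struct and all names here are inventions of the sketch. A 6-byte ping starts a small 4 KB buffer; queueing 70,000 more bytes tops that buffer up to its capacity and then takes large 64 KB buffers for the rest, so no queued slice ever reallocates.

package main

import "fmt"

const (
	small = 4096  // mirrors nbPoolSizeSmall
	large = 65536 // mirrors nbPoolSizeLarge
)

type buf struct{ used, cap int }

// enqueue mimics the fill order used by queueOutbound above: top up the
// tail buffer first, then pull fixed-capacity buffers for the remainder.
func enqueue(nb []buf, n int) []buf {
	// Top up whatever free space the tail buffer still has.
	if len(nb) > 0 {
		last := &nb[len(nb)-1]
		if free := last.cap - last.used; free > 0 {
			if n < free {
				free = n
			}
			last.used += free
			n -= free
		}
	}
	// Then fill fresh buffers in fixed-size chunks.
	for n > 0 {
		c := large
		if len(nb) == 0 && n <= small {
			c = small // a lone small message (e.g. a ping) gets a 4KB buffer
		}
		u := n
		if u > c {
			u = c
		}
		nb = append(nb, buf{u, c})
		n -= u
	}
	return nb
}

func main() {
	nb := enqueue(nil, 6) // a ping-sized payload
	fmt.Println(nb)       // [{6 4096}]
	nb = enqueue(nb, 70000)
	fmt.Println(nb) // [{4096 4096} {65536 65536} {374 65536}]
}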
@@ -2004,58 +2068,6 @@ func (c *client) queueOutbound(data []byte) {
 		return
 	}

-	if c.out.p == nil && len(data) < maxBufSize {
-		if c.out.sz == 0 {
-			c.out.sz = startBufSize
-		}
-		if c.out.s != nil && cap(c.out.s) >= int(c.out.sz) {
-			c.out.p = c.out.s
-			c.out.s = nil
-		} else {
-			// FIXME(dlc) - make power of 2 if less than maxBufSize?
-			c.out.p = make([]byte, 0, c.out.sz)
-		}
-	}
-	// Determine if we copy or reference
-	available := cap(c.out.p) - len(c.out.p)
-	if len(data) > available {
-		// We can't fit everything into existing primary, but message will
-		// fit in next one we allocate or utilize from the secondary.
-		// So copy what we can.
-		if available > 0 && len(data) < int(c.out.sz) {
-			c.out.p = append(c.out.p, data[:available]...)
-			data = data[available:]
-		}
-		// Put the primary on the nb if it has a payload
-		if len(c.out.p) > 0 {
-			c.out.nb = append(c.out.nb, c.out.p)
-			c.out.p = nil
-		}
-		// TODO: It was found with LeafNode and Websocket that referencing
-		// the data buffer when > maxBufSize would cause corruption
-		// (reproduced with small maxBufSize=10 and TestLeafNodeWSNoBufferCorruption).
-		// So always make a copy for now.
-		// We will copy to primary.
-		if c.out.p == nil {
-			// Grow here
-			if (c.out.sz << 1) <= maxBufSize {
-				c.out.sz <<= 1
-			}
-			if len(data) > int(c.out.sz) {
-				c.out.p = make([]byte, 0, len(data))
-			} else {
-				if c.out.s != nil && cap(c.out.s) >= int(c.out.sz) { // TODO(dlc) - Size mismatch?
-					c.out.p = c.out.s
-					c.out.s = nil
-				} else {
-					c.out.p = make([]byte, 0, c.out.sz)
-				}
-			}
-		}
-	}
-	c.out.p = append(c.out.p, data...)
-
 	// Check here if we should create a stall channel if we are falling behind.
 	// We do this here since if we wait for consumer's writeLoop it could be
 	// too late with large number of fan in producers.
@@ -3249,8 +3261,6 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su
 		client.queueOutbound([]byte(CR_LF))
 	}

-	client.out.pm++
-
 	// If we are tracking dynamic publish permissions that track reply subjects,
 	// do that accounting here. We only look at client.replies which will be non-nil.
 	if client.replies != nil && len(reply) > 0 {
@@ -3265,7 +3275,7 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su
 	// to intervene before this producer goes back to top of readloop. We are in the producer's
 	// readloop go routine at this point.
 	// FIXME(dlc) - We may call this alot, maybe suppress after first call?
-	if client.out.pm > 1 && client.out.pb > maxBufSize*2 {
+	if len(client.out.nb) != 0 {
 		client.flushSignal()
 	}
@@ -4672,7 +4682,10 @@ func (c *client) flushAndClose(minimalFlush bool) {
 		}
 		c.flushOutbound()
 	}
-	c.out.p, c.out.s = nil, nil
+	for i := range c.out.nb {
+		nbPoolPut(c.out.nb[i])
+	}
+	c.out.nb = nil

 	// Close the low level connection.
 	if c.nc != nil {


@@ -31,7 +31,6 @@ import (
 	"testing"
 	"time"

-	"crypto/rand"
 	"crypto/tls"

 	"github.com/nats-io/jwt/v2"
@@ -1484,7 +1483,11 @@ func TestWildcardCharsInLiteralSubjectWorks(t *testing.T) {
 	}
 }

-func TestDynamicBuffers(t *testing.T) {
+// This test ensures that coalescing into the fixed-size output
+// queues works as expected. When bytes are queued up, they should
+// not overflow a buffer until the capacity is exceeded, at which
+// point a new buffer should be added.
+func TestClientOutboundQueueCoalesce(t *testing.T) {
 	opts := DefaultOptions()
 	s := RunServer(opts)
 	defer s.Shutdown()
@@ -1495,139 +1498,114 @@ func TestClientOutboundQueueCoalesce(t *testing.T) {
 	}
 	defer nc.Close()

 	// Grab the client from server.
-	s.mu.Lock()
-	lc := len(s.clients)
-	c := s.clients[s.gcid]
-	s.mu.Unlock()
-	if lc != 1 {
-		t.Fatalf("Expected only 1 client but got %d\n", lc)
-	}
-	if c == nil {
-		t.Fatal("Expected to retrieve client\n")
-	}
-
-	// Create some helper functions and data structures.
-	done := make(chan bool)            // Used to stop recording.
-	type maxv struct{ rsz, wsz int32 } // Used to hold max values.
-	results := make(chan maxv)
-
-	// stopRecording stops the recording ticker and releases go routine.
-	stopRecording := func() maxv {
-		done <- true
-		return <-results
-	}
-	// max just grabs max values.
-	max := func(a, b int32) int32 {
-		if a > b {
-			return a
-		}
-		return b
-	}
-	// Returns current value of the buffer sizes.
-	getBufferSizes := func() (int32, int32) {
-		c.mu.Lock()
-		defer c.mu.Unlock()
-		return c.in.rsz, c.out.sz
-	}
-	// Record the max values seen.
-	recordMaxBufferSizes := func() {
-		ticker := time.NewTicker(10 * time.Microsecond)
-		defer ticker.Stop()
-
-		var m maxv
-
-		recordMax := func() {
-			rsz, wsz := getBufferSizes()
-			m.rsz = max(m.rsz, rsz)
-			m.wsz = max(m.wsz, wsz)
-		}
-
-		for {
-			select {
-			case <-done:
-				recordMax()
-				results <- m
-				return
-			case <-ticker.C:
-				recordMax()
-			}
-		}
-	}
-	// Check that the current value is what we expected.
-	checkBuffers := func(ers, ews int32) {
-		t.Helper()
-		rsz, wsz := getBufferSizes()
-		if rsz != ers {
-			t.Fatalf("Expected read buffer of %d, but got %d\n", ers, rsz)
-		}
-		if wsz != ews {
-			t.Fatalf("Expected write buffer of %d, but got %d\n", ews, wsz)
-		}
-	}
-	// Check that the max was as expected.
-	checkResults := func(m maxv, rsz, wsz int32) {
-		t.Helper()
-		if rsz != m.rsz {
-			t.Fatalf("Expected read buffer of %d, but got %d\n", rsz, m.rsz)
-		}
-		if wsz != m.wsz {
-			t.Fatalf("Expected write buffer of %d, but got %d\n", wsz, m.wsz)
-		}
-	}
-
-	// Here is where testing begins..
-
-	// Should be at or below the startBufSize for both.
-	rsz, wsz := getBufferSizes()
-	if rsz > startBufSize {
-		t.Fatalf("Expected read buffer of <= %d, but got %d\n", startBufSize, rsz)
-	}
-	if wsz > startBufSize {
-		t.Fatalf("Expected write buffer of <= %d, but got %d\n", startBufSize, wsz)
-	}
-
-	// Send some data.
-	data := make([]byte, 2048)
-	rand.Read(data)
-
-	go recordMaxBufferSizes()
-	for i := 0; i < 200; i++ {
-		nc.Publish("foo", data)
-	}
-	nc.Flush()
-	m := stopRecording()
-	if m.rsz != maxBufSize && m.rsz != maxBufSize/2 {
-		t.Fatalf("Expected read buffer of %d or %d, but got %d\n", maxBufSize, maxBufSize/2, m.rsz)
-	}
-	if m.wsz > startBufSize {
-		t.Fatalf("Expected write buffer of <= %d, but got %d\n", startBufSize, m.wsz)
-	}
-
-	// Create Subscription to test outbound buffer from server.
-	nc.Subscribe("foo", func(m *nats.Msg) {
-		// Just eat it..
-	})
-	go recordMaxBufferSizes()
-	for i := 0; i < 200; i++ {
-		nc.Publish("foo", data)
-	}
-	nc.Flush()
-	m = stopRecording()
-	checkResults(m, maxBufSize, maxBufSize)
-
-	// Now test that we shrink correctly.
-	// Should go to minimum for both..
-	for i := 0; i < 20; i++ {
-		nc.Flush()
-	}
-	checkBuffers(minBufSize, minBufSize)
+	clients := s.GlobalAccount().getClients()
+	if len(clients) != 1 {
+		t.Fatal("Expecting a client to exist")
+	}
+	client := clients[0]
+
+	client.mu.Lock()
+	defer client.mu.Unlock()
+
+	// First up, queue something small into the queue.
+	client.queueOutbound([]byte{1, 2, 3, 4, 5})
+
+	if len(client.out.nb) != 1 {
+		t.Fatal("Expecting a single queued buffer")
+	}
+	if l := len(client.out.nb[0]); l != 5 {
+		t.Fatalf("Expecting only 5 bytes in the first queued buffer, found %d instead", l)
+	}
+
+	// Then queue up a few more bytes, but not enough
+	// to overflow into the next buffer.
+	client.queueOutbound([]byte{6, 7, 8, 9, 10})
+
+	if len(client.out.nb) != 1 {
+		t.Fatal("Expecting a single queued buffer")
+	}
+	if l := len(client.out.nb[0]); l != 10 {
+		t.Fatalf("Expecting 10 bytes in the first queued buffer, found %d instead", l)
+	}
+
+	// Finally, queue up something that is guaranteed
+	// to overflow.
+	b := nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)[:]
+	b = b[:cap(b)]
+	client.queueOutbound(b)
+	if len(client.out.nb) != 2 {
+		t.Fatal("Expecting buffer to have overflowed")
+	}
+	if l := len(client.out.nb[0]); l != cap(b) {
+		t.Fatalf("Expecting %d bytes in the first queued buffer, found %d instead", cap(b), l)
+	}
+	if l := len(client.out.nb[1]); l != 10 {
+		t.Fatalf("Expecting 10 bytes in the second queued buffer, found %d instead", l)
+	}
 }
+
+// This test ensures that outbound queues don't cause a run on
+// memory when sending something to lots of clients.
+func TestClientOutboundQueueMemory(t *testing.T) {
+	opts := DefaultOptions()
+	s := RunServer(opts)
+	defer s.Shutdown()
+
+	var before runtime.MemStats
+	var after runtime.MemStats
+
+	var err error
+	clients := make([]*nats.Conn, 50000)
+	wait := &sync.WaitGroup{}
+	wait.Add(len(clients))
+
+	for i := 0; i < len(clients); i++ {
+		clients[i], err = nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s))
+		if err != nil {
+			t.Fatalf("Error on connect: %v", err)
+		}
+		defer clients[i].Close()
+
+		clients[i].Subscribe("test", func(m *nats.Msg) {
+			wait.Done()
+		})
+	}
+
+	runtime.GC()
+	runtime.ReadMemStats(&before)
+
+	nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s))
+	if err != nil {
+		t.Fatalf("Error on connect: %v", err)
+	}
+	defer nc.Close()
+
+	var m [48000]byte
+	if err = nc.Publish("test", m[:]); err != nil {
+		t.Fatal(err)
+	}
+
+	wait.Wait()
+
+	runtime.GC()
+	runtime.ReadMemStats(&after)
+
+	hb, ha := float64(before.HeapAlloc), float64(after.HeapAlloc)
+	ms := float64(len(m))
+	diff := float64(ha) - float64(hb)
+	inc := (diff / float64(hb)) * 100
+
+	fmt.Printf("Message size: %.1fKB\n", ms/1024)
+	fmt.Printf("Subscribed clients: %d\n", len(clients))
+	fmt.Printf("Heap allocs before: %.1fMB\n", hb/1024/1024)
+	fmt.Printf("Heap allocs after: %.1fMB\n", ha/1024/1024)
+	fmt.Printf("Heap allocs delta: %.1f%%\n", inc)
+
+	// TODO: What threshold makes sense here for a failure?
+	/*
+		if inc > 10 {
+			t.Fatalf("memory increase was %.1f%% (should be <= 10%%)", inc)
+		}
+	*/
+}

 func TestClientTraceRace(t *testing.T) {
@@ -2246,7 +2224,6 @@ func TestFlushOutboundNoSliceReuseIfPartial(t *testing.T) {
 		expected.Write(buf)
 		c.mu.Lock()
 		c.queueOutbound(buf)
-		c.out.sz = 10
 		c.flushOutbound()
 		fakeConn.partial = false
 		c.mu.Unlock()


@@ -1266,13 +1266,7 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
 	if c.ws.browser {
 		mfs = wsFrameSizeForBrowsers
 	}
-	if len(c.out.p) > 0 {
-		p := c.out.p
-		c.out.p = nil
-		nb = append(c.out.nb, p)
-	} else if len(c.out.nb) > 0 {
-		nb = c.out.nb
-	}
+	nb = c.out.nb
 	mask := c.ws.maskwrite
 	// Start with possible already framed buffers (that we could have
 	// got from partials or control messages such as ws pings or pongs).


@@ -240,6 +240,10 @@ func TestClusterQueueSubs(t *testing.T) {
 	sendB("PING\r\n")
 	expectB(pongRe)

+	// Give plenty of time for the messages to flush, so that we don't
+	// accidentally only read some of them.
+	time.Sleep(time.Millisecond * 250)
+
 	// Should receive 5.
 	matches = expectMsgsA(5)
 	checkForQueueSid(t, matches, qg1SidsA)
@@ -248,6 +252,10 @@ func TestClusterQueueSubs(t *testing.T) {
 	// Send to A
 	sendA("PUB foo 2\r\nok\r\n")

+	// Give plenty of time for the messages to flush, so that we don't
+	// accidentally only read some of them.
+	time.Sleep(time.Millisecond * 250)
+
 	// Should receive 5.
 	matches = expectMsgsA(5)
 	checkForQueueSid(t, matches, qg1SidsA)
@@ -270,6 +278,10 @@ func TestClusterQueueSubs(t *testing.T) {
 	// Send to B
 	sendB("PUB foo 2\r\nok\r\n")

+	// Give plenty of time for the messages to flush, so that we don't
+	// accidentally only read some of them.
+	time.Sleep(time.Millisecond * 250)
+
 	// Should receive 1 from B.
 	matches = expectMsgsB(1)
 	checkForQueueSid(t, matches, qg2SidsB)
@@ -308,6 +320,10 @@ func TestClusterQueueSubs(t *testing.T) {
 	// Send to A
 	sendA("PUB foo 2\r\nok\r\n")

+	// Give plenty of time for the messages to flush, so that we don't
+	// accidentally only read some of them.
+	time.Sleep(time.Millisecond * 250)
+
 	// Should receive 4 now.
 	matches = expectMsgsA(4)
 	checkForPubSids(t, matches, pSids)