From 2206f9e46899e9c30e5747d31df4773c08f741b4 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 24 Apr 2023 09:44:51 +0100 Subject: [PATCH] Re-add coalescing to outbound queues Originally I thought there was a race condition happening here, but it turns out it is safe after all and the race condition I was seeing was due to other problems in the WebSocket code. Signed-off-by: Neil Twigg --- server/client.go | 29 ++++++++++++++------- server/client_test.go | 60 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 9 deletions(-) diff --git a/server/client.go b/server/client.go index 0089ae74..1852f021 100644 --- a/server/client.go +++ b/server/client.go @@ -328,19 +328,14 @@ var nbPoolLarge = &sync.Pool{ } func nbPoolGet(sz int) []byte { - var new []byte switch { case sz <= nbPoolSizeSmall: - ptr := nbPoolSmall.Get().(*[nbPoolSizeSmall]byte) - new = ptr[:0] + return nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)[:0] case sz <= nbPoolSizeMedium: - ptr := nbPoolMedium.Get().(*[nbPoolSizeMedium]byte) - new = ptr[:0] + return nbPoolMedium.Get().(*[nbPoolSizeMedium]byte)[:0] default: - ptr := nbPoolLarge.Get().(*[nbPoolSizeLarge]byte) - new = ptr[:0] + return nbPoolLarge.Get().(*[nbPoolSizeLarge]byte)[:0] } - return new } func nbPoolPut(b []byte) { @@ -1447,7 +1442,8 @@ func (c *client) flushOutbound() bool { // "nb" will be reset back to its starting position so it can be modified // safely by queueOutbound calls. c.out.wnb = append(c.out.wnb, collapsed...) - orig := append(net.Buffers{}, c.out.wnb...) + var _orig [1024][]byte + orig := append(_orig[:0], c.out.wnb...) c.out.nb = c.out.nb[:0] // Since WriteTo is lopping things off the beginning, we need to remember @@ -2041,6 +2037,21 @@ func (c *client) queueOutbound(data []byte) { // without affecting the original "data" slice. toBuffer := data + // All of the queued []byte have a fixed capacity, so if there's a []byte + // at the tail of the buffer list that isn't full yet, we should top that + // up first. This helps to ensure we aren't pulling more []bytes from the + // pool than we need to. + if len(c.out.nb) > 0 { + last := &c.out.nb[len(c.out.nb)-1] + if free := cap(*last) - len(*last); free > 0 { + if l := len(toBuffer); l < free { + free = l + } + *last = append(*last, toBuffer[:free]...) + toBuffer = toBuffer[free:] + } + } + // Now we can push the rest of the data into new []bytes from the pool // in fixed size chunks. This ensures we don't go over the capacity of any // of the buffers and end up reallocating. diff --git a/server/client_test.go b/server/client_test.go index 54bfba6c..bc1a9df9 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -1483,6 +1483,66 @@ func TestWildcardCharsInLiteralSubjectWorks(t *testing.T) { } } +// This test ensures that coalescing into the fixed-size output +// queues works as expected. When bytes are queued up, they should +// not overflow a buffer until the capacity is exceeded, at which +// point a new buffer should be added. +func TestClientOutboundQueueCoalesce(t *testing.T) { + opts := DefaultOptions() + s := RunServer(opts) + defer s.Shutdown() + + nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + clients := s.GlobalAccount().getClients() + if len(clients) != 1 { + t.Fatal("Expecting a client to exist") + } + client := clients[0] + client.mu.Lock() + defer client.mu.Unlock() + + // First up, queue something small into the queue. + client.queueOutbound([]byte{1, 2, 3, 4, 5}) + + if len(client.out.nb) != 1 { + t.Fatal("Expecting a single queued buffer") + } + if l := len(client.out.nb[0]); l != 5 { + t.Fatalf("Expecting only 5 bytes in the first queued buffer, found %d instead", l) + } + + // Then queue up a few more bytes, but not enough + // to overflow into the next buffer. + client.queueOutbound([]byte{6, 7, 8, 9, 10}) + + if len(client.out.nb) != 1 { + t.Fatal("Expecting a single queued buffer") + } + if l := len(client.out.nb[0]); l != 10 { + t.Fatalf("Expecting 10 bytes in the first queued buffer, found %d instead", l) + } + + // Finally, queue up something that is guaranteed + // to overflow. + b := nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)[:] + b = b[:cap(b)] + client.queueOutbound(b) + if len(client.out.nb) != 2 { + t.Fatal("Expecting buffer to have overflowed") + } + if l := len(client.out.nb[0]); l != cap(b) { + t.Fatalf("Expecting %d bytes in the first queued buffer, found %d instead", cap(b), l) + } + if l := len(client.out.nb[1]); l != 10 { + t.Fatalf("Expecting 10 bytes in the second queued buffer, found %d instead", l) + } +} + // This test ensures that outbound queues don't cause a run on // memory when sending something to lots of clients. func TestClientOutboundQueueMemory(t *testing.T) {