mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Re-add coalescing to outbound queues
Originally I thought there was a race condition happening here, but it turns out it is safe after all and the race condition I was seeing was due to other problems in the WebSocket code. Signed-off-by: Neil Twigg <neil@nats.io>
This commit is contained in:
@@ -328,19 +328,14 @@ var nbPoolLarge = &sync.Pool{
|
||||
}
|
||||
|
||||
func nbPoolGet(sz int) []byte {
|
||||
var new []byte
|
||||
switch {
|
||||
case sz <= nbPoolSizeSmall:
|
||||
ptr := nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)
|
||||
new = ptr[:0]
|
||||
return nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)[:0]
|
||||
case sz <= nbPoolSizeMedium:
|
||||
ptr := nbPoolMedium.Get().(*[nbPoolSizeMedium]byte)
|
||||
new = ptr[:0]
|
||||
return nbPoolMedium.Get().(*[nbPoolSizeMedium]byte)[:0]
|
||||
default:
|
||||
ptr := nbPoolLarge.Get().(*[nbPoolSizeLarge]byte)
|
||||
new = ptr[:0]
|
||||
return nbPoolLarge.Get().(*[nbPoolSizeLarge]byte)[:0]
|
||||
}
|
||||
return new
|
||||
}
|
||||
|
||||
func nbPoolPut(b []byte) {
|
||||
@@ -1447,7 +1442,8 @@ func (c *client) flushOutbound() bool {
|
||||
// "nb" will be reset back to its starting position so it can be modified
|
||||
// safely by queueOutbound calls.
|
||||
c.out.wnb = append(c.out.wnb, collapsed...)
|
||||
orig := append(net.Buffers{}, c.out.wnb...)
|
||||
var _orig [1024][]byte
|
||||
orig := append(_orig[:0], c.out.wnb...)
|
||||
c.out.nb = c.out.nb[:0]
|
||||
|
||||
// Since WriteTo is lopping things off the beginning, we need to remember
|
||||
@@ -2041,6 +2037,21 @@ func (c *client) queueOutbound(data []byte) {
|
||||
// without affecting the original "data" slice.
|
||||
toBuffer := data
|
||||
|
||||
// All of the queued []byte have a fixed capacity, so if there's a []byte
|
||||
// at the tail of the buffer list that isn't full yet, we should top that
|
||||
// up first. This helps to ensure we aren't pulling more []bytes from the
|
||||
// pool than we need to.
|
||||
if len(c.out.nb) > 0 {
|
||||
last := &c.out.nb[len(c.out.nb)-1]
|
||||
if free := cap(*last) - len(*last); free > 0 {
|
||||
if l := len(toBuffer); l < free {
|
||||
free = l
|
||||
}
|
||||
*last = append(*last, toBuffer[:free]...)
|
||||
toBuffer = toBuffer[free:]
|
||||
}
|
||||
}
|
||||
|
||||
// Now we can push the rest of the data into new []bytes from the pool
|
||||
// in fixed size chunks. This ensures we don't go over the capacity of any
|
||||
// of the buffers and end up reallocating.
|
||||
|
||||
@@ -1483,6 +1483,66 @@ func TestWildcardCharsInLiteralSubjectWorks(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// This test ensures that coalescing into the fixed-size output
|
||||
// queues works as expected. When bytes are queued up, they should
|
||||
// not overflow a buffer until the capacity is exceeded, at which
|
||||
// point a new buffer should be added.
|
||||
func TestClientOutboundQueueCoalesce(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
clients := s.GlobalAccount().getClients()
|
||||
if len(clients) != 1 {
|
||||
t.Fatal("Expecting a client to exist")
|
||||
}
|
||||
client := clients[0]
|
||||
client.mu.Lock()
|
||||
defer client.mu.Unlock()
|
||||
|
||||
// First up, queue something small into the queue.
|
||||
client.queueOutbound([]byte{1, 2, 3, 4, 5})
|
||||
|
||||
if len(client.out.nb) != 1 {
|
||||
t.Fatal("Expecting a single queued buffer")
|
||||
}
|
||||
if l := len(client.out.nb[0]); l != 5 {
|
||||
t.Fatalf("Expecting only 5 bytes in the first queued buffer, found %d instead", l)
|
||||
}
|
||||
|
||||
// Then queue up a few more bytes, but not enough
|
||||
// to overflow into the next buffer.
|
||||
client.queueOutbound([]byte{6, 7, 8, 9, 10})
|
||||
|
||||
if len(client.out.nb) != 1 {
|
||||
t.Fatal("Expecting a single queued buffer")
|
||||
}
|
||||
if l := len(client.out.nb[0]); l != 10 {
|
||||
t.Fatalf("Expecting 10 bytes in the first queued buffer, found %d instead", l)
|
||||
}
|
||||
|
||||
// Finally, queue up something that is guaranteed
|
||||
// to overflow.
|
||||
b := nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)[:]
|
||||
b = b[:cap(b)]
|
||||
client.queueOutbound(b)
|
||||
if len(client.out.nb) != 2 {
|
||||
t.Fatal("Expecting buffer to have overflowed")
|
||||
}
|
||||
if l := len(client.out.nb[0]); l != cap(b) {
|
||||
t.Fatalf("Expecting %d bytes in the first queued buffer, found %d instead", cap(b), l)
|
||||
}
|
||||
if l := len(client.out.nb[1]); l != 10 {
|
||||
t.Fatalf("Expecting 10 bytes in the second queued buffer, found %d instead", l)
|
||||
}
|
||||
}
|
||||
|
||||
// This test ensures that outbound queues don't cause a run on
|
||||
// memory when sending something to lots of clients.
|
||||
func TestClientOutboundQueueMemory(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user