From e4b6ba2f23c49b8dbfce7fec7170613b46b88fe3 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 20 Dec 2022 16:47:31 +0000 Subject: [PATCH] Refactor outbound queues, remove dynamic sizing, add buffer reuse Also try to reduce flakiness of `TestClusterQueueSubs` and `TestCrossAccountServiceResponseTypes` --- server/accounts_test.go | 15 ++- server/client.go | 251 +++++++++++++++++++++------------------- server/client_test.go | 231 +++++++++++++++++------------------- server/websocket.go | 8 +- test/cluster_test.go | 16 +++ 5 files changed, 263 insertions(+), 258 deletions(-) diff --git a/server/accounts_test.go b/server/accounts_test.go index cd1737f6..0142344b 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -2099,12 +2099,17 @@ func TestCrossAccountServiceResponseTypes(t *testing.T) { cfoo.parseAsync(string(mReply)) - var b [256]byte - n, err := crBar.Read(b[:]) - if err != nil { - t.Fatalf("Error reading response: %v", err) + var buf []byte + for i := 0; i < 20; i++ { + b, err := crBar.ReadBytes('\n') + if err != nil { + t.Fatalf("Error reading response: %v", err) + } + buf = append(buf[:], b...) + if mraw = msgPat.FindAllStringSubmatch(string(buf), -1); len(mraw) == 10 { + break + } } - mraw = msgPat.FindAllStringSubmatch(string(b[:n]), -1) if len(mraw) != 10 { t.Fatalf("Expected a response but got %d", len(mraw)) } diff --git a/server/client.go b/server/client.go index f00a3222..607afb47 100644 --- a/server/client.go +++ b/server/client.go @@ -294,13 +294,9 @@ type pinfo struct { // outbound holds pending data for a socket. type outbound struct { - p []byte // Primary write buffer - s []byte // Secondary for use post flush - nb net.Buffers // net.Buffers for writev IO - sz int32 // limit size per []byte, uses variable BufSize constants, start, min, max. - sws int32 // Number of short writes, used for dynamic resizing. + nb net.Buffers // Pending buffers for send, each has fixed capacity as per nbPool below. + wnb net.Buffers // Working copy of "nb", reused on each flushOutbound call, partial writes may leave entries here for next iteration. pb int64 // Total pending/queued bytes. - pm int32 // Total pending/queued messages. fsp int32 // Flush signals that are pending per producer from readLoop's pcd. sg *sync.Cond // To signal writeLoop that there is data to flush. wdl time.Duration // Snapshot of write deadline. @@ -309,6 +305,37 @@ type outbound struct { stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in. } +const nbPoolSizeSmall = 4096 // Underlying array size of small buffer +const nbPoolSizeLarge = 65536 // Underlying array size of large buffer + +var nbPoolSmall = &sync.Pool{ + New: func() any { + b := [nbPoolSizeSmall]byte{} + return &b + }, +} + +var nbPoolLarge = &sync.Pool{ + New: func() any { + b := [nbPoolSizeLarge]byte{} + return &b + }, +} + +func nbPoolPut(b []byte) { + switch cap(b) { + case nbPoolSizeSmall: + b := (*[nbPoolSizeSmall]byte)(b[0:nbPoolSizeSmall]) + nbPoolSmall.Put(b) + case nbPoolSizeLarge: + b := (*[nbPoolSizeLarge]byte)(b[0:nbPoolSizeLarge]) + nbPoolLarge.Put(b) + default: + // Ignore frames that are the wrong size, this might happen + // with WebSocket/MQTT messages as they are framed + } +} + type perm struct { allow *Sublist deny *Sublist @@ -585,7 +612,6 @@ func (c *client) initClient() { c.cid = atomic.AddUint64(&s.gcid, 1) // Outbound data structure setup - c.out.sz = startBufSize c.out.sg = sync.NewCond(&(c.mu)) opts := s.getOpts() // Snapshots to avoid mutex access in fast paths. @@ -1405,11 +1431,6 @@ func (c *client) collapsePtoNB() (net.Buffers, int64) { if c.isWebsocket() { return c.wsCollapsePtoNB() } - if c.out.p != nil { - p := c.out.p - c.out.p = nil - return append(c.out.nb, p), c.out.pb - } return c.out.nb, c.out.pb } @@ -1420,9 +1441,6 @@ func (c *client) handlePartialWrite(pnb net.Buffers) { c.ws.frames = append(pnb, c.ws.frames...) return } - nb, _ := c.collapsePtoNB() - // The partial needs to be first, so append nb to pnb - c.out.nb = append(pnb, nb...) } // flushOutbound will flush outbound buffer to a client. @@ -1446,26 +1464,37 @@ func (c *client) flushOutbound() bool { return true // true because no need to queue a signal. } - // Place primary on nb, assign primary to secondary, nil out nb and secondary. - nb, attempted := c.collapsePtoNB() - c.out.p, c.out.nb, c.out.s = c.out.s, nil, nil - if nb == nil { - return true - } + // In the case of a normal socket connection, "collapsed" is just a ref + // to "nb". In the case of WebSockets, additional framing is added to + // anything that is waiting in "nb". Also keep a note of how many bytes + // were queued before we release the mutex. + collapsed, attempted := c.collapsePtoNB() - // For selecting primary replacement. - cnb := nb - var lfs int - if len(cnb) > 0 { - lfs = len(cnb[0]) - } + // Frustratingly, (net.Buffers).WriteTo() modifies the receiver so we + // can't work on "nb" directly — while the mutex is unlocked during IO, + // something else might call queueOutbound and modify it. So instead we + // need a working copy — we'll operate on "wnb" instead. Note that in + // the case of a partial write, "wnb" may have remaining data from the + // previous write, and in the case of WebSockets, that data may already + // be framed, so we are careful not to re-frame "wnb" here. Instead we + // will just frame up "nb" and append it onto whatever is left on "wnb". + // "nb" will be reset back to its starting position so it can be modified + // safely by queueOutbound calls. + c.out.wnb = append(c.out.wnb, collapsed...) + orig := append(net.Buffers{}, c.out.wnb...) + c.out.nb = c.out.nb[:0] + + // Since WriteTo is lopping things off the beginning, we need to remember + // the start position of the underlying array so that we can get back to it. + // Otherwise we'll always "slide forward" and that will result in reallocs. + startOfWnb := c.out.wnb[0:] // In case it goes away after releasing the lock. nc := c.nc - apm := c.out.pm // Capture this (we change the value in some tests) wdl := c.out.wdl + // Do NOT hold lock during actual IO. c.mu.Unlock() @@ -1477,7 +1506,7 @@ func (c *client) flushOutbound() bool { nc.SetWriteDeadline(start.Add(wdl)) // Actual write to the socket. - n, err := nb.WriteTo(nc) + n, err := c.out.wnb.WriteTo(nc) nc.SetWriteDeadline(time.Time{}) lft := time.Since(start) @@ -1485,11 +1514,35 @@ func (c *client) flushOutbound() bool { // Re-acquire client lock. c.mu.Lock() + // At this point, "wnb" has been mutated by WriteTo and any consumed + // buffers have been lopped off the beginning, so in order to return + // them to the pool, we need to look at the difference between "orig" + // and "wnb". + for i := 0; i < len(orig)-len(c.out.wnb); i++ { + nbPoolPut(orig[i]) + } + + // At this point it's possible that "nb" has been modified by another + // call to queueOutbound while the lock was released, so we'll leave + // those for the next iteration. Meanwhile it's possible that we only + // managed a partial write of "wnb", so we'll shift anything that + // remains up to the beginning of the array to prevent reallocating. + // Anything left in "wnb" has already been framed for WebSocket conns + // so leave them alone for the next call to flushOutbound. + c.out.wnb = append(startOfWnb[:0], c.out.wnb...) + + // If we've written everything but the underlying array of our working + // buffer has grown excessively then free it — the GC will tidy it up + // and we can allocate a new one next time. + if len(c.out.wnb) == 0 && cap(c.out.wnb) > nbPoolSizeLarge*8 { + c.out.wnb = nil + } + // Ignore ErrShortWrite errors, they will be handled as partials. if err != nil && err != io.ErrShortWrite { // Handle timeout error (slow consumer) differently if ne, ok := err.(net.Error); ok && ne.Timeout() { - if closed := c.handleWriteTimeout(n, attempted, len(cnb)); closed { + if closed := c.handleWriteTimeout(n, attempted, len(c.out.nb)); closed { return true } } else { @@ -1513,43 +1566,11 @@ func (c *client) flushOutbound() bool { if c.isWebsocket() { c.ws.fs -= n } - c.out.pm -= apm // FIXME(dlc) - this will not be totally accurate on partials. // Check for partial writes // TODO(dlc) - zero write with no error will cause lost message and the writeloop to spin. if n != attempted && n > 0 { - c.handlePartialWrite(nb) - } else if int32(n) >= c.out.sz { - c.out.sws = 0 - } - - // Adjust based on what we wrote plus any pending. - pt := n + c.out.pb - - // Adjust sz as needed downward, keeping power of 2. - // We do this at a slower rate. - if pt < int64(c.out.sz) && c.out.sz > minBufSize { - c.out.sws++ - if c.out.sws > shortsToShrink { - c.out.sz >>= 1 - } - } - // Adjust sz as needed upward, keeping power of 2. - if pt > int64(c.out.sz) && c.out.sz < maxBufSize { - c.out.sz <<= 1 - } - - // Check to see if we can reuse buffers. - if lfs != 0 && n >= int64(lfs) { - oldp := cnb[0][:0] - if cap(oldp) >= int(c.out.sz) { - // Replace primary or secondary if they are nil, reusing same buffer. - if c.out.p == nil { - c.out.p = oldp - } else if c.out.s == nil || cap(c.out.s) < int(c.out.sz) { - c.out.s = oldp - } - } + c.handlePartialWrite(c.out.nb) } // Check that if there is still data to send and writeLoop is in wait, @@ -2050,6 +2071,49 @@ func (c *client) queueOutbound(data []byte) { // Add to pending bytes total. c.out.pb += int64(len(data)) + // Take a copy of the slice ref so that we can chop bits off the beginning + // without affecting the original "data" slice. + toBuffer := data + + // All of the queued []byte have a fixed capacity, so if there's a []byte + // at the tail of the buffer list that isn't full yet, we should top that + // up first. This helps to ensure we aren't pulling more []bytes from the + // pool than we need to. + if len(c.out.nb) > 0 { + last := &c.out.nb[len(c.out.nb)-1] + if free := cap(*last) - len(*last); free > 0 { + if l := len(toBuffer); l < free { + free = l + } + *last = append(*last, toBuffer[:free]...) + toBuffer = toBuffer[free:] + } + } + + // Now we can push the rest of the data into new []bytes from the pool + // in fixed size chunks. This ensures we don't go over the capacity of any + // of the buffers and end up reallocating. + for len(toBuffer) > 0 { + var new []byte + if len(c.out.nb) == 0 && len(toBuffer) <= nbPoolSizeSmall { + // If the buffer is empty, try to allocate a small buffer if the + // message will fit in it. This will help for cases like pings. + new = nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)[:0] + } else { + // If "nb" isn't empty, default to large buffers in all cases as + // this means we are always coalescing future messages into + // larger buffers. Reduces the number of buffers into writev. + new = nbPoolLarge.Get().(*[nbPoolSizeLarge]byte)[:0] + } + l := len(toBuffer) + if c := cap(new); l > c { + l = c + } + new = append(new, toBuffer[:l]...) + c.out.nb = append(c.out.nb, new) + toBuffer = toBuffer[l:] + } + // Check for slow consumer via pending bytes limit. // ok to return here, client is going away. if c.kind == CLIENT && c.out.pb > c.out.mp { @@ -2065,58 +2129,6 @@ func (c *client) queueOutbound(data []byte) { return } - if c.out.p == nil && len(data) < maxBufSize { - if c.out.sz == 0 { - c.out.sz = startBufSize - } - if c.out.s != nil && cap(c.out.s) >= int(c.out.sz) { - c.out.p = c.out.s - c.out.s = nil - } else { - // FIXME(dlc) - make power of 2 if less than maxBufSize? - c.out.p = make([]byte, 0, c.out.sz) - } - } - // Determine if we copy or reference - available := cap(c.out.p) - len(c.out.p) - if len(data) > available { - // We can't fit everything into existing primary, but message will - // fit in next one we allocate or utilize from the secondary. - // So copy what we can. - if available > 0 && len(data) < int(c.out.sz) { - c.out.p = append(c.out.p, data[:available]...) - data = data[available:] - } - // Put the primary on the nb if it has a payload - if len(c.out.p) > 0 { - c.out.nb = append(c.out.nb, c.out.p) - c.out.p = nil - } - // TODO: It was found with LeafNode and Websocket that referencing - // the data buffer when > maxBufSize would cause corruption - // (reproduced with small maxBufSize=10 and TestLeafNodeWSNoBufferCorruption). - // So always make a copy for now. - - // We will copy to primary. - if c.out.p == nil { - // Grow here - if (c.out.sz << 1) <= maxBufSize { - c.out.sz <<= 1 - } - if len(data) > int(c.out.sz) { - c.out.p = make([]byte, 0, len(data)) - } else { - if c.out.s != nil && cap(c.out.s) >= int(c.out.sz) { // TODO(dlc) - Size mismatch? - c.out.p = c.out.s - c.out.s = nil - } else { - c.out.p = make([]byte, 0, c.out.sz) - } - } - } - } - c.out.p = append(c.out.p, data...) - // Check here if we should create a stall channel if we are falling behind. // We do this here since if we wait for consumer's writeLoop it could be // too late with large number of fan in producers. @@ -3310,8 +3322,6 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su client.queueOutbound([]byte(CR_LF)) } - client.out.pm++ - // If we are tracking dynamic publish permissions that track reply subjects, // do that accounting here. We only look at client.replies which will be non-nil. if client.replies != nil && len(reply) > 0 { @@ -3326,7 +3336,7 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su // to intervene before this producer goes back to top of readloop. We are in the producer's // readloop go routine at this point. // FIXME(dlc) - We may call this alot, maybe suppress after first call? - if client.out.pm > 1 && client.out.pb > maxBufSize*2 { + if len(client.out.nb) != 0 { client.flushSignal() } @@ -4752,7 +4762,10 @@ func (c *client) flushAndClose(minimalFlush bool) { } c.flushOutbound() } - c.out.p, c.out.s = nil, nil + for i := range c.out.nb { + nbPoolPut(c.out.nb[i]) + } + c.out.nb = nil // Close the low level connection. if c.nc != nil { diff --git a/server/client_test.go b/server/client_test.go index 8285f3af..c12e4c51 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -31,7 +31,6 @@ import ( "testing" "time" - "crypto/rand" "crypto/tls" "github.com/nats-io/jwt/v2" @@ -1484,7 +1483,11 @@ func TestWildcardCharsInLiteralSubjectWorks(t *testing.T) { } } -func TestDynamicBuffers(t *testing.T) { +// This test ensures that coalescing into the fixed-size output +// queues works as expected. When bytes are queued up, they should +// not overflow a buffer until the capacity is exceeded, at which +// point a new buffer should be added. +func TestClientOutboundQueueCoalesce(t *testing.T) { opts := DefaultOptions() s := RunServer(opts) defer s.Shutdown() @@ -1495,139 +1498,114 @@ func TestDynamicBuffers(t *testing.T) { } defer nc.Close() - // Grab the client from server. - s.mu.Lock() - lc := len(s.clients) - c := s.clients[s.gcid] - s.mu.Unlock() - - if lc != 1 { - t.Fatalf("Expected only 1 client but got %d\n", lc) + clients := s.GlobalAccount().getClients() + if len(clients) != 1 { + t.Fatal("Expecting a client to exist") } - if c == nil { - t.Fatal("Expected to retrieve client\n") + client := clients[0] + client.mu.Lock() + defer client.mu.Unlock() + + // First up, queue something small into the queue. + client.queueOutbound([]byte{1, 2, 3, 4, 5}) + + if len(client.out.nb) != 1 { + t.Fatal("Expecting a single queued buffer") + } + if l := len(client.out.nb[0]); l != 5 { + t.Fatalf("Expecting only 5 bytes in the first queued buffer, found %d instead", l) } - // Create some helper functions and data structures. - done := make(chan bool) // Used to stop recording. - type maxv struct{ rsz, wsz int32 } // Used to hold max values. - results := make(chan maxv) + // Then queue up a few more bytes, but not enough + // to overflow into the next buffer. + client.queueOutbound([]byte{6, 7, 8, 9, 10}) - // stopRecording stops the recording ticker and releases go routine. - stopRecording := func() maxv { - done <- true - return <-results + if len(client.out.nb) != 1 { + t.Fatal("Expecting a single queued buffer") } - // max just grabs max values. - max := func(a, b int32) int32 { - if a > b { - return a + if l := len(client.out.nb[0]); l != 10 { + t.Fatalf("Expecting 10 bytes in the first queued buffer, found %d instead", l) + } + + // Finally, queue up something that is guaranteed + // to overflow. + b := nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)[:] + b = b[:cap(b)] + client.queueOutbound(b) + if len(client.out.nb) != 2 { + t.Fatal("Expecting buffer to have overflowed") + } + if l := len(client.out.nb[0]); l != cap(b) { + t.Fatalf("Expecting %d bytes in the first queued buffer, found %d instead", cap(b), l) + } + if l := len(client.out.nb[1]); l != 10 { + t.Fatalf("Expecting 10 bytes in the second queued buffer, found %d instead", l) + } +} + +// This test ensures that outbound queues don't cause a run on +// memory when sending something to lots of clients. +func TestClientOutboundQueueMemory(t *testing.T) { + opts := DefaultOptions() + s := RunServer(opts) + defer s.Shutdown() + + var before runtime.MemStats + var after runtime.MemStats + + var err error + clients := make([]*nats.Conn, 50000) + wait := &sync.WaitGroup{} + wait.Add(len(clients)) + + for i := 0; i < len(clients); i++ { + clients[i], err = nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s)) + if err != nil { + t.Fatalf("Error on connect: %v", err) } - return b - } - // Returns current value of the buffer sizes. - getBufferSizes := func() (int32, int32) { - c.mu.Lock() - defer c.mu.Unlock() - return c.in.rsz, c.out.sz - } - // Record the max values seen. - recordMaxBufferSizes := func() { - ticker := time.NewTicker(10 * time.Microsecond) - defer ticker.Stop() + defer clients[i].Close() - var m maxv + clients[i].Subscribe("test", func(m *nats.Msg) { + wait.Done() + }) + } - recordMax := func() { - rsz, wsz := getBufferSizes() - m.rsz = max(m.rsz, rsz) - m.wsz = max(m.wsz, wsz) + runtime.GC() + runtime.ReadMemStats(&before) + + nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port), nats.InProcessServer(s)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + var m [48000]byte + if err = nc.Publish("test", m[:]); err != nil { + t.Fatal(err) + } + + wait.Wait() + + runtime.GC() + runtime.ReadMemStats(&after) + + hb, ha := float64(before.HeapAlloc), float64(after.HeapAlloc) + ms := float64(len(m)) + diff := float64(ha) - float64(hb) + inc := (diff / float64(hb)) * 100 + + fmt.Printf("Message size: %.1fKB\n", ms/1024) + fmt.Printf("Subscribed clients: %d\n", len(clients)) + fmt.Printf("Heap allocs before: %.1fMB\n", hb/1024/1024) + fmt.Printf("Heap allocs after: %.1fMB\n", ha/1024/1024) + fmt.Printf("Heap allocs delta: %.1f%%\n", inc) + + // TODO: What threshold makes sense here for a failure? + /* + if inc > 10 { + t.Fatalf("memory increase was %.1f%% (should be <= 10%%)", inc) } - - for { - select { - case <-done: - recordMax() - results <- m - return - case <-ticker.C: - recordMax() - } - } - } - // Check that the current value is what we expected. - checkBuffers := func(ers, ews int32) { - t.Helper() - rsz, wsz := getBufferSizes() - if rsz != ers { - t.Fatalf("Expected read buffer of %d, but got %d\n", ers, rsz) - } - if wsz != ews { - t.Fatalf("Expected write buffer of %d, but got %d\n", ews, wsz) - } - } - - // Check that the max was as expected. - checkResults := func(m maxv, rsz, wsz int32) { - t.Helper() - if rsz != m.rsz { - t.Fatalf("Expected read buffer of %d, but got %d\n", rsz, m.rsz) - } - if wsz != m.wsz { - t.Fatalf("Expected write buffer of %d, but got %d\n", wsz, m.wsz) - } - } - - // Here is where testing begins.. - - // Should be at or below the startBufSize for both. - rsz, wsz := getBufferSizes() - if rsz > startBufSize { - t.Fatalf("Expected read buffer of <= %d, but got %d\n", startBufSize, rsz) - } - if wsz > startBufSize { - t.Fatalf("Expected write buffer of <= %d, but got %d\n", startBufSize, wsz) - } - - // Send some data. - data := make([]byte, 2048) - rand.Read(data) - - go recordMaxBufferSizes() - for i := 0; i < 200; i++ { - nc.Publish("foo", data) - } - nc.Flush() - m := stopRecording() - - if m.rsz != maxBufSize && m.rsz != maxBufSize/2 { - t.Fatalf("Expected read buffer of %d or %d, but got %d\n", maxBufSize, maxBufSize/2, m.rsz) - } - if m.wsz > startBufSize { - t.Fatalf("Expected write buffer of <= %d, but got %d\n", startBufSize, m.wsz) - } - - // Create Subscription to test outbound buffer from server. - nc.Subscribe("foo", func(m *nats.Msg) { - // Just eat it.. - }) - go recordMaxBufferSizes() - - for i := 0; i < 200; i++ { - nc.Publish("foo", data) - } - nc.Flush() - - m = stopRecording() - checkResults(m, maxBufSize, maxBufSize) - - // Now test that we shrink correctly. - - // Should go to minimum for both.. - for i := 0; i < 20; i++ { - nc.Flush() - } - checkBuffers(minBufSize, minBufSize) + */ } func TestClientTraceRace(t *testing.T) { @@ -2246,7 +2224,6 @@ func TestFlushOutboundNoSliceReuseIfPartial(t *testing.T) { expected.Write(buf) c.mu.Lock() c.queueOutbound(buf) - c.out.sz = 10 c.flushOutbound() fakeConn.partial = false c.mu.Unlock() diff --git a/server/websocket.go b/server/websocket.go index a884b49b..fd06c986 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -1266,13 +1266,7 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { if c.ws.browser { mfs = wsFrameSizeForBrowsers } - if len(c.out.p) > 0 { - p := c.out.p - c.out.p = nil - nb = append(c.out.nb, p) - } else if len(c.out.nb) > 0 { - nb = c.out.nb - } + nb = c.out.nb mask := c.ws.maskwrite // Start with possible already framed buffers (that we could have // got from partials or control messages such as ws pings or pongs). diff --git a/test/cluster_test.go b/test/cluster_test.go index 86ba7c0c..f8354d46 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -240,6 +240,10 @@ func TestClusterQueueSubs(t *testing.T) { sendB("PING\r\n") expectB(pongRe) + // Give plenty of time for the messages to flush, so that we don't + // accidentally only read some of them. + time.Sleep(time.Millisecond * 250) + // Should receive 5. matches = expectMsgsA(5) checkForQueueSid(t, matches, qg1SidsA) @@ -248,6 +252,10 @@ func TestClusterQueueSubs(t *testing.T) { // Send to A sendA("PUB foo 2\r\nok\r\n") + // Give plenty of time for the messages to flush, so that we don't + // accidentally only read some of them. + time.Sleep(time.Millisecond * 250) + // Should receive 5. matches = expectMsgsA(5) checkForQueueSid(t, matches, qg1SidsA) @@ -270,6 +278,10 @@ func TestClusterQueueSubs(t *testing.T) { // Send to B sendB("PUB foo 2\r\nok\r\n") + // Give plenty of time for the messages to flush, so that we don't + // accidentally only read some of them. + time.Sleep(time.Millisecond * 250) + // Should receive 1 from B. matches = expectMsgsB(1) checkForQueueSid(t, matches, qg2SidsB) @@ -308,6 +320,10 @@ func TestClusterQueueSubs(t *testing.T) { // Send to A sendA("PUB foo 2\r\nok\r\n") + // Give plenty of time for the messages to flush, so that we don't + // accidentally only read some of them. + time.Sleep(time.Millisecond * 250) + // Should receive 4 now. matches = expectMsgsA(4) checkForPubSids(t, matches, pSids)