mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Moving TestQueueAutoUnsubscribe to norace_test.go
This test has been found to cause TestAccountNATSResolverFetch to fail on macOS. We did not find the exact reason yet, but it seem that with `-race`, the queue auto-unsub test (that creates 2,000 queue subs and sends 1,000 messages) cause mem to grow to 256MB (which we know -race is memory hungry) and that may be causing interactions with the account resolver test. For now, moving it to norace_test.go, which consumes much less memory (25MB) and anyway is a better place since it would stress better the "races" of having a queue sub being unsubscribed while messages were inflight to this queue sub. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -1586,65 +1586,6 @@ func TestDynamicBuffers(t *testing.T) {
|
||||
checkBuffers(minBufSize, minBufSize)
|
||||
}
|
||||
|
||||
// Similar to the routed version. Make sure we receive all of the
|
||||
// messages with auto-unsubscribe enabled.
|
||||
func TestQueueAutoUnsubscribe(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
rbar := int32(0)
|
||||
barCb := func(m *nats.Msg) {
|
||||
atomic.AddInt32(&rbar, 1)
|
||||
}
|
||||
rbaz := int32(0)
|
||||
bazCb := func(m *nats.Msg) {
|
||||
atomic.AddInt32(&rbaz, 1)
|
||||
}
|
||||
|
||||
// Create 1000 subscriptions with auto-unsubscribe of 1.
|
||||
// Do two groups, one bar and one baz.
|
||||
for i := 0; i < 1000; i++ {
|
||||
qsub, err := nc.QueueSubscribe("foo", "bar", barCb)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on subscribe: %v", err)
|
||||
}
|
||||
if err := qsub.AutoUnsubscribe(1); err != nil {
|
||||
t.Fatalf("Error on auto-unsubscribe: %v", err)
|
||||
}
|
||||
qsub, err = nc.QueueSubscribe("foo", "baz", bazCb)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on subscribe: %v", err)
|
||||
}
|
||||
if err := qsub.AutoUnsubscribe(1); err != nil {
|
||||
t.Fatalf("Error on auto-unsubscribe: %v", err)
|
||||
}
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
expected := int32(1000)
|
||||
for i := int32(0); i < expected; i++ {
|
||||
nc.Publish("foo", []byte("Don't Drop Me!"))
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
checkFor(t, 5*time.Second, 10*time.Millisecond, func() error {
|
||||
nbar := atomic.LoadInt32(&rbar)
|
||||
nbaz := atomic.LoadInt32(&rbaz)
|
||||
if nbar == expected && nbaz == expected {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'",
|
||||
expected, atomic.LoadInt32(&rbar), atomic.LoadInt32(&rbaz))
|
||||
})
|
||||
}
|
||||
|
||||
func TestClientTraceRace(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
s := RunServer(opts)
|
||||
|
||||
@@ -918,3 +918,63 @@ func TestNoRaceLeafNodeClusterNameConflictDeadlock(t *testing.T) {
|
||||
func TestNoRaceAccountAddServiceImportRace(t *testing.T) {
|
||||
TestAccountAddServiceImportRace(t)
|
||||
}
|
||||
|
||||
// Similar to the routed version. Make sure we receive all of the
|
||||
// messages with auto-unsubscribe enabled.
|
||||
func TestNoRaceQueueAutoUnsubscribe(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
rbar := int32(0)
|
||||
barCb := func(m *nats.Msg) {
|
||||
atomic.AddInt32(&rbar, 1)
|
||||
}
|
||||
rbaz := int32(0)
|
||||
bazCb := func(m *nats.Msg) {
|
||||
atomic.AddInt32(&rbaz, 1)
|
||||
}
|
||||
|
||||
// Create 1000 subscriptions with auto-unsubscribe of 1.
|
||||
// Do two groups, one bar and one baz.
|
||||
total := 1000
|
||||
for i := 0; i < total; i++ {
|
||||
qsub, err := nc.QueueSubscribe("foo", "bar", barCb)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on subscribe: %v", err)
|
||||
}
|
||||
if err := qsub.AutoUnsubscribe(1); err != nil {
|
||||
t.Fatalf("Error on auto-unsubscribe: %v", err)
|
||||
}
|
||||
qsub, err = nc.QueueSubscribe("foo", "baz", bazCb)
|
||||
if err != nil {
|
||||
t.Fatalf("Error on subscribe: %v", err)
|
||||
}
|
||||
if err := qsub.AutoUnsubscribe(1); err != nil {
|
||||
t.Fatalf("Error on auto-unsubscribe: %v", err)
|
||||
}
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
expected := int32(total)
|
||||
for i := int32(0); i < expected; i++ {
|
||||
nc.Publish("foo", []byte("Don't Drop Me!"))
|
||||
}
|
||||
nc.Flush()
|
||||
|
||||
checkFor(t, 5*time.Second, 10*time.Millisecond, func() error {
|
||||
nbar := atomic.LoadInt32(&rbar)
|
||||
nbaz := atomic.LoadInt32(&rbaz)
|
||||
if nbar == expected && nbaz == expected {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Did not receive all %d queue messages, received %d for 'bar' and %d for 'baz'",
|
||||
expected, atomic.LoadInt32(&rbar), atomic.LoadInt32(&rbaz))
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user