From dbf962f9581f5702d006d06d48aaa69b2763b787 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 23 Jul 2020 11:38:43 -0600 Subject: [PATCH] [FIXED] Queue subscriptions not able to receive system events. The issue was caused by the kind of connections being checked. Resolves #1527 Signed-off-by: Ivan Kozlovic --- server/client.go | 4 ++-- server/events_test.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/server/client.go b/server/client.go index ae6a3f7c..f63e6513 100644 --- a/server/client.go +++ b/server/client.go @@ -3567,11 +3567,11 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, // Declared here because of goto. var queues [][]byte - // For all non-client connections, we may still want to send messages to + // For all routes/leaf/gateway connections, we may still want to send messages to // leaf nodes or routes even if there are no queue filters since we collect // them above and do not process inline like normal clients. // However, do select queue subs if asked to ignore empty queue filter. - if (c.kind != CLIENT && c.kind != JETSTREAM && c.kind != ACCOUNT) && qf == nil && flags&pmrIgnoreEmptyQueueFilter == 0 { + if (c.kind == LEAF || c.kind == ROUTER || c.kind == GATEWAY) && qf == nil && flags&pmrIgnoreEmptyQueueFilter == 0 { goto sendToRoutesOrLeafs } diff --git a/server/events_test.go b/server/events_test.go index 0c72474a..43f48674 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1814,3 +1814,38 @@ func TestConnectionUpdatesTimerProperlySet(t *testing.T) { return nil }) } + +func TestServerEventsReceivedByQSubs(t *testing.T) { + s, opts := runTrustedServer(t) + defer s.Shutdown() + + acc, akp := createAccount(s) + s.setSystemAccount(acc) + + url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) + ncs, err := nats.Connect(url, createUserCreds(t, s, akp)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer ncs.Close() + + // Listen for auth error events. + qsub, _ := ncs.QueueSubscribeSync("$SYS.SERVER.*.CLIENT.AUTH.ERR", "queue") + defer qsub.Unsubscribe() + + ncs.Flush() + + nats.Connect(url, nats.Name("TEST BAD LOGIN")) + + m, err := qsub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Should have heard an auth error event") + } + dem := DisconnectEventMsg{} + if err := json.Unmarshal(m.Data, &dem); err != nil { + t.Fatalf("Error unmarshalling disconnect event message: %v", err) + } + if dem.Reason != "Authentication Failure" { + t.Fatalf("Expected auth error, got %q", dem.Reason) + } +}