mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
[FIXED] Queue subscriptions not able to receive system events.
The issue was caused by the kind of connections being checked. Resolves #1527 Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -3567,11 +3567,11 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
|
||||
// Declared here because of goto.
|
||||
var queues [][]byte
|
||||
|
||||
// For all non-client connections, we may still want to send messages to
|
||||
// For all routes/leaf/gateway connections, we may still want to send messages to
|
||||
// leaf nodes or routes even if there are no queue filters since we collect
|
||||
// them above and do not process inline like normal clients.
|
||||
// However, do select queue subs if asked to ignore empty queue filter.
|
||||
if (c.kind != CLIENT && c.kind != JETSTREAM && c.kind != ACCOUNT) && qf == nil && flags&pmrIgnoreEmptyQueueFilter == 0 {
|
||||
if (c.kind == LEAF || c.kind == ROUTER || c.kind == GATEWAY) && qf == nil && flags&pmrIgnoreEmptyQueueFilter == 0 {
|
||||
goto sendToRoutesOrLeafs
|
||||
}
|
||||
|
||||
|
||||
@@ -1814,3 +1814,38 @@ func TestConnectionUpdatesTimerProperlySet(t *testing.T) {
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func TestServerEventsReceivedByQSubs(t *testing.T) {
|
||||
s, opts := runTrustedServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
acc, akp := createAccount(s)
|
||||
s.setSystemAccount(acc)
|
||||
|
||||
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
|
||||
ncs, err := nats.Connect(url, createUserCreds(t, s, akp))
|
||||
if err != nil {
|
||||
t.Fatalf("Error on connect: %v", err)
|
||||
}
|
||||
defer ncs.Close()
|
||||
|
||||
// Listen for auth error events.
|
||||
qsub, _ := ncs.QueueSubscribeSync("$SYS.SERVER.*.CLIENT.AUTH.ERR", "queue")
|
||||
defer qsub.Unsubscribe()
|
||||
|
||||
ncs.Flush()
|
||||
|
||||
nats.Connect(url, nats.Name("TEST BAD LOGIN"))
|
||||
|
||||
m, err := qsub.NextMsg(time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Should have heard an auth error event")
|
||||
}
|
||||
dem := DisconnectEventMsg{}
|
||||
if err := json.Unmarshal(m.Data, &dem); err != nil {
|
||||
t.Fatalf("Error unmarshalling disconnect event message: %v", err)
|
||||
}
|
||||
if dem.Reason != "Authentication Failure" {
|
||||
t.Fatalf("Expected auth error, got %q", dem.Reason)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user