From af43dd3c745875f566eb80d32232df06bd8b981a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 11 Jun 2022 14:12:05 -0700 Subject: [PATCH] When internal system messages were destined for a queue subscriber across a route the reply subject would be empty but not nil which caused mangling of the RMSG proto. Signed-off-by: Derek Collison --- server/client.go | 6 +++--- server/events_test.go | 23 +++++++++++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/server/client.go b/server/client.go index d7a8da09..37add98e 100644 --- a/server/client.go +++ b/server/client.go @@ -2954,7 +2954,7 @@ func (c *client) msgHeaderForRouteOrLeaf(subj, reply []byte, rt *routeTarget, ac mh = append(mh, ' ') if len(rt.qs) > 0 { - if reply != nil { + if len(reply) > 0 { mh = append(mh, "+ "...) // Signal that there is a reply. mh = append(mh, reply...) mh = append(mh, ' ') @@ -2962,7 +2962,7 @@ func (c *client) msgHeaderForRouteOrLeaf(subj, reply []byte, rt *routeTarget, ac mh = append(mh, "| "...) // Only queues } mh = append(mh, rt.qs...) - } else if reply != nil { + } else if len(reply) > 0 { mh = append(mh, reply...) mh = append(mh, ' ') } @@ -4180,7 +4180,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, // leaf nodes or routes even if there are no queue filters since we collect // them above and do not process inline like normal clients. // However, do select queue subs if asked to ignore empty queue filter. - if (c.kind == LEAF || c.kind == ROUTER || c.kind == GATEWAY) && qf == nil && flags&pmrIgnoreEmptyQueueFilter == 0 { + if (c.kind == LEAF || c.kind == ROUTER || c.kind == GATEWAY) && len(qf) == 0 && flags&pmrIgnoreEmptyQueueFilter == 0 { goto sendToRoutesOrLeafs } diff --git a/server/events_test.go b/server/events_test.go index e2f12b51..987510f6 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -2371,3 +2371,26 @@ func TestServerEventsFilteredByTag(t *testing.T) { require_Contains(t, string(m1.Data)+string(m2.Data), "srv-A", "srv-B", "foo", "bar", "baz") require_Len(t, len(msgs), 0) } + +// https://github.com/nats-io/nats-server/issues/3177 +func TestServerEventsAndDQSubscribers(t *testing.T) { + c := createJetStreamClusterWithTemplate(t, jsClusterAccountsTempl, "DDQ", 3) + defer c.shutdown() + + nc, err := nats.Connect(c.randomServer().ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + require_NoError(t, err) + defer nc.Close() + + sub, err := nc.QueueSubscribeSync("$SYS.ACCOUNT.*.DISCONNECT", "qq") + require_NoError(t, err) + nc.Flush() + + // Create and disconnect 10 random connections. + for i := 0; i < 10; i++ { + nc, err := nats.Connect(c.randomServer().ClientURL()) + require_NoError(t, err) + nc.Close() + } + + checkSubsPending(t, sub, 10) +}