When internal system messages were destined for a queue subscriber across a route the reply subject would be empty but not nil which caused mangling of the RMSG proto.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-06-11 14:12:05 -07:00
parent 0794eafa6f
commit af43dd3c74
2 changed files with 26 additions and 3 deletions

View File

@@ -2954,7 +2954,7 @@ func (c *client) msgHeaderForRouteOrLeaf(subj, reply []byte, rt *routeTarget, ac
mh = append(mh, ' ')
if len(rt.qs) > 0 {
if reply != nil {
if len(reply) > 0 {
mh = append(mh, "+ "...) // Signal that there is a reply.
mh = append(mh, reply...)
mh = append(mh, ' ')
@@ -2962,7 +2962,7 @@ func (c *client) msgHeaderForRouteOrLeaf(subj, reply []byte, rt *routeTarget, ac
mh = append(mh, "| "...) // Only queues
}
mh = append(mh, rt.qs...)
} else if reply != nil {
} else if len(reply) > 0 {
mh = append(mh, reply...)
mh = append(mh, ' ')
}
@@ -4180,7 +4180,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// leaf nodes or routes even if there are no queue filters since we collect
// them above and do not process inline like normal clients.
// However, do select queue subs if asked to ignore empty queue filter.
if (c.kind == LEAF || c.kind == ROUTER || c.kind == GATEWAY) && qf == nil && flags&pmrIgnoreEmptyQueueFilter == 0 {
if (c.kind == LEAF || c.kind == ROUTER || c.kind == GATEWAY) && len(qf) == 0 && flags&pmrIgnoreEmptyQueueFilter == 0 {
goto sendToRoutesOrLeafs
}

View File

@@ -2371,3 +2371,26 @@ func TestServerEventsFilteredByTag(t *testing.T) {
require_Contains(t, string(m1.Data)+string(m2.Data), "srv-A", "srv-B", "foo", "bar", "baz")
require_Len(t, len(msgs), 0)
}
// https://github.com/nats-io/nats-server/issues/3177
func TestServerEventsAndDQSubscribers(t *testing.T) {
c := createJetStreamClusterWithTemplate(t, jsClusterAccountsTempl, "DDQ", 3)
defer c.shutdown()
nc, err := nats.Connect(c.randomServer().ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
require_NoError(t, err)
defer nc.Close()
sub, err := nc.QueueSubscribeSync("$SYS.ACCOUNT.*.DISCONNECT", "qq")
require_NoError(t, err)
nc.Flush()
// Create and disconnect 10 random connections.
for i := 0; i < 10; i++ {
nc, err := nats.Connect(c.randomServer().ClientURL())
require_NoError(t, err)
nc.Close()
}
checkSubsPending(t, sub, 10)
}