From 278c0efc8a8d95b8f13f87d543d9290aefe8a90b Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 5 Mar 2021 13:23:25 -0800 Subject: [PATCH] Fix check from next request msg Signed-off-by: Waldemar Quevedo --- server/consumer.go | 9 +++++++++ server/jetstream_cluster_test.go | 26 +++++++++++++++++++++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/server/consumer.go b/server/consumer.go index 157ff23a..c2951107 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1036,7 +1036,16 @@ func (o *consumer) processAck(_ *subscription, c *client, subject, reply string, o.ackMsg(sseq, dseq, dc) case bytes.HasPrefix(msg, AckNext): o.ackMsg(sseq, dseq, dc) + // processNextMsgReq can be invoked from an internal subscription or from here. + // Therefore, it has to call msgParts(), so we can't simply pass msg[len(AckNext):] + // with current c.pa.hdr because it would cause a panic. We will save the current + // c.pa.hdr value and disable headers before calling processNextMsgReq and then + // restore so that we don't mess with the calling stack in case it is used + // somewhere else. + phdr := c.pa.hdr + c.pa.hdr = -1 o.processNextMsgReq(nil, c, subject, reply, msg[len(AckNext):]) + c.pa.hdr = phdr skipAckReply = true case bytes.Equal(msg, AckNak): o.processNak(sseq, dseq) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 0940c57e..876e26a6 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -4480,6 +4480,28 @@ func TestJetStreamClusterJSAPIImport(t *testing.T) { if meta.Consumer != 1 || meta.Stream != 1 || meta.Delivered != 1 || meta.Pending != 0 { t.Fatalf("Bad meta: %+v", meta) } + + js.Publish("TEST", []byte("Second")) + js.Publish("TEST", []byte("Third")) + + // Ack across accounts. + m, err = nc.Request("$JS.API.CONSUMER.MSG.NEXT.TEST.tr", []byte("+NXT"), 2*time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + meta, err = m.MetaData() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if meta.Consumer != 2 || meta.Stream != 2 || meta.Delivered != 1 || meta.Pending != 1 { + t.Fatalf("Bad meta: %+v", meta) + } + + // AckNext + _, err = nc.Request(m.Reply, []byte("+NXT"), 2*time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } } func TestJetStreamClusterSuperClusterInterestOnlyMode(t *testing.T) { @@ -4816,8 +4838,9 @@ var jsClusterImportsTempl = ` jetstream: enabled users = [ { user: "rip", pass: "pass" } ] exports [ - { service: "$JS.API.>" } + { service: "$JS.API.>", response: stream } { service: "TEST" } # For publishing to the stream. + { service: "$JS.ACK.TEST.*.>" } ] } IA { @@ -4825,6 +4848,7 @@ var jsClusterImportsTempl = ` imports [ { service: { subject: "$JS.API.>", account: JS }} { service: { subject: "TEST", account: JS }} + { service: { subject: "$JS.ACK.TEST.*.>", account: JS }} ] } $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }