mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Fix check from next request msg
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
This commit is contained in:
@@ -1036,7 +1036,16 @@ func (o *consumer) processAck(_ *subscription, c *client, subject, reply string,
|
||||
o.ackMsg(sseq, dseq, dc)
|
||||
case bytes.HasPrefix(msg, AckNext):
|
||||
o.ackMsg(sseq, dseq, dc)
|
||||
// processNextMsgReq can be invoked from an internal subscription or from here.
|
||||
// Therefore, it has to call msgParts(), so we can't simply pass msg[len(AckNext):]
|
||||
// with current c.pa.hdr because it would cause a panic. We will save the current
|
||||
// c.pa.hdr value and disable headers before calling processNextMsgReq and then
|
||||
// restore so that we don't mess with the calling stack in case it is used
|
||||
// somewhere else.
|
||||
phdr := c.pa.hdr
|
||||
c.pa.hdr = -1
|
||||
o.processNextMsgReq(nil, c, subject, reply, msg[len(AckNext):])
|
||||
c.pa.hdr = phdr
|
||||
skipAckReply = true
|
||||
case bytes.Equal(msg, AckNak):
|
||||
o.processNak(sseq, dseq)
|
||||
|
||||
@@ -4480,6 +4480,28 @@ func TestJetStreamClusterJSAPIImport(t *testing.T) {
|
||||
if meta.Consumer != 1 || meta.Stream != 1 || meta.Delivered != 1 || meta.Pending != 0 {
|
||||
t.Fatalf("Bad meta: %+v", meta)
|
||||
}
|
||||
|
||||
js.Publish("TEST", []byte("Second"))
|
||||
js.Publish("TEST", []byte("Third"))
|
||||
|
||||
// Ack across accounts.
|
||||
m, err = nc.Request("$JS.API.CONSUMER.MSG.NEXT.TEST.tr", []byte("+NXT"), 2*time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
meta, err = m.MetaData()
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if meta.Consumer != 2 || meta.Stream != 2 || meta.Delivered != 1 || meta.Pending != 1 {
|
||||
t.Fatalf("Bad meta: %+v", meta)
|
||||
}
|
||||
|
||||
// AckNext
|
||||
_, err = nc.Request(m.Reply, []byte("+NXT"), 2*time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamClusterSuperClusterInterestOnlyMode(t *testing.T) {
|
||||
@@ -4816,8 +4838,9 @@ var jsClusterImportsTempl = `
|
||||
jetstream: enabled
|
||||
users = [ { user: "rip", pass: "pass" } ]
|
||||
exports [
|
||||
{ service: "$JS.API.>" }
|
||||
{ service: "$JS.API.>", response: stream }
|
||||
{ service: "TEST" } # For publishing to the stream.
|
||||
{ service: "$JS.ACK.TEST.*.>" }
|
||||
]
|
||||
}
|
||||
IA {
|
||||
@@ -4825,6 +4848,7 @@ var jsClusterImportsTempl = `
|
||||
imports [
|
||||
{ service: { subject: "$JS.API.>", account: JS }}
|
||||
{ service: { subject: "TEST", account: JS }}
|
||||
{ service: { subject: "$JS.ACK.TEST.*.>", account: JS }}
|
||||
]
|
||||
}
|
||||
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
|
||||
|
||||
Reference in New Issue
Block a user