Merge pull request #1601 from ripienaar/skip_ack_on_acknxt

Do not ack to AckNxt
This commit is contained in:
Derek Collison
2020-09-28 17:19:35 -07:00
committed by GitHub
2 changed files with 10 additions and 1 deletions

View File

@@ -706,12 +706,15 @@ func (o *Consumer) sendAckReply(subj string) {
func (o *Consumer) processAck(_ *subscription, _ *client, subject, reply string, msg []byte) {
sseq, dseq, dcount, _ := o.ReplyInfo(subject)
var skipAckReply bool
switch {
case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK):
o.ackMsg(sseq, dseq, dcount)
case bytes.Equal(msg, AckNext):
o.ackMsg(sseq, dseq, dcount)
o.processNextMsgReq(nil, nil, subject, reply, nil)
skipAckReply = true
case bytes.Equal(msg, AckNak):
o.processNak(sseq, dseq)
case bytes.Equal(msg, AckProgress):
@@ -721,7 +724,7 @@ func (o *Consumer) processAck(_ *subscription, _ *client, subject, reply string,
}
// Ack the ack if requested.
if len(reply) > 0 {
if len(reply) > 0 && !skipAckReply {
o.sendAckReply(reply)
}
}
@@ -964,6 +967,7 @@ func (o *Consumer) ackMsg(sseq, dseq, dcount uint64) {
func (o *Consumer) processAckMsg(sseq, dseq, dcount uint64, doSample bool) {
var sagap uint64
o.mu.Lock()
switch o.config.AckPolicy {
case AckExplicit:

View File

@@ -1665,6 +1665,11 @@ func TestJetStreamWorkQueueAckAndNext(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error waiting for messages: %v", err)
}
if !bytes.Equal(m.Data, []byte("Hello World!")) {
t.Fatalf("Got an invalid message from the stream: %q", m.Data)
}
nc.PublishRequest(m.Reply, sub.Subject, server.AckNext)
}
})