diff --git a/server/consumer.go b/server/consumer.go index f72a8cb9..d55aa170 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -706,12 +706,15 @@ func (o *Consumer) sendAckReply(subj string) { func (o *Consumer) processAck(_ *subscription, _ *client, subject, reply string, msg []byte) { sseq, dseq, dcount, _ := o.ReplyInfo(subject) + var skipAckReply bool + switch { case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK): o.ackMsg(sseq, dseq, dcount) case bytes.Equal(msg, AckNext): o.ackMsg(sseq, dseq, dcount) o.processNextMsgReq(nil, nil, subject, reply, nil) + skipAckReply = true case bytes.Equal(msg, AckNak): o.processNak(sseq, dseq) case bytes.Equal(msg, AckProgress): @@ -721,7 +724,7 @@ func (o *Consumer) processAck(_ *subscription, _ *client, subject, reply string, } // Ack the ack if requested. - if len(reply) > 0 { + if len(reply) > 0 && !skipAckReply { o.sendAckReply(reply) } } @@ -964,6 +967,7 @@ func (o *Consumer) ackMsg(sseq, dseq, dcount uint64) { func (o *Consumer) processAckMsg(sseq, dseq, dcount uint64, doSample bool) { var sagap uint64 + o.mu.Lock() switch o.config.AckPolicy { case AckExplicit: diff --git a/test/jetstream_test.go b/test/jetstream_test.go index cfbd843a..936cc7ba 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -1665,6 +1665,11 @@ func TestJetStreamWorkQueueAckAndNext(t *testing.T) { if err != nil { t.Fatalf("Unexpected error waiting for messages: %v", err) } + + if !bytes.Equal(m.Data, []byte("Hello World!")) { + t.Fatalf("Got an invalid message from the stream: %q", m.Data) + } + nc.PublishRequest(m.Reply, sub.Subject, server.AckNext) } })