diff --git a/server/consumer.go b/server/consumer.go index dc006129..34c5e8da 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -698,12 +698,15 @@ func (o *Consumer) sendAckReply(subj string) { func (o *Consumer) processAck(_ *subscription, _ *client, subject, reply string, msg []byte) { sseq, dseq, dcount, _ := o.ReplyInfo(subject) + var skipAckReply bool + switch { case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK): o.ackMsg(sseq, dseq, dcount) case bytes.Equal(msg, AckNext): o.ackMsg(sseq, dseq, dcount) o.processNextMsgReq(nil, nil, subject, reply, nil) + skipAckReply = true case bytes.Equal(msg, AckNak): o.processNak(sseq, dseq) case bytes.Equal(msg, AckProgress): @@ -713,7 +716,7 @@ func (o *Consumer) processAck(_ *subscription, _ *client, subject, reply string, } // Ack the ack if requested. - if len(reply) > 0 { + if len(reply) > 0 && !skipAckReply { o.sendAckReply(reply) } } @@ -956,6 +959,7 @@ func (o *Consumer) ackMsg(sseq, dseq, dcount uint64) { func (o *Consumer) processAckMsg(sseq, dseq, dcount uint64, doSample bool) { var sagap uint64 + o.mu.Lock() switch o.config.AckPolicy { case AckExplicit: diff --git a/test/jetstream_test.go b/test/jetstream_test.go index c319837f..27ae0e12 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -1665,6 +1665,11 @@ func TestJetStreamWorkQueueAckAndNext(t *testing.T) { if err != nil { t.Fatalf("Unexpected error waiting for messages: %v", err) } + + if !bytes.Equal(m.Data, []byte("Hello World!")) { + t.Fatalf("Got an invalid message from the stream: %q", m.Data) + } + nc.PublishRequest(m.Reply, sub.Subject, server.AckNext) } })