From cd376aa7532c5424261069b49ab37dacaa48f421 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Mon, 21 Sep 2020 17:28:44 +0200 Subject: [PATCH] Do not ack to AckNxt This causes nil messages mid content stream and is confusing, the test for this is amended to test the content received for validity - where previously the test assumed any content is good content but in fact it only received half the content. Removing this behaviour until we can design this properly Signed-off-by: R.I.Pienaar --- server/consumer.go | 6 +++++- test/jetstream_test.go | 5 +++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/server/consumer.go b/server/consumer.go index dc006129..34c5e8da 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -698,12 +698,15 @@ func (o *Consumer) sendAckReply(subj string) { func (o *Consumer) processAck(_ *subscription, _ *client, subject, reply string, msg []byte) { sseq, dseq, dcount, _ := o.ReplyInfo(subject) + var skipAckReply bool + switch { case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK): o.ackMsg(sseq, dseq, dcount) case bytes.Equal(msg, AckNext): o.ackMsg(sseq, dseq, dcount) o.processNextMsgReq(nil, nil, subject, reply, nil) + skipAckReply = true case bytes.Equal(msg, AckNak): o.processNak(sseq, dseq) case bytes.Equal(msg, AckProgress): @@ -713,7 +716,7 @@ func (o *Consumer) processAck(_ *subscription, _ *client, subject, reply string, } // Ack the ack if requested. - if len(reply) > 0 { + if len(reply) > 0 && !skipAckReply { o.sendAckReply(reply) } } @@ -956,6 +959,7 @@ func (o *Consumer) ackMsg(sseq, dseq, dcount uint64) { func (o *Consumer) processAckMsg(sseq, dseq, dcount uint64, doSample bool) { var sagap uint64 + o.mu.Lock() switch o.config.AckPolicy { case AckExplicit: diff --git a/test/jetstream_test.go b/test/jetstream_test.go index c319837f..27ae0e12 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -1665,6 +1665,11 @@ func TestJetStreamWorkQueueAckAndNext(t *testing.T) { if err != nil { t.Fatalf("Unexpected error waiting for messages: %v", err) } + + if !bytes.Equal(m.Data, []byte("Hello World!")) { + t.Fatalf("Got an invalid message from the stream: %q", m.Data) + } + nc.PublishRequest(m.Reply, sub.Subject, server.AckNext) } })