diff --git a/server/consumer.go b/server/consumer.go index ddc9776d..32c95f83 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -158,6 +158,8 @@ var ( AckNext = []byte("+NXT") // Terminate delivery of the message. AckTerm = []byte("+TERM") + + ackNextCnt = "+NXT %d+" ) // Consumer is a jetstream consumer. @@ -718,7 +720,7 @@ func (o *Consumer) processAck(_ *subscription, _ *client, subject, reply string, switch { case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK): o.ackMsg(sseq, dseq, dcount) - case bytes.Equal(msg, AckNext): + case bytes.HasPrefix(msg, AckNext): o.ackMsg(sseq, dseq, dcount) o.processNextMsgReq(nil, nil, subject, reply, msg) skipAckReply = true @@ -1070,11 +1072,20 @@ func nextReqFromMsg(msg []byte) (time.Time, int, bool, error) { } return cr.Expires, cr.Batch, cr.NoWait, nil } - // Naked batch size here for backward compatibility. + bs := 1 - if n, err := strconv.Atoi(req); err == nil { - bs = n + // Naked batch size here for backward compatibility. + switch { + case strings.HasPrefix(req, string(AckNext)): + if n, _ := fmt.Sscanf(req, ackNextCnt, &bs); n == 0 { + bs = 1 + } + default: + if n, err := strconv.Atoi(req); err == nil { + bs = n + } } + return time.Time{}, bs, false, nil } diff --git a/test/jetstream_test.go b/test/jetstream_test.go index cfb4517e..5ec8371d 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -15,6 +15,7 @@ package test import ( "bytes" + "context" "encoding/json" "fmt" "io/ioutil" @@ -2990,14 +2991,14 @@ func TestJetStreamConsumerAckAck(t *testing.T) { if err != nil { t.Fatalf("Expected no error with registered interest, got %v", err) } - rqn := o.RequestNextMsgSubject() defer o.Delete() + rqn := o.RequestNextMsgSubject() nc := clientConnectToServer(t, s) defer nc.Close() - // 5 for number of ack protocols to test them all. - for i := 0; i < 5; i++ { + // 4 for number of ack protocols to test them all. + for i := 0; i < 4; i++ { sendStreamMsg(t, nc, mname, "Hello World!") } @@ -3015,10 +3016,92 @@ func TestJetStreamConsumerAckAck(t *testing.T) { testAck(server.AckAck) testAck(server.AckNak) testAck(server.AckProgress) - testAck(server.AckNext) testAck(server.AckTerm) } +func TestJetStreamAckNext(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + mname := "ACKNXT" + mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.MemoryStorage}) + if err != nil { + t.Fatalf("Unexpected error adding stream: %v", err) + } + defer mset.Delete() + + o, err := mset.AddConsumer(&server.ConsumerConfig{Durable: "worker", AckPolicy: server.AckExplicit}) + if err != nil { + t.Fatalf("Expected no error with registered interest, got %v", err) + } + defer o.Delete() + + nc := clientConnectToServer(t, s) + defer nc.Close() + + for i := 0; i < 10; i++ { + sendStreamMsg(t, nc, mname, fmt.Sprintf("msg %d", i)) + } + + q := make(chan *nats.Msg, 10) + sub, err := nc.ChanSubscribe(nats.NewInbox(), q) + if err != nil { + t.Fatalf("SubscribeSync failed: %s", err) + } + + nc.PublishRequest(o.RequestNextMsgSubject(), sub.Subject, []byte("1")) + + // normal next should imply 1 + msg := <-q + err = msg.RespondMsg(&nats.Msg{Reply: sub.Subject, Subject: msg.Reply, Data: server.AckNext}) + if err != nil { + t.Fatalf("RespondMsg failed: %s", err) + } + + // read 1 message and check ack was done etc + msg = <-q + if len(q) != 0 { + t.Fatalf("Expected empty q got %d", len(q)) + } + if o.Info().AckFloor.StreamSeq != 1 { + t.Fatalf("First message was not acknowledged") + } + if !bytes.Equal(msg.Data, []byte("msg 1")) { + t.Fatalf("wrong message received, expected: msg 1 got %q", msg.Data) + } + + // now ack and request 5 more + err = msg.RespondMsg(&nats.Msg{Reply: sub.Subject, Subject: msg.Reply, Data: append(server.AckNext, []byte(" 5")...)}) + if err != nil { + t.Fatalf("RespondMsg failed: %s", err) + } + + // check next+cnt worked and got the right amount of messages, use ctx to avoid + // sleeps when all is working + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + for i := 1; i < 6; i++ { + select { + case msg := <-q: + expect := fmt.Sprintf("msg %d", i+1) + if !bytes.Equal(msg.Data, []byte(expect)) { + t.Fatalf("wrong message received, expected: %s", expect) + } + case <-ctx.Done(): + t.Fatalf("did not receive all messages") + } + } + + if o.Info().AckFloor.StreamSeq != 2 { + t.Fatalf("second message was not acknowledged") + } +} + func TestJetStreamPublishDeDupe(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown()