ensure bare AckNxt requesting >1 is supported

Signed-off-by: R.I.Pienaar <rip@devco.net>
This commit is contained in:
R.I.Pienaar
2020-10-30 13:41:48 +01:00
parent 5adce5c01c
commit 0e9ca7614f
2 changed files with 103 additions and 8 deletions

View File

@@ -158,6 +158,8 @@ var (
AckNext = []byte("+NXT")
// Terminate delivery of the message.
AckTerm = []byte("+TERM")
ackNextCnt = "+NXT %d+"
)
// Consumer is a jetstream consumer.
@@ -718,7 +720,7 @@ func (o *Consumer) processAck(_ *subscription, _ *client, subject, reply string,
switch {
case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK):
o.ackMsg(sseq, dseq, dcount)
case bytes.Equal(msg, AckNext):
case bytes.HasPrefix(msg, AckNext):
o.ackMsg(sseq, dseq, dcount)
o.processNextMsgReq(nil, nil, subject, reply, msg)
skipAckReply = true
@@ -1070,11 +1072,20 @@ func nextReqFromMsg(msg []byte) (time.Time, int, bool, error) {
}
return cr.Expires, cr.Batch, cr.NoWait, nil
}
// Naked batch size here for backward compatibility.
bs := 1
if n, err := strconv.Atoi(req); err == nil {
bs = n
// Naked batch size here for backward compatibility.
switch {
case bytes.HasPrefix(msg, AckNext):
if n, _ := fmt.Sscanf(string(msg), ackNextCnt, &bs); n == 0 {
bs = 1
}
default:
if n, err := strconv.Atoi(req); err == nil {
bs = n
}
}
return time.Time{}, bs, false, nil
}

View File

@@ -15,6 +15,7 @@ package test
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
@@ -82,6 +83,7 @@ func RunBasicJetStreamServer() *server.Server {
opts := DefaultTestOptions
opts.Port = -1
opts.JetStream = true
opts.NoLog = true
return RunServer(&opts)
}
@@ -2988,14 +2990,14 @@ func TestJetStreamConsumerAckAck(t *testing.T) {
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
rqn := o.RequestNextMsgSubject()
defer o.Delete()
rqn := o.RequestNextMsgSubject()
nc := clientConnectToServer(t, s)
defer nc.Close()
// 5 for number of ack protocols to test them all.
for i := 0; i < 5; i++ {
// 4 for number of ack protocols to test them all.
for i := 0; i < 4; i++ {
sendStreamMsg(t, nc, mname, "Hello World!")
}
@@ -3013,10 +3015,92 @@ func TestJetStreamConsumerAckAck(t *testing.T) {
testAck(server.AckAck)
testAck(server.AckNak)
testAck(server.AckProgress)
testAck(server.AckNext)
testAck(server.AckTerm)
}
func TestJetStreamNack(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mname := "ACKNXT"
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.MemoryStorage})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
o, err := mset.AddConsumer(&server.ConsumerConfig{Durable: "worker", AckPolicy: server.AckExplicit})
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
defer o.Delete()
nc := clientConnectToServer(t, s)
defer nc.Close()
for i := 0; i < 10; i++ {
sendStreamMsg(t, nc, mname, fmt.Sprintf("msg %d", i))
}
q := make(chan *nats.Msg, 10)
sub, err := nc.ChanSubscribe(nats.NewInbox(), q)
if err != nil {
t.Fatalf("SubscribeSync failed: %s", err)
}
nc.PublishRequest(o.RequestNextMsgSubject(), sub.Subject, []byte("1"))
// normal next should imply 1
msg := <-q
err = msg.RespondMsg(&nats.Msg{Reply: sub.Subject, Subject: msg.Reply, Data: server.AckNext})
if err != nil {
t.Fatalf("RespondMsg failed: %s", err)
}
// read 1 message and check ack was done etc
msg = <-q
if len(q) != 0 {
t.Fatalf("Expected empty q got %d", len(q))
}
if o.Info().AckFloor.StreamSeq != 1 {
t.Fatalf("First message was not acknowledged")
}
if !bytes.Equal(msg.Data, []byte("msg 1")) {
t.Fatalf("wrong message received, expected: msg 1 got %q", msg.Data)
}
// now ack and request 5 more
err = msg.RespondMsg(&nats.Msg{Reply: sub.Subject, Subject: msg.Reply, Data: append(server.AckNext, []byte(" 5")...)})
if err != nil {
t.Fatalf("RespondMsg failed: %s", err)
}
// check next+cnt worked and got the right amount of messages, use ctx to avoid
// sleeps when all is working
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
for i := 1; i < 6; i++ {
select {
case msg := <-q:
expect := fmt.Sprintf("msg %d", i+1)
if !bytes.Equal(msg.Data, []byte(expect)) {
t.Fatalf("wrong message received, expected: %s", expect)
}
case <-ctx.Done():
t.Fatalf("did not receive all messages")
}
}
if o.Info().AckFloor.StreamSeq != 2 {
t.Fatalf("second message was not acknowledged")
}
}
func TestJetStreamPublishDeDupe(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()