Merge pull request #1683 from ripienaar/bare_acknxt

ensure bare AckNxt requesting >1 is supported
This commit is contained in:
R.I.Pienaar
2020-10-30 16:15:20 +01:00
committed by GitHub
2 changed files with 102 additions and 8 deletions

View File

@@ -158,6 +158,8 @@ var (
AckNext = []byte("+NXT")
// Terminate delivery of the message.
AckTerm = []byte("+TERM")
ackNextCnt = "+NXT %d+"
)
// Consumer is a jetstream consumer.
@@ -718,7 +720,7 @@ func (o *Consumer) processAck(_ *subscription, _ *client, subject, reply string,
switch {
case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK):
o.ackMsg(sseq, dseq, dcount)
case bytes.Equal(msg, AckNext):
case bytes.HasPrefix(msg, AckNext):
o.ackMsg(sseq, dseq, dcount)
o.processNextMsgReq(nil, nil, subject, reply, msg)
skipAckReply = true
@@ -1070,11 +1072,20 @@ func nextReqFromMsg(msg []byte) (time.Time, int, bool, error) {
}
return cr.Expires, cr.Batch, cr.NoWait, nil
}
// Naked batch size here for backward compatibility.
bs := 1
if n, err := strconv.Atoi(req); err == nil {
bs = n
// Naked batch size here for backward compatibility.
switch {
case strings.HasPrefix(req, string(AckNext)):
if n, _ := fmt.Sscanf(req, ackNextCnt, &bs); n == 0 {
bs = 1
}
default:
if n, err := strconv.Atoi(req); err == nil {
bs = n
}
}
return time.Time{}, bs, false, nil
}

View File

@@ -15,6 +15,7 @@ package test
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
@@ -2990,14 +2991,14 @@ func TestJetStreamConsumerAckAck(t *testing.T) {
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
rqn := o.RequestNextMsgSubject()
defer o.Delete()
rqn := o.RequestNextMsgSubject()
nc := clientConnectToServer(t, s)
defer nc.Close()
// 5 for number of ack protocols to test them all.
for i := 0; i < 5; i++ {
// 4 for number of ack protocols to test them all.
for i := 0; i < 4; i++ {
sendStreamMsg(t, nc, mname, "Hello World!")
}
@@ -3015,10 +3016,92 @@ func TestJetStreamConsumerAckAck(t *testing.T) {
testAck(server.AckAck)
testAck(server.AckNak)
testAck(server.AckProgress)
testAck(server.AckNext)
testAck(server.AckTerm)
}
func TestJetStreamAckNext(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}
mname := "ACKNXT"
mset, err := s.GlobalAccount().AddStream(&server.StreamConfig{Name: mname, Storage: server.MemoryStorage})
if err != nil {
t.Fatalf("Unexpected error adding stream: %v", err)
}
defer mset.Delete()
o, err := mset.AddConsumer(&server.ConsumerConfig{Durable: "worker", AckPolicy: server.AckExplicit})
if err != nil {
t.Fatalf("Expected no error with registered interest, got %v", err)
}
defer o.Delete()
nc := clientConnectToServer(t, s)
defer nc.Close()
for i := 0; i < 10; i++ {
sendStreamMsg(t, nc, mname, fmt.Sprintf("msg %d", i))
}
q := make(chan *nats.Msg, 10)
sub, err := nc.ChanSubscribe(nats.NewInbox(), q)
if err != nil {
t.Fatalf("SubscribeSync failed: %s", err)
}
nc.PublishRequest(o.RequestNextMsgSubject(), sub.Subject, []byte("1"))
// normal next should imply 1
msg := <-q
err = msg.RespondMsg(&nats.Msg{Reply: sub.Subject, Subject: msg.Reply, Data: server.AckNext})
if err != nil {
t.Fatalf("RespondMsg failed: %s", err)
}
// read 1 message and check ack was done etc
msg = <-q
if len(q) != 0 {
t.Fatalf("Expected empty q got %d", len(q))
}
if o.Info().AckFloor.StreamSeq != 1 {
t.Fatalf("First message was not acknowledged")
}
if !bytes.Equal(msg.Data, []byte("msg 1")) {
t.Fatalf("wrong message received, expected: msg 1 got %q", msg.Data)
}
// now ack and request 5 more
err = msg.RespondMsg(&nats.Msg{Reply: sub.Subject, Subject: msg.Reply, Data: append(server.AckNext, []byte(" 5")...)})
if err != nil {
t.Fatalf("RespondMsg failed: %s", err)
}
// check next+cnt worked and got the right amount of messages, use ctx to avoid
// sleeps when all is working
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
for i := 1; i < 6; i++ {
select {
case msg := <-q:
expect := fmt.Sprintf("msg %d", i+1)
if !bytes.Equal(msg.Data, []byte(expect)) {
t.Fatalf("wrong message received, expected: %s", expect)
}
case <-ctx.Done():
t.Fatalf("did not receive all messages")
}
}
if o.Info().AckFloor.StreamSeq != 2 {
t.Fatalf("second message was not acknowledged")
}
}
func TestJetStreamPublishDeDupe(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()