Merge pull request #1689 from ripienaar/acknxt_struct

Support the next-message request structure (JSApiConsumerGetNextRequest) in AckNext payloads
This commit is contained in:
R.I.Pienaar
2020-11-02 17:58:46 +01:00
committed by GitHub
2 changed files with 38 additions and 33 deletions

View File

@@ -158,8 +158,6 @@ var (
AckNext = []byte("+NXT")
// Terminate delivery of the message.
AckTerm = []byte("+TERM")
ackNextCnt = "+NXT %d+"
)
// Consumer is a jetstream consumer.
@@ -722,7 +720,7 @@ func (o *Consumer) processAck(_ *subscription, _ *client, subject, reply string,
o.ackMsg(sseq, dseq, dcount)
case bytes.HasPrefix(msg, AckNext):
o.ackMsg(sseq, dseq, dcount)
o.processNextMsgReq(nil, nil, subject, reply, msg)
o.processNextMsgReq(nil, nil, subject, reply, msg[len(AckNext):])
skipAckReply = true
case bytes.Equal(msg, AckNak):
o.processNak(sseq, dseq)
@@ -1061,32 +1059,26 @@ func (o *Consumer) needAck(sseq uint64) bool {
// Helper for the next message requests.
func nextReqFromMsg(msg []byte) (time.Time, int, bool, error) {
req := strings.TrimSpace(string(msg))
if len(req) == 0 {
req := bytes.TrimSpace(msg)
switch {
case len(req) == 0:
return time.Time{}, 1, false, nil
}
if req[0] == '{' {
case req[0] == '{':
var cr JSApiConsumerGetNextRequest
if err := json.Unmarshal(msg, &cr); err != nil {
if err := json.Unmarshal(req, &cr); err != nil {
return time.Time{}, -1, false, err
}
return cr.Expires, cr.Batch, cr.NoWait, nil
}
bs := 1
// Naked batch size here for backward compatibility.
switch {
case strings.HasPrefix(req, string(AckNext)):
if n, _ := fmt.Sscanf(req, ackNextCnt, &bs); n == 0 {
bs = 1
}
default:
if n, err := strconv.Atoi(req); err == nil {
bs = n
if n, err := strconv.Atoi(string(req)); err == nil {
return time.Time{}, n, false, nil
}
}
return time.Time{}, bs, false, nil
return time.Time{}, 1, false, nil
}
// Represents a request that is on the internal waiting queue

View File

@@ -3056,7 +3056,7 @@ func TestJetStreamAckNext(t *testing.T) {
nc := clientConnectToServer(t, s)
defer nc.Close()
for i := 0; i < 10; i++ {
for i := 0; i < 12; i++ {
sendStreamMsg(t, nc, mname, fmt.Sprintf("msg %d", i))
}
@@ -3087,29 +3087,42 @@ func TestJetStreamAckNext(t *testing.T) {
t.Fatalf("wrong message received, expected: msg 1 got %q", msg.Data)
}
// now ack and request 5 more
// now ack and request 5 more using a naked number
err = msg.RespondMsg(&nats.Msg{Reply: sub.Subject, Subject: msg.Reply, Data: append(server.AckNext, []byte(" 5")...)})
if err != nil {
t.Fatalf("RespondMsg failed: %s", err)
}
// check next+cnt worked and got the right amount of messages, use ctx to avoid
// sleeps when all is working
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
getMsgs := func(start, count int) {
t.Helper()
for i := 1; i < 6; i++ {
select {
case msg := <-q:
expect := fmt.Sprintf("msg %d", i+1)
if !bytes.Equal(msg.Data, []byte(expect)) {
t.Fatalf("wrong message received, expected: %s", expect)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
for i := start; i < count+1; i++ {
select {
case msg := <-q:
expect := fmt.Sprintf("msg %d", i+1)
if !bytes.Equal(msg.Data, []byte(expect)) {
t.Fatalf("wrong message received, expected: %s got %#v", expect, msg)
}
case <-ctx.Done():
t.Fatalf("did not receive all messages")
}
case <-ctx.Done():
t.Fatalf("did not receive all messages")
}
}
getMsgs(1, 5)
// now ack and request 5 more using the full request
err = msg.RespondMsg(&nats.Msg{Reply: sub.Subject, Subject: msg.Reply, Data: append(server.AckNext, []byte(`{"batch": 5}`)...)})
if err != nil {
t.Fatalf("RespondMsg failed: %s", err)
}
getMsgs(6, 10)
if o.Info().AckFloor.StreamSeq != 2 {
t.Fatalf("second message was not acknowledged")
}