diff --git a/server/consumer.go b/server/consumer.go index afc522ce..e22f7864 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -751,9 +751,9 @@ func (o *Consumer) sendAckReply(subj string) { // Process a message for the ack reply subject delivered with a message. func (o *Consumer) processAck(_ *subscription, _ *client, subject, reply string, msg []byte) { - sseq, dseq, dcount, _, _ := o.ReplyInfo(subject) + sseq, dseq, dcount := ackReplyInfo(subject) - var skipAckReply bool + skipAckReply := sseq == 0 switch { case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK): @@ -1830,23 +1830,73 @@ func (o *Consumer) checkPending() { // SeqFromReply will extract a sequence number from a reply subject. func (o *Consumer) SeqFromReply(reply string) uint64 { - _, seq, _, _, _ := o.ReplyInfo(reply) - return seq + _, dseq, _ := ackReplyInfo(reply) + return dseq } // StreamSeqFromReply will extract the stream sequence from the reply subject. func (o *Consumer) StreamSeqFromReply(reply string) uint64 { - seq, _, _, _, _ := o.ReplyInfo(reply) - return seq + sseq, _, _ := ackReplyInfo(reply) + return sseq } +// Quick parser for positive numbers in ack reply encoding. +func parseAckReplyNum(d string) (n int64) { + if len(d) == 0 { + return -1 + } + for _, dec := range d { + if dec < asciiZero || dec > asciiNine { + return -1 + } + n = n*10 + (int64(dec) - asciiZero) + } + return n +} + +const expectedNumReplyTokens = 9 + // Grab encoded information in the reply subject for a delivered message. -func (o *Consumer) ReplyInfo(reply string) (sseq, dseq, dcount uint64, ts int64, pending uint64) { - n, err := fmt.Sscanf(reply, o.ackReplyT, &dcount, &sseq, &dseq, &ts, &pending) - if err != nil || n != 5 { +func (o *Consumer) ReplyInfo(subject string) (sseq, dseq, dcount uint64, ts int64, pending uint64) { + tsa := [expectedNumReplyTokens]string{} + start, tokens := 0, tsa[:0] + for i := 0; i < len(subject); i++ { + if subject[i] == btsep { + tokens = append(tokens, subject[start:i]) + start = i + 1 + } + } + tokens = append(tokens, subject[start:]) + if len(tokens) != expectedNumReplyTokens || tokens[0] != "$JS" || tokens[1] != "ACK" { return 0, 0, 0, 0, 0 } - return + // TODO(dlc) - Should we error if we do not match consumer name? + // stream is tokens[2], consumer is 3. + dcount = uint64(parseAckReplyNum(tokens[4])) + sseq, dseq = uint64(parseAckReplyNum(tokens[5])), uint64(parseAckReplyNum(tokens[6])) + ts = parseAckReplyNum(tokens[7]) + pending = uint64(parseAckReplyNum(tokens[8])) + + return sseq, dseq, dcount, ts, pending +} + +func ackReplyInfo(subject string) (sseq, dseq, dcount uint64) { + tsa := [expectedNumReplyTokens]string{} + start, tokens := 0, tsa[:0] + for i := 0; i < len(subject); i++ { + if subject[i] == btsep { + tokens = append(tokens, subject[start:i]) + start = i + 1 + } + } + tokens = append(tokens, subject[start:]) + if len(tokens) != expectedNumReplyTokens || tokens[0] != "$JS" || tokens[1] != "ACK" { + return 0, 0, 0 + } + dcount = uint64(parseAckReplyNum(tokens[4])) + sseq, dseq = uint64(parseAckReplyNum(tokens[5])), uint64(parseAckReplyNum(tokens[6])) + + return sseq, dseq, dcount } // NextSeq returns the next delivered sequence number for this observable.