Merge pull request #1717 from nats-io/ackReply

Optimize ackReply parsing
This commit is contained in:
Derek Collison
2020-11-17 11:44:34 -08:00
committed by GitHub

View File

@@ -751,9 +751,9 @@ func (o *Consumer) sendAckReply(subj string) {
// Process a message for the ack reply subject delivered with a message.
func (o *Consumer) processAck(_ *subscription, _ *client, subject, reply string, msg []byte) {
sseq, dseq, dcount, _, _ := o.ReplyInfo(subject)
sseq, dseq, dcount := ackReplyInfo(subject)
var skipAckReply bool
skipAckReply := sseq == 0
switch {
case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK):
@@ -1830,23 +1830,73 @@ func (o *Consumer) checkPending() {
// SeqFromReply will extract a sequence number from a reply subject.
func (o *Consumer) SeqFromReply(reply string) uint64 {
_, seq, _, _, _ := o.ReplyInfo(reply)
return seq
_, dseq, _ := ackReplyInfo(reply)
return dseq
}
// StreamSeqFromReply will extract the stream sequence from the reply subject.
func (o *Consumer) StreamSeqFromReply(reply string) uint64 {
seq, _, _, _, _ := o.ReplyInfo(reply)
return seq
sseq, _, _ := ackReplyInfo(reply)
return sseq
}
// Quick parser for positive numbers in ack reply encoding.
func parseAckReplyNum(d string) (n int64) {
if len(d) == 0 {
return -1
}
for _, dec := range d {
if dec < asciiZero || dec > asciiNine {
return -1
}
n = n*10 + (int64(dec) - asciiZero)
}
return n
}
const expectedNumReplyTokens = 9
// Grab encoded information in the reply subject for a delivered message.
func (o *Consumer) ReplyInfo(reply string) (sseq, dseq, dcount uint64, ts int64, pending uint64) {
n, err := fmt.Sscanf(reply, o.ackReplyT, &dcount, &sseq, &dseq, &ts, &pending)
if err != nil || n != 5 {
func (o *Consumer) ReplyInfo(subject string) (sseq, dseq, dcount uint64, ts int64, pending uint64) {
tsa := [expectedNumReplyTokens]string{}
start, tokens := 0, tsa[:0]
for i := 0; i < len(subject); i++ {
if subject[i] == btsep {
tokens = append(tokens, subject[start:i])
start = i + 1
}
}
tokens = append(tokens, subject[start:])
if len(tokens) != expectedNumReplyTokens || tokens[0] != "$JS" || tokens[1] != "ACK" {
return 0, 0, 0, 0, 0
}
return
// TODO(dlc) - Should we error if we do not match consumer name?
// stream is tokens[2], consumer is 3.
dcount = uint64(parseAckReplyNum(tokens[4]))
sseq, dseq = uint64(parseAckReplyNum(tokens[5])), uint64(parseAckReplyNum(tokens[6]))
ts = parseAckReplyNum(tokens[7])
pending = uint64(parseAckReplyNum(tokens[8]))
return sseq, dseq, dcount, ts, pending
}
func ackReplyInfo(subject string) (sseq, dseq, dcount uint64) {
tsa := [expectedNumReplyTokens]string{}
start, tokens := 0, tsa[:0]
for i := 0; i < len(subject); i++ {
if subject[i] == btsep {
tokens = append(tokens, subject[start:i])
start = i + 1
}
}
tokens = append(tokens, subject[start:])
if len(tokens) != expectedNumReplyTokens || tokens[0] != "$JS" || tokens[1] != "ACK" {
return 0, 0, 0
}
dcount = uint64(parseAckReplyNum(tokens[4]))
sseq, dseq = uint64(parseAckReplyNum(tokens[5])), uint64(parseAckReplyNum(tokens[6]))
return sseq, dseq, dcount
}
// NextSeq returns the next delivered sequence number for this observable.