mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
@@ -753,15 +753,15 @@ func (o *Consumer) sendAckReply(subj string) {
|
||||
|
||||
// Process a message for the ack reply subject delivered with a message.
|
||||
func (o *Consumer) processAck(_ *subscription, _ *client, subject, reply string, msg []byte) {
|
||||
sseq, dseq, dcount := ackReplyInfo(subject)
|
||||
sseq, dseq, dc := ackReplyInfo(subject)
|
||||
|
||||
skipAckReply := sseq == 0
|
||||
|
||||
switch {
|
||||
case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK):
|
||||
o.ackMsg(sseq, dseq, dcount)
|
||||
o.ackMsg(sseq, dseq, dc)
|
||||
case bytes.HasPrefix(msg, AckNext):
|
||||
o.ackMsg(sseq, dseq, dcount)
|
||||
o.ackMsg(sseq, dseq, dc)
|
||||
o.processNextMsgReq(nil, nil, subject, reply, msg[len(AckNext):])
|
||||
skipAckReply = true
|
||||
case bytes.Equal(msg, AckNak):
|
||||
@@ -769,7 +769,7 @@ func (o *Consumer) processAck(_ *subscription, _ *client, subject, reply string,
|
||||
case bytes.Equal(msg, AckProgress):
|
||||
o.progressUpdate(sseq)
|
||||
case bytes.Equal(msg, AckTerm):
|
||||
o.processTerm(sseq, dseq, dcount)
|
||||
o.processTerm(sseq, dseq, dc)
|
||||
}
|
||||
|
||||
// Ack the ack if requested.
|
||||
@@ -815,9 +815,9 @@ func (o *Consumer) processNak(sseq, dseq uint64) {
|
||||
}
|
||||
|
||||
// Process a TERM
|
||||
func (o *Consumer) processTerm(sseq, dseq, dcount uint64) {
|
||||
func (o *Consumer) processTerm(sseq, dseq, dc uint64) {
|
||||
// Treat like an ack to suppress redelivery.
|
||||
o.processAckMsg(sseq, dseq, dcount, false)
|
||||
o.processAckMsg(sseq, dseq, dc, false)
|
||||
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
@@ -833,7 +833,7 @@ func (o *Consumer) processTerm(sseq, dseq, dcount uint64) {
|
||||
Consumer: o.name,
|
||||
ConsumerSeq: dseq,
|
||||
StreamSeq: sseq,
|
||||
Deliveries: dcount,
|
||||
Deliveries: dc,
|
||||
}
|
||||
|
||||
j, err := json.MarshalIndent(e, "", " ")
|
||||
@@ -1022,7 +1022,7 @@ func (o *Consumer) shouldSample() bool {
|
||||
return mrand.Int31n(100) <= o.sfreq
|
||||
}
|
||||
|
||||
func (o *Consumer) sampleAck(sseq, dseq, dcount uint64) {
|
||||
func (o *Consumer) sampleAck(sseq, dseq, dc uint64) {
|
||||
if !o.shouldSample() {
|
||||
return
|
||||
}
|
||||
@@ -1041,7 +1041,7 @@ func (o *Consumer) sampleAck(sseq, dseq, dcount uint64) {
|
||||
ConsumerSeq: dseq,
|
||||
StreamSeq: sseq,
|
||||
Delay: unow - o.pending[sseq].Timestamp,
|
||||
Deliveries: dcount,
|
||||
Deliveries: dc,
|
||||
}
|
||||
|
||||
j, err := json.MarshalIndent(e, "", " ")
|
||||
@@ -1053,11 +1053,11 @@ func (o *Consumer) sampleAck(sseq, dseq, dcount uint64) {
|
||||
}
|
||||
|
||||
// Process an ack for a message.
|
||||
func (o *Consumer) ackMsg(sseq, dseq, dcount uint64) {
|
||||
o.processAckMsg(sseq, dseq, dcount, true)
|
||||
func (o *Consumer) ackMsg(sseq, dseq, dc uint64) {
|
||||
o.processAckMsg(sseq, dseq, dc, true)
|
||||
}
|
||||
|
||||
func (o *Consumer) processAckMsg(sseq, dseq, dcount uint64, doSample bool) {
|
||||
func (o *Consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
|
||||
o.mu.Lock()
|
||||
var sagap uint64
|
||||
var needSignal bool
|
||||
@@ -1066,7 +1066,7 @@ func (o *Consumer) processAckMsg(sseq, dseq, dcount uint64, doSample bool) {
|
||||
case AckExplicit:
|
||||
if p, ok := o.pending[sseq]; ok {
|
||||
if doSample {
|
||||
o.sampleAck(sseq, dseq, dcount)
|
||||
o.sampleAck(sseq, dseq, dc)
|
||||
}
|
||||
if o.maxp > 0 && len(o.pending) >= o.maxp {
|
||||
needSignal = true
|
||||
@@ -1354,7 +1354,7 @@ func (o *Consumer) incDeliveryCount(sseq uint64) uint64 {
|
||||
}
|
||||
|
||||
// send a delivery exceeded advisory.
|
||||
func (o *Consumer) notifyDeliveryExceeded(sseq, dcount uint64) {
|
||||
func (o *Consumer) notifyDeliveryExceeded(sseq, dc uint64) {
|
||||
e := JSConsumerDeliveryExceededAdvisory{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: JSConsumerDeliveryExceededAdvisoryType,
|
||||
@@ -1364,7 +1364,7 @@ func (o *Consumer) notifyDeliveryExceeded(sseq, dcount uint64) {
|
||||
Stream: o.stream,
|
||||
Consumer: o.name,
|
||||
StreamSeq: sseq,
|
||||
Deliveries: dcount,
|
||||
Deliveries: dc,
|
||||
}
|
||||
|
||||
j, err := json.MarshalIndent(e, "", " ")
|
||||
@@ -1396,20 +1396,20 @@ var errBadConsumer = errors.New("consumer not valid")
|
||||
// Get next available message from underlying store.
|
||||
// Is partition aware and redeliver aware.
|
||||
// Lock should be held.
|
||||
func (o *Consumer) getNextMsg() (subj string, hdr, msg []byte, seq uint64, dcount uint64, ts int64, err error) {
|
||||
func (o *Consumer) getNextMsg() (subj string, hdr, msg []byte, seq uint64, dc uint64, ts int64, err error) {
|
||||
if o.mset == nil {
|
||||
return _EMPTY_, nil, nil, 0, 0, 0, errBadConsumer
|
||||
}
|
||||
for {
|
||||
seq, dcount := o.sseq, uint64(1)
|
||||
seq, dc := o.sseq, uint64(1)
|
||||
if len(o.rdq) > 0 {
|
||||
seq = o.rdq[0]
|
||||
o.rdq = append(o.rdq[:0], o.rdq[1:]...)
|
||||
dcount = o.incDeliveryCount(seq)
|
||||
if o.maxdc > 0 && dcount > o.maxdc {
|
||||
dc = o.incDeliveryCount(seq)
|
||||
if o.maxdc > 0 && dc > o.maxdc {
|
||||
// Only send once
|
||||
if dcount == o.maxdc+1 {
|
||||
o.notifyDeliveryExceeded(seq, dcount-1)
|
||||
if dc == o.maxdc+1 {
|
||||
o.notifyDeliveryExceeded(seq, dc-1)
|
||||
}
|
||||
// Make sure to remove from pending.
|
||||
delete(o.pending, seq)
|
||||
@@ -1423,14 +1423,14 @@ func (o *Consumer) getNextMsg() (subj string, hdr, msg []byte, seq uint64, dcoun
|
||||
|
||||
subj, hdr, msg, ts, err := o.mset.store.LoadMsg(seq)
|
||||
if err == nil {
|
||||
if dcount == 1 { // First delivery.
|
||||
if dc == 1 { // First delivery.
|
||||
o.sseq++
|
||||
if o.config.FilterSubject != _EMPTY_ && !o.isFilteredMatch(subj) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
// We have the msg here.
|
||||
return subj, hdr, msg, seq, dcount, ts, nil
|
||||
return subj, hdr, msg, seq, dc, ts, nil
|
||||
}
|
||||
// We got an error here. If this is an EOF we will return, otherwise
|
||||
// we can continue looking.
|
||||
@@ -1508,7 +1508,7 @@ func (o *Consumer) loopAndGatherMsgs(s *Server, a *Account) {
|
||||
// Deliver all the msgs we have now, once done or on a condition, we wait for new ones.
|
||||
for {
|
||||
var (
|
||||
seq, dcount uint64
|
||||
seq, dc uint64
|
||||
subj, dsubj string
|
||||
hdr []byte
|
||||
msg []byte
|
||||
@@ -1534,7 +1534,7 @@ func (o *Consumer) loopAndGatherMsgs(s *Server, a *Account) {
|
||||
goto waitForMsgs
|
||||
}
|
||||
|
||||
subj, hdr, msg, seq, dcount, ts, err = o.getNextMsg()
|
||||
subj, hdr, msg, seq, dc, ts, err = o.getNextMsg()
|
||||
|
||||
// On error either wait or return.
|
||||
if err != nil {
|
||||
@@ -1586,7 +1586,7 @@ func (o *Consumer) loopAndGatherMsgs(s *Server, a *Account) {
|
||||
}
|
||||
}
|
||||
|
||||
o.deliverMsg(dsubj, subj, hdr, msg, seq, dcount, ts)
|
||||
o.deliverMsg(dsubj, subj, hdr, msg, seq, dc, ts)
|
||||
|
||||
o.mu.Unlock()
|
||||
continue
|
||||
@@ -1610,8 +1610,8 @@ func (o *Consumer) loopAndGatherMsgs(s *Server, a *Account) {
|
||||
}
|
||||
}
|
||||
|
||||
func (o *Consumer) ackReply(sseq, dseq, dcount uint64, ts int64, pending uint64) string {
|
||||
return fmt.Sprintf(o.ackReplyT, dcount, sseq, dseq, ts, pending)
|
||||
func (o *Consumer) ackReply(sseq, dseq, dc uint64, ts int64, pending uint64) string {
|
||||
return fmt.Sprintf(o.ackReplyT, dc, sseq, dseq, ts, pending)
|
||||
}
|
||||
|
||||
// deliverCurrentMsg is the hot path to deliver a message that was just received.
|
||||
@@ -1667,12 +1667,12 @@ func (o *Consumer) deliverCurrentMsg(subj string, hdr, msg []byte, seq uint64, t
|
||||
|
||||
// Deliver a msg to the consumer.
|
||||
// Lock should be held and o.mset validated to be non-nil.
|
||||
func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dcount uint64, ts int64) {
|
||||
func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint64, ts int64) {
|
||||
if o.mset == nil {
|
||||
return
|
||||
}
|
||||
// Update pending on first attempt
|
||||
if dcount == 1 && o.sgap > 0 {
|
||||
if dc == 1 && o.sgap > 0 {
|
||||
o.sgap--
|
||||
}
|
||||
|
||||
@@ -1684,7 +1684,7 @@ func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dcount u
|
||||
}
|
||||
|
||||
dseq := o.dseq
|
||||
pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dcount, ts, o.sgap), hdr, msg, o, seq}
|
||||
pmsg := &jsPubMsg{dsubj, subj, o.ackReply(seq, dseq, dc, ts, o.sgap), hdr, msg, o, seq}
|
||||
mset := o.mset
|
||||
ap := o.config.AckPolicy
|
||||
|
||||
@@ -1709,7 +1709,7 @@ func (o *Consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dcount u
|
||||
o.dseq++
|
||||
|
||||
// FIXME(dlc) - Capture errors?
|
||||
o.store.UpdateDelivered(dseq, seq, dcount, ts)
|
||||
o.store.UpdateDelivered(dseq, seq, dc, ts)
|
||||
}
|
||||
|
||||
// Tracks our outstanding pending acks. Only applicable to AckExplicit mode.
|
||||
@@ -1859,7 +1859,7 @@ func parseAckReplyNum(d string) (n int64) {
|
||||
const expectedNumReplyTokens = 9
|
||||
|
||||
// Grab encoded information in the reply subject for a delivered message.
|
||||
func (o *Consumer) ReplyInfo(subject string) (sseq, dseq, dcount uint64, ts int64, pending uint64) {
|
||||
func (o *Consumer) ReplyInfo(subject string) (sseq, dseq, dc uint64, ts int64, pending uint64) {
|
||||
tsa := [expectedNumReplyTokens]string{}
|
||||
start, tokens := 0, tsa[:0]
|
||||
for i := 0; i < len(subject); i++ {
|
||||
@@ -1874,15 +1874,15 @@ func (o *Consumer) ReplyInfo(subject string) (sseq, dseq, dcount uint64, ts int6
|
||||
}
|
||||
// TODO(dlc) - Should we error if we do not match consumer name?
|
||||
// stream is tokens[2], consumer is 3.
|
||||
dcount = uint64(parseAckReplyNum(tokens[4]))
|
||||
dc = uint64(parseAckReplyNum(tokens[4]))
|
||||
sseq, dseq = uint64(parseAckReplyNum(tokens[5])), uint64(parseAckReplyNum(tokens[6]))
|
||||
ts = parseAckReplyNum(tokens[7])
|
||||
pending = uint64(parseAckReplyNum(tokens[8]))
|
||||
|
||||
return sseq, dseq, dcount, ts, pending
|
||||
return sseq, dseq, dc, ts, pending
|
||||
}
|
||||
|
||||
func ackReplyInfo(subject string) (sseq, dseq, dcount uint64) {
|
||||
func ackReplyInfo(subject string) (sseq, dseq, dc uint64) {
|
||||
tsa := [expectedNumReplyTokens]string{}
|
||||
start, tokens := 0, tsa[:0]
|
||||
for i := 0; i < len(subject); i++ {
|
||||
@@ -1895,10 +1895,10 @@ func ackReplyInfo(subject string) (sseq, dseq, dcount uint64) {
|
||||
if len(tokens) != expectedNumReplyTokens || tokens[0] != "$JS" || tokens[1] != "ACK" {
|
||||
return 0, 0, 0
|
||||
}
|
||||
dcount = uint64(parseAckReplyNum(tokens[4]))
|
||||
dc = uint64(parseAckReplyNum(tokens[4]))
|
||||
sseq, dseq = uint64(parseAckReplyNum(tokens[5])), uint64(parseAckReplyNum(tokens[6]))
|
||||
|
||||
return sseq, dseq, dcount
|
||||
return sseq, dseq, dc
|
||||
}
|
||||
|
||||
// NextSeq returns the next delivered sequence number for this observable.
|
||||
|
||||
Reference in New Issue
Block a user