When we fail to deliver a message for a consumer, either through didNotDeliver() or LoadMsg() failure re-adjust delivered count and waitingRequest accounting.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-09-01 08:48:28 -07:00
parent 45e6812d70
commit 3a39786972
3 changed files with 141 additions and 3 deletions

View File

@@ -3030,6 +3030,22 @@ func (o *consumer) incDeliveryCount(sseq uint64) uint64 {
return o.rdc[sseq] + 1
}
// Used if we have to adjust on failed delivery or bad lookups.
// Those failed attempts should not increase deliver count.
// Lock should be held.
func (o *consumer) decDeliveryCount(sseq uint64) {
if o.rdc == nil {
return
}
if dc, ok := o.rdc[sseq]; ok {
if dc == 1 {
delete(o.rdc, sseq)
} else {
o.rdc[sseq] -= 1
}
}
}
// send a delivery exceeded advisory.
func (o *consumer) notifyDeliveryExceeded(sseq, dc uint64) {
e := JSConsumerDeliveryExceededAdvisory{
@@ -3104,6 +3120,8 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
if sm == nil || err != nil {
pmsg.returnToPool()
pmsg, dc = nil, 0
// Adjust back deliver count.
o.decDeliveryCount(seq)
}
return pmsg, dc, err
}
@@ -3205,6 +3223,7 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
interest = true
}
}
// If interest, update batch pending requests counter and update fexp timer.
if interest {
brp += wr.n
@@ -3508,8 +3527,12 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
if err == ErrStoreEOF {
o.checkNumPendingOnEOF()
}
if err == ErrStoreMsgNotFound || err == errDeletedMsg || err == ErrStoreEOF || err == errMaxAckPending || err == errPartialCache {
if err == ErrStoreMsgNotFound || err == errDeletedMsg || err == ErrStoreEOF || err == errMaxAckPending {
goto waitForMsgs
} else if err == errPartialCache {
s.Warnf("Unexpected partial cache error looking up message for consumer")
goto waitForMsgs
} else {
s.Errorf("Received an error looking up message for consumer: %v", err)
goto waitForMsgs
@@ -3906,20 +3929,39 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
}
}
// Credit back a failed delivery.
// lock should be held.
func (o *consumer) creditWaitingRequest(reply string) {
for i, rp := 0, o.waiting.rp; i < o.waiting.n; i++ {
if wr := o.waiting.reqs[rp]; wr != nil {
if wr.reply == reply {
wr.n++
wr.d--
return
}
}
rp = (rp + 1) % cap(o.waiting.reqs)
}
}
// didNotDeliver is called when a delivery for a consumer message failed.
// Depending on our state, we will process the failure.
func (o *consumer) didNotDeliver(seq uint64) {
func (o *consumer) didNotDeliver(seq uint64, subj string) {
o.mu.Lock()
mset := o.mset
if mset == nil {
o.mu.Unlock()
return
}
// Adjust back deliver count.
o.decDeliveryCount(seq)
var checkDeliveryInterest bool
if o.isPushMode() {
o.active = false
checkDeliveryInterest = true
} else if o.pending != nil {
o.creditWaitingRequest(subj)
// pull mode and we have pending.
if _, ok := o.pending[seq]; ok {
// We found this messsage on pending, we need