Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2023-06-09 09:29:13 -07:00
3 changed files with 259 additions and 162 deletions

View File

@@ -3454,10 +3454,10 @@ func (o *consumer) hbTimer() (time.Duration, *time.Timer) {
// Should only be called from consumer leader.
func (o *consumer) checkAckFloor() {
o.mu.RLock()
mset, closed, asflr := o.mset, o.closed, o.asflr
mset, closed, asflr, numPending := o.mset, o.closed, o.asflr, len(o.pending)
o.mu.RUnlock()
if closed || mset == nil {
if asflr == 0 || closed || mset == nil {
return
}
@@ -3469,19 +3469,46 @@ func (o *consumer) checkAckFloor() {
return
}
// Process all messages that no longer exist.
for seq := asflr + 1; seq < ss.FirstSeq; seq++ {
// Check if this message was pending.
// Check which linear space is less to walk.
if ss.FirstSeq-asflr-1 < uint64(numPending) {
// Process all messages that no longer exist.
for seq := asflr + 1; seq < ss.FirstSeq; seq++ {
// Check if this message was pending.
o.mu.RLock()
p, isPending := o.pending[seq]
var rdc uint64 = 1
if o.rdc != nil {
rdc = o.rdc[seq]
}
o.mu.RUnlock()
// If it was pending for us, get rid of it.
if isPending {
o.processTerm(seq, p.Sequence, rdc)
}
}
} else if numPending > 0 {
// here it shorter to walk pending.
// toTerm is seq, dseq, rcd for each entry.
toTerm := make([]uint64, 0, numPending*3)
o.mu.RLock()
p, isPending := o.pending[seq]
var rdc uint64 = 1
if o.rdc != nil {
rdc = o.rdc[seq]
for seq, p := range o.pending {
if seq < ss.FirstSeq {
var dseq uint64 = 1
if p != nil {
dseq = p.Sequence
}
var rdc uint64 = 1
if o.rdc != nil {
rdc = o.rdc[seq]
}
toTerm = append(toTerm, seq, dseq, rdc)
}
}
o.mu.RUnlock()
// If it was pending for us, get rid of it.
if isPending {
o.processTerm(seq, p.Sequence, rdc)
for i := 0; i < len(toTerm); i += 3 {
seq, dseq, rdc := toTerm[i], toTerm[i+1], toTerm[i+2]
o.processTerm(seq, dseq, rdc)
}
}
@@ -3522,20 +3549,13 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
return
}
// Track if we are interest retention policy, if not we can skip the ack floor check.
isInterestRetention := mset.isInterestRetention()
checkAckFloor := func() {
if isInterestRetention {
o.checkAckFloor()
}
}
// We will check this on entry and periodically.
checkAckFloor()
o.checkAckFloor()
// How often we will check for ack floor drift.
var ackFloorCheck = 30 * time.Second
// Spread these out for large numbers on a server restart.
delta := time.Duration(rand.Int63n(int64(time.Minute)))
var ackFloorCheck = time.Minute + delta
for {
select {
@@ -3551,7 +3571,7 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
o.suppressDeletion()
}
case <-time.After(ackFloorCheck):
checkAckFloor()
o.checkAckFloor()
case <-qch:
return
case <-s.quitCh: