diff --git a/server/consumer.go b/server/consumer.go index c9fd4055..e5165896 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3315,12 +3315,25 @@ func (o *consumer) checkAckFloor() { func (o *consumer) processInboundAcks(qch chan struct{}) { // Grab the server lock to watch for server quit. o.mu.RLock() - s := o.srv + s, mset := o.srv, o.mset hasInactiveThresh := o.cfg.InactiveThreshold > 0 o.mu.RUnlock() + if s == nil || mset == nil { + return + } + + // Track if we are interest retention policy, if not we can skip the ack floor check. + isInterestRetention := mset.isInterestRetention() + + checkAckFloor := func() { + if isInterestRetention { + o.checkAckFloor() + } + } + // We will check this on entry and periodically. - o.checkAckFloor() + checkAckFloor() // How often we will check for ack floor drift. var ackFloorCheck = 30 * time.Second @@ -3339,7 +3352,7 @@ func (o *consumer) processInboundAcks(qch chan struct{}) { o.suppressDeletion() } case <-time.After(ackFloorCheck): - o.checkAckFloor() + checkAckFloor() case <-qch: return case <-s.quitCh: