mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Only check ack floor if we are interest policy based.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -3315,12 +3315,25 @@ func (o *consumer) checkAckFloor() {
|
||||
func (o *consumer) processInboundAcks(qch chan struct{}) {
|
||||
// Grab the server lock to watch for server quit.
|
||||
o.mu.RLock()
|
||||
s := o.srv
|
||||
s, mset := o.srv, o.mset
|
||||
hasInactiveThresh := o.cfg.InactiveThreshold > 0
|
||||
o.mu.RUnlock()
|
||||
|
||||
if s == nil || mset == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Track if we are interest retention policy, if not we can skip the ack floor check.
|
||||
isInterestRetention := mset.isInterestRetention()
|
||||
|
||||
checkAckFloor := func() {
|
||||
if isInterestRetention {
|
||||
o.checkAckFloor()
|
||||
}
|
||||
}
|
||||
|
||||
// We will check this on entry and periodically.
|
||||
o.checkAckFloor()
|
||||
checkAckFloor()
|
||||
|
||||
// How often we will check for ack floor drift.
|
||||
var ackFloorCheck = 30 * time.Second
|
||||
@@ -3339,7 +3352,7 @@ func (o *consumer) processInboundAcks(qch chan struct{}) {
|
||||
o.suppressDeletion()
|
||||
}
|
||||
case <-time.After(ackFloorCheck):
|
||||
o.checkAckFloor()
|
||||
checkAckFloor()
|
||||
case <-qch:
|
||||
return
|
||||
case <-s.quitCh:
|
||||
|
||||
Reference in New Issue
Block a user