Snapshot on clean shutdown if needed or interest based retention

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-04-02 02:39:27 -07:00
parent e54019f87f
commit b752b8b30d

View File

@@ -2533,8 +2533,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool {
state, err := o.store.BorrowState()
if err != nil || state == nil {
// Fall back to what we track internally for now.
needAck := sseq > o.asflr && !o.isFiltered()
return needAck
return sseq > o.asflr && !o.isFiltered()
}
// If loading state as here, the osseq is +1.
asflr, osseq, pending = state.AckFloor.Stream, state.Delivered.Stream+1, state.Pending
@@ -3820,7 +3819,8 @@ func (o *consumer) checkPending() {
o.mu.RLock()
mset := o.mset
// On stop, mset and timer will be nil.
if mset == nil || o.ptmr == nil {
if o.closed || mset == nil || o.ptmr == nil {
stopAndClearTimer(&o.ptmr)
o.mu.RUnlock()
return
}
@@ -4377,7 +4377,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
n.Delete()
} else {
// Try to install snapshot on clean exit
if o.store != nil && n.NeedSnapshot() {
if o.store != nil && (o.retention != LimitsPolicy || n.NeedSnapshot()) {
if snap, err := o.store.EncodedState(); err == nil {
n.InstallSnapshot(snap)
}
@@ -4574,7 +4574,6 @@ func (o *consumer) checkStateForInterestStream() {
o.mu.Unlock()
return
}
state, err := o.store.State()
o.mu.Unlock()