For ackMsg, make sure sequence is still relevant as well.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2022-09-14 16:47:35 -07:00
parent 6c97733bb8
commit 2aaf22b0de

View File

@@ -4582,11 +4582,19 @@ func (mset *stream) checkInterest(seq uint64, obs *consumer) bool {
// ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy.
func (mset *stream) ackMsg(o *consumer, seq uint64) {
var shouldRemove bool
switch mset.cfg.Retention {
case LimitsPolicy:
if mset.cfg.Retention == LimitsPolicy {
return
}
// Make sure this sequence is not below our first sequence.
var state StreamState
mset.store.FastState(&state)
if seq < state.FirstSeq {
return
}
var shouldRemove bool
switch mset.cfg.Retention {
case WorkQueuePolicy:
// Normally we just remove a message when its ack'd here but if we have direct consumers
// from sources and/or mirrors we need to make sure they have delivered the msg.