diff --git a/server/stream.go b/server/stream.go index 89cb7cfd..8800b20f 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4582,11 +4582,19 @@ func (mset *stream) checkInterest(seq uint64, obs *consumer) bool { // ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy. func (mset *stream) ackMsg(o *consumer, seq uint64) { - var shouldRemove bool - - switch mset.cfg.Retention { - case LimitsPolicy: + if mset.cfg.Retention == LimitsPolicy { return + } + + // Make sure this sequence is not below our first sequence. + var state StreamState + mset.store.FastState(&state) + if seq < state.FirstSeq { + return + } + + var shouldRemove bool + switch mset.cfg.Retention { case WorkQueuePolicy: // Normally we just remove a message when its ack'd here but if we have direct consumers // from sources and/or mirrors we need to make sure they have delivered the msg.