From 2aaf22b0de04f0e472ce8a2e8c4daf0a8d207b5d Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 14 Sep 2022 16:47:35 -0700 Subject: [PATCH] For ackMsg, make sure sequence is still relevant as well. Signed-off-by: Derek Collison --- server/stream.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/server/stream.go b/server/stream.go index 89cb7cfd..8800b20f 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4582,11 +4582,19 @@ func (mset *stream) checkInterest(seq uint64, obs *consumer) bool { // ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy. func (mset *stream) ackMsg(o *consumer, seq uint64) { - var shouldRemove bool - - switch mset.cfg.Retention { - case LimitsPolicy: + if mset.cfg.Retention == LimitsPolicy { return + } + + // Make sure this sequence is not below our first sequence. + var state StreamState + mset.store.FastState(&state) + if seq < state.FirstSeq { + return + } + + var shouldRemove bool + switch mset.cfg.Retention { case WorkQueuePolicy: // Normally we just remove a message when its ack'd here but if we have direct consumers // from sources and/or mirrors we need to make sure they have delivered the msg.