diff --git a/server/consumer.go b/server/consumer.go index 3b6e2527..56917cd5 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1109,9 +1109,9 @@ func (o *consumer) processAck(_ *subscription, c *client, subject, reply string, switch { case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK): - o.ackMsg(sseq, dseq, dc) + o.processAckMsg(sseq, dseq, dc, true) case bytes.HasPrefix(msg, AckNext): - o.ackMsg(sseq, dseq, dc) + o.processAckMsg(sseq, dseq, dc, true) // processNextMsgReq can be invoked from an internal subscription or from here. // Therefore, it has to call msgParts(), so we can't simply pass msg[len(AckNext):] // with current c.pa.hdr because it would cause a panic. We will save the current @@ -1499,11 +1499,6 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) { o.sendAdvisory(o.ackEventT, j) } -// Process an ack for a message. -func (o *consumer) ackMsg(sseq, dseq, dc uint64) { - o.processAckMsg(sseq, dseq, dc, true) -} - func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) { o.mu.Lock() var sagap uint64 diff --git a/server/stream.go b/server/stream.go index 6853fa24..02cf691f 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3075,16 +3075,33 @@ func (mset *stream) checkInterest(seq uint64, obs *consumer) bool { return false } +// Helper function to see if we have "real" consumers for a workqueue. +// Used in mixed mode where we have direct/no ack consumers for sync. +func (mset *stream) hasExplicitAckConsumers() bool { + mset.mu.RLock() + defer mset.mu.RUnlock() + for _, o := range mset.consumers { + if o.cfg.AckPolicy == AckExplicit { + return true + } + } + return false +} + // ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy. -func (mset *stream) ackMsg(obs *consumer, seq uint64) { +func (mset *stream) ackMsg(o *consumer, seq uint64) { switch mset.cfg.Retention { case LimitsPolicy: return case WorkQueuePolicy: - mset.store.RemoveMsg(seq) + // Consumer is nil this is signaling from an AckNone. + // Meaning a sync consumer for a mirror or a source. + if o != nil || !mset.hasExplicitAckConsumers() { + mset.store.RemoveMsg(seq) + } case InterestPolicy: mset.mu.Lock() - hasInterest := mset.checkInterest(seq, obs) + hasInterest := mset.checkInterest(seq, o) mset.mu.Unlock() if !hasInterest { mset.store.RemoveMsg(seq)