From 4500f7889e42932529f72415648397e94cc50c11 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 7 May 2021 14:13:36 -0700 Subject: [PATCH] For WorkQueues that have direct we need to make sure they have received the message before deleting. Signed-off-by: Derek Collison --- server/consumer.go | 5 +++-- server/stream.go | 35 +++++++++++++++++------------------ 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 56917cd5..3c8bb9ce 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1599,9 +1599,10 @@ func (o *consumer) needAck(sseq uint64) bool { } state, err := o.store.State() if err != nil || state == nil { - o.mu.RUnlock() // Fall back to what we track internally for now. - return sseq > o.asflr + needsAck := sseq > o.asflr + o.mu.RUnlock() + return needsAck } asflr, osseq = state.AckFloor.Stream, o.sseq pending = state.Pending diff --git a/server/stream.go b/server/stream.go index 02cf691f..c0186dc7 100644 --- a/server/stream.go +++ b/server/stream.go @@ -160,6 +160,9 @@ type stream struct { // Sources sources map[string]*sourceInfo + // Indicates we have direct consumers. + directs int + // For flowcontrol processing for source and mirror internal consumers. fcr map[uint64]string @@ -3000,12 +3003,18 @@ func (mset *stream) setConsumer(o *consumer) { if o.cfg.FilterSubject != _EMPTY_ { mset.numFilter++ } + if o.cfg.Direct { + mset.directs++ + } } func (mset *stream) removeConsumer(o *consumer) { if o.cfg.FilterSubject != _EMPTY_ { mset.numFilter-- } + if o.cfg.Direct { + mset.directs-- + } delete(mset.consumers, o.name) } @@ -3075,34 +3084,24 @@ func (mset *stream) checkInterest(seq uint64, obs *consumer) bool { return false } -// Helper function to see if we have "real" consumers for a workqueue. -// Used in mixed mode where we have direct/no ack consumers for sync. -func (mset *stream) hasExplicitAckConsumers() bool { - mset.mu.RLock() - defer mset.mu.RUnlock() - for _, o := range mset.consumers { - if o.cfg.AckPolicy == AckExplicit { - return true - } - } - return false -} - // ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy. func (mset *stream) ackMsg(o *consumer, seq uint64) { switch mset.cfg.Retention { case LimitsPolicy: return case WorkQueuePolicy: - // Consumer is nil this is signaling from an AckNone. - // Meaning a sync consumer for a mirror or a source. - if o != nil || !mset.hasExplicitAckConsumers() { + // Normally we just remove a message when its ack'd here but if we have direct consumers + // from sources and/or mirrors we need to make sure they have delivered the msg. + mset.mu.RLock() + shouldRemove := mset.directs <= 0 || !mset.checkInterest(seq, o) + mset.mu.RUnlock() + if shouldRemove { mset.store.RemoveMsg(seq) } case InterestPolicy: - mset.mu.Lock() + mset.mu.RLock() hasInterest := mset.checkInterest(seq, o) - mset.mu.Unlock() + mset.mu.RUnlock() if !hasInterest { mset.store.RemoveMsg(seq) }