mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
For WorkQueues that have direct we need to make sure they have received the message before deleting.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1599,9 +1599,10 @@ func (o *consumer) needAck(sseq uint64) bool {
|
||||
}
|
||||
state, err := o.store.State()
|
||||
if err != nil || state == nil {
|
||||
o.mu.RUnlock()
|
||||
// Fall back to what we track internally for now.
|
||||
return sseq > o.asflr
|
||||
needsAck := sseq > o.asflr
|
||||
o.mu.RUnlock()
|
||||
return needsAck
|
||||
}
|
||||
asflr, osseq = state.AckFloor.Stream, o.sseq
|
||||
pending = state.Pending
|
||||
|
||||
@@ -160,6 +160,9 @@ type stream struct {
|
||||
// Sources
|
||||
sources map[string]*sourceInfo
|
||||
|
||||
// Indicates we have direct consumers.
|
||||
directs int
|
||||
|
||||
// For flowcontrol processing for source and mirror internal consumers.
|
||||
fcr map[uint64]string
|
||||
|
||||
@@ -3000,12 +3003,18 @@ func (mset *stream) setConsumer(o *consumer) {
|
||||
if o.cfg.FilterSubject != _EMPTY_ {
|
||||
mset.numFilter++
|
||||
}
|
||||
if o.cfg.Direct {
|
||||
mset.directs++
|
||||
}
|
||||
}
|
||||
|
||||
func (mset *stream) removeConsumer(o *consumer) {
|
||||
if o.cfg.FilterSubject != _EMPTY_ {
|
||||
mset.numFilter--
|
||||
}
|
||||
if o.cfg.Direct {
|
||||
mset.directs--
|
||||
}
|
||||
delete(mset.consumers, o.name)
|
||||
}
|
||||
|
||||
@@ -3075,34 +3084,24 @@ func (mset *stream) checkInterest(seq uint64, obs *consumer) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Helper function to see if we have "real" consumers for a workqueue.
|
||||
// Used in mixed mode where we have direct/no ack consumers for sync.
|
||||
func (mset *stream) hasExplicitAckConsumers() bool {
|
||||
mset.mu.RLock()
|
||||
defer mset.mu.RUnlock()
|
||||
for _, o := range mset.consumers {
|
||||
if o.cfg.AckPolicy == AckExplicit {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy.
|
||||
func (mset *stream) ackMsg(o *consumer, seq uint64) {
|
||||
switch mset.cfg.Retention {
|
||||
case LimitsPolicy:
|
||||
return
|
||||
case WorkQueuePolicy:
|
||||
// Consumer is nil this is signaling from an AckNone.
|
||||
// Meaning a sync consumer for a mirror or a source.
|
||||
if o != nil || !mset.hasExplicitAckConsumers() {
|
||||
// Normally we just remove a message when its ack'd here but if we have direct consumers
|
||||
// from sources and/or mirrors we need to make sure they have delivered the msg.
|
||||
mset.mu.RLock()
|
||||
shouldRemove := mset.directs <= 0 || !mset.checkInterest(seq, o)
|
||||
mset.mu.RUnlock()
|
||||
if shouldRemove {
|
||||
mset.store.RemoveMsg(seq)
|
||||
}
|
||||
case InterestPolicy:
|
||||
mset.mu.Lock()
|
||||
mset.mu.RLock()
|
||||
hasInterest := mset.checkInterest(seq, o)
|
||||
mset.mu.Unlock()
|
||||
mset.mu.RUnlock()
|
||||
if !hasInterest {
|
||||
mset.store.RemoveMsg(seq)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user