mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
With WorkQueue policy streams that are mirrored or sourced need to only delete if no regular consumers.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1109,9 +1109,9 @@ func (o *consumer) processAck(_ *subscription, c *client, subject, reply string,
|
||||
|
||||
switch {
|
||||
case len(msg) == 0, bytes.Equal(msg, AckAck), bytes.Equal(msg, AckOK):
|
||||
o.ackMsg(sseq, dseq, dc)
|
||||
o.processAckMsg(sseq, dseq, dc, true)
|
||||
case bytes.HasPrefix(msg, AckNext):
|
||||
o.ackMsg(sseq, dseq, dc)
|
||||
o.processAckMsg(sseq, dseq, dc, true)
|
||||
// processNextMsgReq can be invoked from an internal subscription or from here.
|
||||
// Therefore, it has to call msgParts(), so we can't simply pass msg[len(AckNext):]
|
||||
// with current c.pa.hdr because it would cause a panic. We will save the current
|
||||
@@ -1499,11 +1499,6 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) {
|
||||
o.sendAdvisory(o.ackEventT, j)
|
||||
}
|
||||
|
||||
// Process an ack for a message.
|
||||
func (o *consumer) ackMsg(sseq, dseq, dc uint64) {
|
||||
o.processAckMsg(sseq, dseq, dc, true)
|
||||
}
|
||||
|
||||
func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
|
||||
o.mu.Lock()
|
||||
var sagap uint64
|
||||
|
||||
@@ -3075,16 +3075,33 @@ func (mset *stream) checkInterest(seq uint64, obs *consumer) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Helper function to see if we have "real" consumers for a workqueue.
|
||||
// Used in mixed mode where we have direct/no ack consumers for sync.
|
||||
func (mset *stream) hasExplicitAckConsumers() bool {
|
||||
mset.mu.RLock()
|
||||
defer mset.mu.RUnlock()
|
||||
for _, o := range mset.consumers {
|
||||
if o.cfg.AckPolicy == AckExplicit {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy.
|
||||
func (mset *stream) ackMsg(obs *consumer, seq uint64) {
|
||||
func (mset *stream) ackMsg(o *consumer, seq uint64) {
|
||||
switch mset.cfg.Retention {
|
||||
case LimitsPolicy:
|
||||
return
|
||||
case WorkQueuePolicy:
|
||||
mset.store.RemoveMsg(seq)
|
||||
// Consumer is nil this is signaling from an AckNone.
|
||||
// Meaning a sync consumer for a mirror or a source.
|
||||
if o != nil || !mset.hasExplicitAckConsumers() {
|
||||
mset.store.RemoveMsg(seq)
|
||||
}
|
||||
case InterestPolicy:
|
||||
mset.mu.Lock()
|
||||
hasInterest := mset.checkInterest(seq, obs)
|
||||
hasInterest := mset.checkInterest(seq, o)
|
||||
mset.mu.Unlock()
|
||||
if !hasInterest {
|
||||
mset.store.RemoveMsg(seq)
|
||||
|
||||
Reference in New Issue
Block a user