diff --git a/server/consumer.go b/server/consumer.go index f0e50750..2f90a7c8 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1544,19 +1544,32 @@ func (o *consumer) deleteNotActive() { // If we do forward a proposal to delete ourselves to the metacontroller leader. if !isDirect && s.JetStreamIsClustered() { js.mu.RLock() + var ( + cca consumerAssignment + meta RaftNode + removeEntry []byte + ) ca, cc := js.consumerAssignment(acc, stream, name), js.cluster + if ca != nil && cc != nil { + meta = cc.meta + cca = *ca + cca.Reply = _EMPTY_ + removeEntry = encodeDeleteConsumerAssignment(&cca) + meta.ForwardProposal(removeEntry) + } js.mu.RUnlock() if ca != nil && cc != nil { - cca := *ca - cca.Reply = _EMPTY_ - meta, removeEntry := cc.meta, encodeDeleteConsumerAssignment(&cca) - meta.ForwardProposal(removeEntry) - // Check to make sure we went away. // Don't think this needs to be a monitored go routine. go func() { - ticker := time.NewTicker(10 * time.Second) + const ( + startInterval = 5 * time.Second + maxInterval = 5 * time.Minute + ) + jitter := time.Duration(rand.Int63n(int64(startInterval))) + interval := startInterval + jitter + ticker := time.NewTicker(interval) defer ticker.Stop() for range ticker.C { js.mu.RLock() @@ -1566,9 +1579,14 @@ func (o *consumer) deleteNotActive() { if nca != nil && nca == ca { s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name) meta.ForwardProposal(removeEntry) - } else { - return + if interval < maxInterval { + interval *= 2 + ticker.Reset(interval) + } + continue } + // We saw that consumer has been removed, all done. + return } }() }