Fix for data race and adjustment to do a backoff on making sure consumers are cleaned up.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-09-14 11:21:11 -07:00
parent b79b180498
commit 392f25b6da

View File

@@ -1544,19 +1544,31 @@ func (o *consumer) deleteNotActive() {
// If we do forward a proposal to delete ourselves to the metacontroller leader.
if !isDirect && s.JetStreamIsClustered() {
js.mu.RLock()
var (
cca consumerAssignment
meta RaftNode
removeEntry []byte
)
ca, cc := js.consumerAssignment(acc, stream, name), js.cluster
if ca != nil && cc != nil {
meta = cc.meta
cca = *ca
cca.Reply = _EMPTY_
removeEntry = encodeDeleteConsumerAssignment(&cca)
meta.ForwardProposal(removeEntry)
}
js.mu.RUnlock()
if ca != nil && cc != nil {
cca := *ca
cca.Reply = _EMPTY_
meta, removeEntry := cc.meta, encodeDeleteConsumerAssignment(&cca)
meta.ForwardProposal(removeEntry)
// Check to make sure we went away.
// Don't think this needs to be a monitored go routine.
go func() {
ticker := time.NewTicker(10 * time.Second)
const (
startInterval = 5 * time.Second
maxInterval = 5 * time.Minute
)
interval := startInterval
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
js.mu.RLock()
@@ -1566,9 +1578,14 @@ func (o *consumer) deleteNotActive() {
if nca != nil && nca == ca {
s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
meta.ForwardProposal(removeEntry)
} else {
return
if interval < maxInterval {
interval *= 2
ticker.Reset(interval)
}
continue
}
// We saw that consumer has been removed, all done.
return
}
}()
}