diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 52a20f51..f1f3e333 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -960,36 +960,62 @@ type recoveryUpdates struct { // Streams and consumers are recovered from disk, and the meta layer's mappings // should clean them up, but under crash scenarios there could be orphans. func (js *jetStream) checkForOrphans() { - js.mu.Lock() - defer js.mu.Unlock() - consumerName := func(o *consumer) string { o.mu.RLock() defer o.mu.RUnlock() return o.name } + // Can not hold jetstream lock while trying to delete streams or consumers. + js.mu.Lock() s, cc := js.srv, js.cluster s.Debugf("JetStream cluster checking for orphans") + var streams []*stream + var consumers []*consumer + for accName, jsa := range js.accounts { asa := cc.streams[accName] for stream, mset := range jsa.streams { if sa := asa[stream]; sa == nil { - s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream) - mset.delete() + streams = append(streams, mset) } else { // This one is good, check consumers now. for _, o := range mset.getConsumers() { consumer := consumerName(o) if sa.consumers[consumer] == nil { - s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer) - o.delete() + consumers = append(consumers, o) } } } } } + js.mu.Unlock() + + for _, mset := range streams { + mset.mu.RLock() + accName, stream := mset.acc.Name, mset.cfg.Name + mset.mu.RUnlock() + s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream) + if err := mset.delete(); err != nil { + s.Warnf("Deleting stream encountered an error: %v", err) + } + } + for _, o := range consumers { + o.mu.RLock() + accName, mset, consumer := o.acc.Name, o.mset, o.name + o.mu.RUnlock() + stream := "N/A" + if mset != nil { + mset.mu.RLock() + stream = mset.cfg.Name + mset.mu.RUnlock() + } + s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer) + if err := o.delete(); err != nil { + s.Warnf("Deleting consumer encountered an error: %v", err) + } + } } func (js *jetStream) monitorCluster() {