From e0cbe503ed25682adae28b4075298e3cfd28c68a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 7 Mar 2023 06:42:53 -0500 Subject: [PATCH 1/2] Do not hold jetstream lock cleaning up orphans. Could optionally deadlock. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 52a20f51..ed19e4bc 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -960,36 +960,58 @@ type recoveryUpdates struct { // Streams and consumers are recovered from disk, and the meta layer's mappings // should clean them up, but under crash scenarios there could be orphans. func (js *jetStream) checkForOrphans() { - js.mu.Lock() - defer js.mu.Unlock() - consumerName := func(o *consumer) string { o.mu.RLock() defer o.mu.RUnlock() return o.name } + // Can not hold jetstream lock while trying to delete streams or consumers. + js.mu.Lock() s, cc := js.srv, js.cluster s.Debugf("JetStream cluster checking for orphans") + var streams []*stream + var consumers []*consumer + for accName, jsa := range js.accounts { asa := cc.streams[accName] for stream, mset := range jsa.streams { if sa := asa[stream]; sa == nil { - s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream) - mset.delete() + streams = append(streams, mset) } else { // This one is good, check consumers now. for _, o := range mset.getConsumers() { consumer := consumerName(o) if sa.consumers[consumer] == nil { - s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer) - o.delete() + consumers = append(consumers, o) } } } } } + js.mu.Unlock() + + for _, mset := range streams { + mset.mu.RLock() + accName, stream := mset.acc.Name, mset.cfg.Name + mset.mu.RUnlock() + s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream) + mset.delete() + } + for _, o := range consumers { + o.mu.RLock() + accName, mset, consumer := o.acc.Name, o.mset, o.name + o.mu.RUnlock() + stream := "N/A" + if mset != nil { + mset.mu.RLock() + stream = mset.cfg.Name + mset.mu.RUnlock() + } + s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer) + o.delete() + } } func (js *jetStream) monitorCluster() { From 062dec7f5eb19217fa7eb5bdb8ada29dc1f47ee8 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 7 Mar 2023 19:26:29 -0500 Subject: [PATCH 2/2] Added in error warning if stream or consumer delete fails. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index ed19e4bc..f1f3e333 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -997,7 +997,9 @@ func (js *jetStream) checkForOrphans() { accName, stream := mset.acc.Name, mset.cfg.Name mset.mu.RUnlock() s.Warnf("Detected orphaned stream '%s > %s', will cleanup", accName, stream) - mset.delete() + if err := mset.delete(); err != nil { + s.Warnf("Deleting stream encountered an error: %v", err) + } } for _, o := range consumers { o.mu.RLock() @@ -1010,7 +1012,9 @@ func (js *jetStream) checkForOrphans() { mset.mu.RUnlock() } s.Warnf("Detected orphaned consumer '%s > %s > %s', will cleanup", accName, stream, consumer) - o.delete() + if err := o.delete(); err != nil { + s.Warnf("Deleting consumer encountered an error: %v", err) + } } }