From 4e414f1f05e93fdf074c442c4e9210695278ea3a Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Wed, 4 Oct 2023 13:17:22 -0700 Subject: [PATCH] Skip processing consumer assignments after JS has shutdown (#4625) Signed-off-by: Waldemar Quevedo --- server/consumer.go | 4 ++++ server/jetstream_cluster.go | 8 +++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/server/consumer.go b/server/consumer.go index 04830d48..15fbd128 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1593,6 +1593,10 @@ func (o *consumer) deleteNotActive() { defer ticker.Stop() for range ticker.C { js.mu.RLock() + if js.shuttingDown { + js.mu.RUnlock() + return + } nca := js.consumerAssignment(acc, stream, name) js.mu.RUnlock() // Make sure this is not a new consumer with the same name. diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 964476aa..67278f20 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2705,6 +2705,11 @@ func (mset *stream) resetClusteredState(err error) bool { if sa != nil { js.mu.Lock() + if js.shuttingDown { + js.mu.Unlock() + return + } + s.Warnf("Resetting stream cluster state for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name) // Now wipe groups from assignments. sa.Group.node = nil @@ -3893,6 +3898,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { s, cc := js.srv, js.cluster accName, stream, consumerName := ca.Client.serviceAccount(), ca.Stream, ca.Name noMeta := cc == nil || cc.meta == nil + shuttingDown := js.shuttingDown var ourID string if !noMeta { ourID = cc.meta.ID() @@ -3903,7 +3909,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { } js.mu.RUnlock() - if s == nil || noMeta { + if s == nil || noMeta || shuttingDown { return }