From db972048ceebc57f3d1d1309b1ee64600776280b Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 29 Apr 2023 11:18:10 -0700 Subject: [PATCH] Detect when we are shutting down or if a consumer is already closed when removing a stream. Signed-off-by: Derek Collison --- server/consumer.go | 7 +++++++ server/jetstream.go | 9 +++++++-- server/stream.go | 18 +++++++++++++----- 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index e96ba84c..2fd466c0 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4395,6 +4395,13 @@ func (o *consumer) delete() error { return o.stopWithFlags(true, false, true, true) } +// To test for closed state. +func (o *consumer) isClosed() bool { + o.mu.RLock() + defer o.mu.RUnlock() + return o.closed +} + func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { o.mu.Lock() js := o.js diff --git a/server/jetstream.go b/server/jetstream.go index fd76e36e..ae8cab59 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -1,4 +1,4 @@ -// Copyright 2019-2022 The NATS Authors +// Copyright 2019-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -109,11 +109,14 @@ type jetStream struct { started time.Time // System level request to purge a stream move - accountPurge *subscription + accountPurge *subscription + + // Some bools regarding general state. metaRecovering bool standAlone bool disabled bool oos bool + shuttingDown bool } type remoteUsage struct { @@ -887,6 +890,8 @@ func (s *Server) shutdownJetStream() { } accPurgeSub := js.accountPurge js.accountPurge = nil + // Signal we are shutting down. + js.shuttingDown = true js.mu.Unlock() if accPurgeSub != nil { diff --git a/server/stream.go b/server/stream.go index f0f5065f..e49f9d75 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4556,12 +4556,20 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { } mset.mu.Unlock() + js.mu.RLock() + isShuttingDown := js.shuttingDown + js.mu.RUnlock() + for _, o := range obs { - // Third flag says do not broadcast a signal. - // TODO(dlc) - If we have an err here we don't want to stop - // but should we log? - o.stopWithFlags(deleteFlag, deleteFlag, false, advisory) - o.monitorWg.Wait() + if !o.isClosed() { + // Third flag says do not broadcast a signal. + // TODO(dlc) - If we have an err here we don't want to stop + // but should we log? + o.stopWithFlags(deleteFlag, deleteFlag, false, advisory) + if !isShuttingDown { + o.monitorWg.Wait() + } + } } mset.mu.Lock()