diff --git a/server/jetstream.go b/server/jetstream.go index ae8cab59..56e2a6fe 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -858,6 +858,13 @@ func (s *Server) signalPullConsumers() { } } +// Helper for determining if we are shutting down. +func (js *jetStream) isShuttingDown() bool { + js.mu.RLock() + defer js.mu.RUnlock() + return js.shuttingDown +} + // Shutdown jetstream for this server. func (s *Server) shutdownJetStream() { s.mu.RLock() diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 4cc59281..693d8bf9 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2567,6 +2567,12 @@ func (mset *stream) resetClusteredState(err error) bool { node.StepDown() } + // If we detect we are shutting down just return. + if js != nil && js.isShuttingDown() { + s.Debugf("Will not reset stream, jetstream shutting down") + return false + } + // Server if js.limitsExceeded(stype) { s.Debugf("Will not reset stream, server resources exceeded") @@ -3203,19 +3209,23 @@ func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment) node.StepDown(nsa.Group.Preferred) } node.ProposeRemovePeer(ourID) - // shut down monitor by shutting down raft + // shutdown monitor by shutting down raft. node.Delete() } + var isShuttingDown bool // Make sure this node is no longer attached to our stream assignment. if js, _ := s.getJetStreamCluster(); js != nil { js.mu.Lock() nsa.Group.node = nil + isShuttingDown = js.shuttingDown js.mu.Unlock() } - // wait for monitor to be shut down - mset.monitorWg.Wait() + if !isShuttingDown { + // wait for monitor to be shutdown. + mset.monitorWg.Wait() + } mset.stop(true, false) } diff --git a/server/stream.go b/server/stream.go index e49f9d75..9d3e2000 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4474,7 +4474,7 @@ func (mset *stream) internalLoop() { } } -// Used to break consumers out of their +// Used to break consumers out of their monitorConsumer go routines. func (mset *stream) resetAndWaitOnConsumers() { mset.mu.RLock() consumers := make([]*consumer, 0, len(mset.consumers)) @@ -4556,10 +4556,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { } mset.mu.Unlock() - js.mu.RLock() - isShuttingDown := js.shuttingDown - js.mu.RUnlock() - + isShuttingDown := js.isShuttingDown() for _, o := range obs { if !o.isClosed() { // Third flag says do not broadcast a signal.