mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Add in a few more places to check on jetstream shutting down.
Add in a helper method. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -858,6 +858,13 @@ func (s *Server) signalPullConsumers() {
|
||||
}
|
||||
}
|
||||
|
||||
// Helper for determining if we are shutting down.
|
||||
func (js *jetStream) isShuttingDown() bool {
|
||||
js.mu.RLock()
|
||||
defer js.mu.RUnlock()
|
||||
return js.shuttingDown
|
||||
}
|
||||
|
||||
// Shutdown jetstream for this server.
|
||||
func (s *Server) shutdownJetStream() {
|
||||
s.mu.RLock()
|
||||
|
||||
@@ -2567,6 +2567,12 @@ func (mset *stream) resetClusteredState(err error) bool {
|
||||
node.StepDown()
|
||||
}
|
||||
|
||||
// If we detect we are shutting down just return.
|
||||
if js != nil && js.isShuttingDown() {
|
||||
s.Debugf("Will not reset stream, jetstream shutting down")
|
||||
return false
|
||||
}
|
||||
|
||||
// Server
|
||||
if js.limitsExceeded(stype) {
|
||||
s.Debugf("Will not reset stream, server resources exceeded")
|
||||
@@ -3203,19 +3209,23 @@ func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment)
|
||||
node.StepDown(nsa.Group.Preferred)
|
||||
}
|
||||
node.ProposeRemovePeer(ourID)
|
||||
// shut down monitor by shutting down raft
|
||||
// shutdown monitor by shutting down raft.
|
||||
node.Delete()
|
||||
}
|
||||
|
||||
var isShuttingDown bool
|
||||
// Make sure this node is no longer attached to our stream assignment.
|
||||
if js, _ := s.getJetStreamCluster(); js != nil {
|
||||
js.mu.Lock()
|
||||
nsa.Group.node = nil
|
||||
isShuttingDown = js.shuttingDown
|
||||
js.mu.Unlock()
|
||||
}
|
||||
|
||||
// wait for monitor to be shut down
|
||||
mset.monitorWg.Wait()
|
||||
if !isShuttingDown {
|
||||
// wait for monitor to be shutdown.
|
||||
mset.monitorWg.Wait()
|
||||
}
|
||||
mset.stop(true, false)
|
||||
}
|
||||
|
||||
|
||||
@@ -4474,7 +4474,7 @@ func (mset *stream) internalLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
// Used to break consumers out of their
|
||||
// Used to break consumers out of their monitorConsumer go routines.
|
||||
func (mset *stream) resetAndWaitOnConsumers() {
|
||||
mset.mu.RLock()
|
||||
consumers := make([]*consumer, 0, len(mset.consumers))
|
||||
@@ -4556,10 +4556,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
|
||||
js.mu.RLock()
|
||||
isShuttingDown := js.shuttingDown
|
||||
js.mu.RUnlock()
|
||||
|
||||
isShuttingDown := js.isShuttingDown()
|
||||
for _, o := range obs {
|
||||
if !o.isClosed() {
|
||||
// Third flag says do not broadcast a signal.
|
||||
|
||||
Reference in New Issue
Block a user