Detect when we are shutting down or if a consumer is already closed when removing a stream.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-04-29 11:18:10 -07:00
parent 4eb4e5496b
commit db972048ce
3 changed files with 27 additions and 7 deletions

View File

@@ -4395,6 +4395,13 @@ func (o *consumer) delete() error {
return o.stopWithFlags(true, false, true, true)
}
// To test for closed state.
func (o *consumer) isClosed() bool {
o.mu.RLock()
defer o.mu.RUnlock()
return o.closed
}
func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
o.mu.Lock()
js := o.js

View File

@@ -1,4 +1,4 @@
// Copyright 2019-2022 The NATS Authors
// Copyright 2019-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -109,11 +109,14 @@ type jetStream struct {
started time.Time
// System level request to purge a stream move
accountPurge *subscription
accountPurge *subscription
// Some bools regarding general state.
metaRecovering bool
standAlone bool
disabled bool
oos bool
shuttingDown bool
}
type remoteUsage struct {
@@ -887,6 +890,8 @@ func (s *Server) shutdownJetStream() {
}
accPurgeSub := js.accountPurge
js.accountPurge = nil
// Signal we are shutting down.
js.shuttingDown = true
js.mu.Unlock()
if accPurgeSub != nil {

View File

@@ -4556,12 +4556,20 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
}
mset.mu.Unlock()
js.mu.RLock()
isShuttingDown := js.shuttingDown
js.mu.RUnlock()
for _, o := range obs {
// Third flag says do not broadcast a signal.
// TODO(dlc) - If we have an err here we don't want to stop
// but should we log?
o.stopWithFlags(deleteFlag, deleteFlag, false, advisory)
o.monitorWg.Wait()
if !o.isClosed() {
// Third flag says do not broadcast a signal.
// TODO(dlc) - If we have an err here we don't want to stop
// but should we log?
o.stopWithFlags(deleteFlag, deleteFlag, false, advisory)
if !isShuttingDown {
o.monitorWg.Wait()
}
}
}
mset.mu.Lock()