diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 035f8658..13aad33b 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3400,6 +3400,7 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme s, rg := js.srv, sa.Group alreadyRunning := rg.node != nil storage := sa.Config.Storage + restore := sa.Restore js.mu.RUnlock() // Process the raft group and make sure it's running if needed. @@ -3408,11 +3409,13 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme // If we are restoring, create the stream if we are R>1 and not the preferred who handles the // receipt of the snapshot itself. shouldCreate := true - if sa.Restore != nil { + if restore != nil { if len(rg.Peers) == 1 || rg.node != nil && rg.node.ID() == rg.Preferred { shouldCreate = false } else { + js.mu.Lock() sa.Restore = nil + js.mu.Unlock() } } diff --git a/server/raft.go b/server/raft.go index 1d6b2d62..3e1088d9 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1816,9 +1816,11 @@ func (n *raft) runAsFollower() { } else if n.isCatchingUp() { n.debug("Not switching to candidate, catching up") // Check to see if our catchup has stalled. + n.Lock() if n.catchupStalled() { n.cancelCatchup() } + n.Unlock() } else { n.switchToCandidate() return diff --git a/server/stream.go b/server/stream.go index 9a3355d7..f841fcaa 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4490,7 +4490,9 @@ func (mset *stream) resetAndWaitOnConsumers() { } node.Delete() } - o.monitorWg.Wait() + if o.isMonitorRunning() { + o.monitorWg.Wait() + } } }