diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 92e70501..964476aa 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -7722,8 +7722,8 @@ func (mset *stream) isCurrent() bool { return mset.node.Current() && !mset.catchup } -// Maximum requests for the whole server that can be in flight. -const maxConcurrentSyncRequests = 8 +// Maximum requests for the whole server that can be in flight at the same time. +const maxConcurrentSyncRequests = 16 var ( errCatchupCorruptSnapshot = errors.New("corrupt stream snapshot detected") @@ -7900,11 +7900,11 @@ RETRY: // Grab sync request again on failures. if sreq == nil { - mset.mu.Lock() + mset.mu.RLock() var state StreamState mset.store.FastState(&state) sreq = mset.calculateSyncRequest(&state, snap) - mset.mu.Unlock() + mset.mu.RUnlock() if sreq == nil { return nil } diff --git a/server/raft.go b/server/raft.go index 5bcc2a1a..7baa949d 100644 --- a/server/raft.go +++ b/server/raft.go @@ -654,9 +654,9 @@ func (s *Server) transferRaftLeaders() bool { // This should only be called on the leader. func (n *raft) Propose(data []byte) error { n.RLock() - if n.state != Leader { + if state := n.state; state != Leader { n.RUnlock() - n.debug("Proposal ignored, not leader (state: %v)", n.state) + n.debug("Proposal ignored, not leader (state: %v)", state) return errNotLeader } // Error if we had a previous write error.