diff --git a/server/const.go b/server/const.go index 7ea17f2e..1993f102 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.0-RC.7" + VERSION = "2.2.0-RC.7.1" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 61107fb8..23797fe8 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3957,19 +3957,21 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ } // Proceed with proposing this message. - mset.mu.Lock() // We only use mset.clseq for clustering and in case we run ahead of actual commits. // Check if we need to set initial value here + mset.clMu.Lock() if mset.clseq == 0 { + mset.mu.RLock() mset.clseq = mset.lseq + mset.mu.RUnlock() } esm := encodeStreamMsg(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano()) mset.clseq++ - // Do proposal. - mset.mu.Unlock() err := mset.node.Propose(esm) + mset.clMu.Unlock() + if err != nil { mset.mu.Lock() mset.clseq-- diff --git a/server/raft.go b/server/raft.go index 32f26ed8..e7ae18f3 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1950,8 +1950,11 @@ func (n *raft) applyCommit(index uint64) error { } // For entering and exiting the system, proposals and apply we // will block. + closed := n.state == Closed n.Unlock() - n.applyc <- &CommittedEntry{index, committed} + if !closed { + n.applyc <- &CommittedEntry{index, committed} + } n.Lock() } else { // If we processed inline update our applied index. diff --git a/server/stream.go b/server/stream.go index efb549d1..1714b200 100644 --- a/server/stream.go +++ b/server/stream.go @@ -169,6 +169,7 @@ type stream struct { catchup bool syncSub *subscription infoSub *subscription + clMu sync.Mutex clseq uint64 clfs uint64 lqsent time.Time