From e5e8205fac799d987f71f09391f11de46b347602 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 9 Mar 2021 00:34:33 -0600 Subject: [PATCH] Need to make sure order of clseq as stamped also make it to the propose chan. However we do not want to hold the actual stream lock. Signed-off-by: Derek Collison --- server/const.go | 2 +- server/jetstream_cluster.go | 8 +++++--- server/raft.go | 5 ++++- server/stream.go | 1 + 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/server/const.go b/server/const.go index 7ea17f2e..1993f102 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.2.0-RC.7" + VERSION = "2.2.0-RC.7.1" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 61107fb8..23797fe8 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3957,19 +3957,21 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ } // Proceed with proposing this message. - mset.mu.Lock() // We only use mset.clseq for clustering and in case we run ahead of actual commits. // Check if we need to set initial value here + mset.clMu.Lock() if mset.clseq == 0 { + mset.mu.RLock() mset.clseq = mset.lseq + mset.mu.RUnlock() } esm := encodeStreamMsg(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano()) mset.clseq++ - // Do proposal. - mset.mu.Unlock() err := mset.node.Propose(esm) + mset.clMu.Unlock() + if err != nil { mset.mu.Lock() mset.clseq-- diff --git a/server/raft.go b/server/raft.go index 32f26ed8..e7ae18f3 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1950,8 +1950,11 @@ func (n *raft) applyCommit(index uint64) error { } // For entering and exiting the system, proposals and apply we // will block. + closed := n.state == Closed n.Unlock() - n.applyc <- &CommittedEntry{index, committed} + if !closed { + n.applyc <- &CommittedEntry{index, committed} + } n.Lock() } else { // If we processed inline update our applied index. diff --git a/server/stream.go b/server/stream.go index efb549d1..1714b200 100644 --- a/server/stream.go +++ b/server/stream.go @@ -169,6 +169,7 @@ type stream struct { catchup bool syncSub *subscription infoSub *subscription + clMu sync.Mutex clseq uint64 clfs uint64 lqsent time.Time