Fix for datarace on clfs

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-09-10 11:07:27 -07:00
parent 5def0a99b4
commit 7d041da3c8
2 changed files with 16 additions and 7 deletions

View File

@@ -2892,9 +2892,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
}
} else if isRecovering {
// On recovery, reset CLFS/FAILED.
mset.mu.Lock()
mset.clfs = ss.Failed
mset.mu.Unlock()
mset.setCLFS(ss.Failed)
}
} else if e.Type == EntryRemovePeer {
js.mu.RLock()
@@ -7268,7 +7266,7 @@ func (mset *stream) stateSnapshot() []byte {
func (mset *stream) stateSnapshotLocked() []byte {
// Decide if we can support the new style of stream snapshots.
if mset.supportsBinarySnapshotLocked() {
snap, _ := mset.store.EncodedStreamState(mset.clfs)
snap, _ := mset.store.EncodedStreamState(mset.getCLFS())
return snap
}
@@ -7470,10 +7468,9 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
mset.clMu.Lock()
if mset.clseq == 0 || mset.clseq < lseq {
// Re-capture
lseq, clfs = mset.lastSeqAndCLFS()
lseq, clfs = mset.lseq, mset.clfs
mset.clseq = lseq + clfs
}
esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano(), mset.compressOK)
mset.clseq++