mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Merge pull request #3907 from nats-io/raft-fix
[FIXED] Snapshots would not compact through applied.
This commit is contained in:
@@ -999,21 +999,19 @@ func (js *jetStream) monitorCluster() {
|
||||
js.setMetaRecovering()
|
||||
|
||||
// Snapshotting function.
|
||||
doSnapshot := func() []byte {
|
||||
doSnapshot := func() {
|
||||
// Suppress during recovery.
|
||||
if js.isMetaRecovering() {
|
||||
return nil
|
||||
return
|
||||
}
|
||||
snap := js.metaSnapshot()
|
||||
if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) {
|
||||
if err := n.InstallSnapshot(snap); err == nil {
|
||||
lastSnap, lastSnapTime = hash[:], time.Now()
|
||||
return snap
|
||||
} else {
|
||||
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
ru := &recoveryUpdates{
|
||||
@@ -1088,16 +1086,11 @@ func (js *jetStream) monitorCluster() {
|
||||
|
||||
if isLeader {
|
||||
s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil)
|
||||
// Install a snapshot as we become leader. We will also send to the cluster.
|
||||
if snap := doSnapshot(); snap != nil {
|
||||
// If we are caught up distribute our current state to followers.
|
||||
if ne, _ := n.Size(); ne == 0 {
|
||||
// Send our snapshot to others to make sure all in sync.
|
||||
n.SendSnapshot(snap)
|
||||
}
|
||||
}
|
||||
// Optionally install a snapshot as we become leader.
|
||||
doSnapshot()
|
||||
js.checkClusterSize()
|
||||
}
|
||||
|
||||
case <-t.C:
|
||||
doSnapshot()
|
||||
// Periodically check the cluster size.
|
||||
|
||||
@@ -939,9 +939,9 @@ func (n *raft) InstallSnapshot(data []byte) error {
|
||||
var state StreamState
|
||||
n.wal.FastState(&state)
|
||||
|
||||
if state.FirstSeq >= n.applied {
|
||||
if n.applied == 0 || len(data) == 0 {
|
||||
n.Unlock()
|
||||
return nil
|
||||
return errNoSnapAvailable
|
||||
}
|
||||
|
||||
n.debug("Installing snapshot of %d bytes", len(data))
|
||||
@@ -976,7 +976,7 @@ func (n *raft) InstallSnapshot(data []byte) error {
|
||||
// Remember our latest snapshot file.
|
||||
n.snapfile = sfile
|
||||
|
||||
if _, err := n.wal.Compact(snap.lastIndex); err != nil {
|
||||
if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil {
|
||||
n.setWriteErrLocked(err)
|
||||
n.Unlock()
|
||||
return err
|
||||
@@ -1289,6 +1289,7 @@ func (n *raft) StepDown(preferred ...string) error {
|
||||
preferred = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Can't pick ourselves.
|
||||
if maybeLeader == n.id {
|
||||
maybeLeader = noLeader
|
||||
|
||||
Reference in New Issue
Block a user