Fixed bug where snapshot would not compact through applied. This mean a subsequent request for exactly applied would return that entry only not the full state snapshot.

Fixed bug where we would not snapshot when we should.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-02-23 22:19:37 -08:00
parent 79df099c44
commit ea2bfad8ea
2 changed files with 9 additions and 15 deletions

View File

@@ -999,21 +999,19 @@ func (js *jetStream) monitorCluster() {
js.setMetaRecovering()
// Snapshotting function.
doSnapshot := func() []byte {
doSnapshot := func() {
// Suppress during recovery.
if js.isMetaRecovering() {
return nil
return
}
snap := js.metaSnapshot()
if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
return snap
} else {
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
}
}
return nil
}
ru := &recoveryUpdates{
@@ -1088,16 +1086,11 @@ func (js *jetStream) monitorCluster() {
if isLeader {
s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil)
// Install a snapshot as we become leader. We will also send to the cluster.
if snap := doSnapshot(); snap != nil {
// If we are caught up distribute our current state to followers.
if ne, _ := n.Size(); ne == 0 {
// Send our snapshot to others to make sure all in sync.
n.SendSnapshot(snap)
}
}
// Optionally install a snapshot as we become leader.
doSnapshot()
js.checkClusterSize()
}
case <-t.C:
doSnapshot()
// Periodically check the cluster size.

View File

@@ -939,9 +939,9 @@ func (n *raft) InstallSnapshot(data []byte) error {
var state StreamState
n.wal.FastState(&state)
if state.FirstSeq >= n.applied {
if n.applied == 0 || len(data) == 0 {
n.Unlock()
return nil
return errNoSnapAvailable
}
n.debug("Installing snapshot of %d bytes", len(data))
@@ -976,7 +976,7 @@ func (n *raft) InstallSnapshot(data []byte) error {
// Remember our latest snapshot file.
n.snapfile = sfile
if _, err := n.wal.Compact(snap.lastIndex); err != nil {
if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil {
n.setWriteErrLocked(err)
n.Unlock()
return err
@@ -1289,6 +1289,7 @@ func (n *raft) StepDown(preferred ...string) error {
preferred = nil
}
}
// Can't pick ourselves.
if maybeLeader == n.id {
maybeLeader = noLeader