diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 99e98786..ed2e6d5f 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -999,21 +999,19 @@ func (js *jetStream) monitorCluster() { js.setMetaRecovering() // Snapshotting function. - doSnapshot := func() []byte { + doSnapshot := func() { // Suppress during recovery. if js.isMetaRecovering() { - return nil + return } snap := js.metaSnapshot() if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() - return snap } else { s.Warnf("Error snapshotting JetStream cluster state: %v", err) } } - return nil } ru := &recoveryUpdates{ @@ -1088,16 +1086,11 @@ func (js *jetStream) monitorCluster() { if isLeader { s.sendInternalMsgLocked(serverStatsPingReqSubj, _EMPTY_, nil, nil) - // Install a snapshot as we become leader. We will also send to the cluster. - if snap := doSnapshot(); snap != nil { - // If we are caught up distribute our current state to followers. - if ne, _ := n.Size(); ne == 0 { - // Send our snapshot to others to make sure all in sync. - n.SendSnapshot(snap) - } - } + // Optionally install a snapshot as we become leader. + doSnapshot() js.checkClusterSize() } + case <-t.C: doSnapshot() // Periodically check the cluster size. diff --git a/server/raft.go b/server/raft.go index 017f8947..eda9bd40 100644 --- a/server/raft.go +++ b/server/raft.go @@ -939,9 +939,9 @@ func (n *raft) InstallSnapshot(data []byte) error { var state StreamState n.wal.FastState(&state) - if state.FirstSeq >= n.applied { + if n.applied == 0 || len(data) == 0 { n.Unlock() - return nil + return errNoSnapAvailable } n.debug("Installing snapshot of %d bytes", len(data)) @@ -976,7 +976,7 @@ func (n *raft) InstallSnapshot(data []byte) error { // Remember our latest snapshot file. n.snapfile = sfile - if _, err := n.wal.Compact(snap.lastIndex); err != nil { + if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { n.setWriteErrLocked(err) n.Unlock() return err @@ -1289,6 +1289,7 @@ func (n *raft) StepDown(preferred ...string) error { preferred = nil } } + // Can't pick ourselves. if maybeLeader == n.id { maybeLeader = noLeader