diff --git a/server/consumer.go b/server/consumer.go index 66963c1f..73697285 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4227,7 +4227,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { n.Delete() } else { // Try to install snapshot on clean exit - if o.store != nil { + if o.store != nil && n.NeedSnapshot() { if snap, err := o.store.EncodedState(); err == nil { n.InstallSnapshot(snap) } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 9c59a5f9..c6433166 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1061,11 +1061,11 @@ func (js *jetStream) monitorCluster() { // FIXME(dlc) - Deal with errors. if didSnap, didRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil { _, nb := n.Applied(ce.Index) - if js.hasPeerEntries(ce.Entries) || didSnap || (didRemoval && time.Since(lastSnapTime) > 5*time.Second) { + if js.hasPeerEntries(ce.Entries) || didSnap || didRemoval { // Since we received one make sure we have our own since we do not store // our meta state outside of raft. doSnapshot() - } else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 { + } else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 && time.Since(lastSnapTime) > 5*time.Second { doSnapshot() } } @@ -1879,9 +1879,10 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // Should only to be called from leader. doSnapshot := func() { - if mset == nil || isRestore || time.Since(lastSnapTime) < minSnapDelta { + if mset == nil || isRestore || time.Since(lastSnapTime) < minSnapDelta || !n.NeedSnapshot() { return } + snap := mset.stateSnapshot() if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) { if err := n.InstallSnapshot(snap); err == nil { @@ -4004,15 +4005,16 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { } // Check several things to see if we need a snapshot. - needSnap := force || n.NeedSnapshot() - if !needSnap { + if needSnap := force && n.NeedSnapshot(); !needSnap { // Check if we should compact etc. based on size of log. ne, nb := n.Size() - needSnap = nb > 0 && ne >= compactNumMin || nb > compactSizeMin + if needSnap = nb > 0 && ne >= compactNumMin || nb > compactSizeMin; !needSnap { + return + } } if snap, err := o.store.EncodedState(); err == nil { - if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) || needSnap { + if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() } else { diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index ad718dc7..b899f0e3 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -1606,7 +1606,6 @@ func TestJetStreamClusterStreamSnapshotCatchup(t *testing.T) { sendBatch(2) sl := c.streamLeader("$G", "TEST") - sl.Shutdown() c.waitOnStreamLeader("$G", "TEST") @@ -1645,9 +1644,12 @@ func TestJetStreamClusterStreamSnapshotCatchup(t *testing.T) { mset, err = sl.GlobalAccount().lookupStream("TEST") require_NoError(t, err) - if nstate := mset.stateWithDetail(true); !reflect.DeepEqual(ostate, nstate) { - t.Fatalf("States do not match after recovery: %+v vs %+v", ostate, nstate) - } + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + if nstate := mset.stateWithDetail(true); !reflect.DeepEqual(ostate, nstate) { + return fmt.Errorf("States do not match after recovery: %+v vs %+v", ostate, nstate) + } + return nil + }) } func TestJetStreamClusterDeleteMsg(t *testing.T) { diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 3b99d221..42042bed 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -2629,7 +2629,8 @@ func TestJetStreamClusterStreamCatchupNoState(t *testing.T) { t.Fatalf("Error installing snapshot: %v", err) } } - js.Publish("foo.created", []byte("REQ")) + _, err := js.Publish("foo.created", []byte("REQ")) + require_NoError(t, err) } config := nsl.JetStreamConfig() @@ -2668,14 +2669,14 @@ func TestJetStreamClusterStreamCatchupNoState(t *testing.T) { nc, js = jsClientConnect(t, c.randomServer()) defer nc.Close() - if _, err := js.Publish("foo.created", []byte("REQ")); err != nil { - t.Fatalf("Unexpected error: %v", err) - } + c.waitOnStreamLeader("$G", "TEST") + + _, err = js.Publish("foo.created", []byte("ZZZ")) + require_NoError(t, err) si, err := js.StreamInfo("TEST") - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + require_NoError(t, err) + if si.State.LastSeq != 101 { t.Fatalf("bad state after restart: %+v", si.State) } diff --git a/server/raft.go b/server/raft.go index cec53ab5..1d56ca7e 100644 --- a/server/raft.go +++ b/server/raft.go @@ -849,7 +849,7 @@ func (n *raft) ResumeApply() { n.resetElectionTimeout() } -// Compact will compact our WAL. +// Compact will compact our WAL up to and including index. // This is for when we know we have our state on stable storage. // E.g. snapshots. func (n *raft) Compact(index uint64) error { @@ -859,7 +859,7 @@ func (n *raft) Compact(index uint64) error { if n.werr != nil { return n.werr } - _, err := n.wal.Compact(index) + _, err := n.wal.Compact(index + 1) if err != nil { n.setWriteErrLocked(err) } @@ -1016,7 +1016,7 @@ func (n *raft) InstallSnapshot(data []byte) error { func (n *raft) NeedSnapshot() bool { n.RLock() defer n.RUnlock() - return n.snapfile == _EMPTY_ && n.applied > 0 + return n.snapfile == _EMPTY_ && n.applied > 1 } const ( @@ -1094,7 +1094,7 @@ func (n *raft) setupLastSnapshot() { n.commit = snap.lastIndex n.applied = snap.lastIndex n.apply.push(&CommittedEntry{n.commit, []*Entry{{EntrySnapshot, snap.data}}}) - if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { + if _, err := n.wal.Compact(snap.lastIndex); err != nil { n.setWriteErrLocked(err) } } @@ -2297,9 +2297,11 @@ func (n *raft) sendSnapshotToFollower(subject string) (uint64, error) { ae.pterm, ae.pindex = snap.lastTerm, snap.lastIndex var state StreamState n.wal.FastState(&state) - if snap.lastIndex < state.FirstSeq && state.FirstSeq != 0 { - snap.lastIndex = state.FirstSeq - 1 - ae.pindex = snap.lastIndex + + fpIndex := state.FirstSeq - 1 + if snap.lastIndex < fpIndex && state.FirstSeq != 0 { + snap.lastIndex = fpIndex + ae.pindex = fpIndex } encoding, err := ae.encode(nil) @@ -2326,7 +2328,6 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { var state StreamState n.wal.FastState(&state) - var didSnap bool if start < state.FirstSeq || (state.Msgs == 0 && start <= state.LastSeq) { n.debug("Need to send snapshot to follower") if lastIndex, err := n.sendSnapshotToFollower(ar.reply); err != nil { @@ -2334,20 +2335,20 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { n.Unlock() return } else { + start = lastIndex + 1 // If no other entries, we can just return here. - if state.Msgs == 0 { + if state.Msgs == 0 || start > state.LastSeq { n.debug("Finished catching up") n.Unlock() return } n.debug("Snapshot sent, reset first entry to %d", lastIndex) - start, didSnap = lastIndex, true } } ae, err := n.loadEntry(start) if err != nil { - n.warn("Request from follower for index [%d] possibly beyond our last index [%d] - %v", start, state.LastSeq, err) + n.warn("Request from follower for entry at index [%d] errored for state %+v - %v", start, state, err) ae, err = n.loadFirstEntry() } if err != nil || ae == nil { @@ -2360,12 +2361,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { } // Create a queue for delivering updates from responses. indexUpdates := n.s.newIPQueue(fmt.Sprintf("[ACC:%s] RAFT '%s' indexUpdates", n.accName, n.group)) // of uint64 - // If we did send a snapshot above, that means we skipped ahead so bump entry pindex by 1 to start for catchup stream. - if didSnap { - indexUpdates.push(ae.pindex + 1) - } else { - indexUpdates.push(ae.pindex) - } + indexUpdates.push(ae.pindex) n.progress[ar.peer] = indexUpdates n.Unlock() @@ -2948,7 +2944,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.pindex = ae.pindex n.pterm = ae.pterm n.commit = ae.pindex - _, err := n.wal.Compact(n.pindex + 1) + _, err := n.wal.Compact(n.pindex) if err != nil { n.setWriteErrLocked(err) n.Unlock() diff --git a/server/stream.go b/server/stream.go index 4b136d99..4389a314 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4478,7 +4478,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { if deleteFlag { n.Delete() sa = mset.sa - } else { + } else if n.NeedSnapshot() { // Attempt snapshot on clean exit. n.InstallSnapshot(mset.stateSnapshotLocked()) n.Stop()