From 6058056e3b8a725649e05e54a00a3de7929374df Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 30 Jan 2023 08:52:10 -0800 Subject: [PATCH] Minor fixes and optimizations for snapshots. We were snapshotting more than needed, so double check that we should be doing this at the stream and consumer level. At the raft level, we should have always been compacting the WAL to last+1, so made that consistent. Also fixed bug that would not skip last if more items behind the snapshot. Signed-off-by: Derek Collison --- server/consumer.go | 2 +- server/jetstream_cluster.go | 16 ++++++++------- server/jetstream_cluster_1_test.go | 10 ++++++---- server/jetstream_cluster_2_test.go | 15 +++++++------- server/raft.go | 32 +++++++++++++----------------- server/stream.go | 2 +- 6 files changed, 39 insertions(+), 38 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 66963c1f..73697285 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4227,7 +4227,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { n.Delete() } else { // Try to install snapshot on clean exit - if o.store != nil { + if o.store != nil && n.NeedSnapshot() { if snap, err := o.store.EncodedState(); err == nil { n.InstallSnapshot(snap) } diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 9c59a5f9..c6433166 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1061,11 +1061,11 @@ func (js *jetStream) monitorCluster() { // FIXME(dlc) - Deal with errors. if didSnap, didRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil { _, nb := n.Applied(ce.Index) - if js.hasPeerEntries(ce.Entries) || didSnap || (didRemoval && time.Since(lastSnapTime) > 5*time.Second) { + if js.hasPeerEntries(ce.Entries) || didSnap || didRemoval { // Since we received one make sure we have our own since we do not store // our meta state outside of raft. 
doSnapshot() - } else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 { + } else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 && time.Since(lastSnapTime) > 5*time.Second { doSnapshot() } } @@ -1879,9 +1879,10 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // Should only to be called from leader. doSnapshot := func() { - if mset == nil || isRestore || time.Since(lastSnapTime) < minSnapDelta { + if mset == nil || isRestore || time.Since(lastSnapTime) < minSnapDelta || !n.NeedSnapshot() { return } + snap := mset.stateSnapshot() if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) { if err := n.InstallSnapshot(snap); err == nil { @@ -4004,15 +4005,16 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { } // Check several things to see if we need a snapshot. - needSnap := force || n.NeedSnapshot() - if !needSnap { + if needSnap := force && n.NeedSnapshot(); !needSnap { // Check if we should compact etc. based on size of log. 
ne, nb := n.Size() - needSnap = nb > 0 && ne >= compactNumMin || nb > compactSizeMin + if needSnap = nb > 0 && ne >= compactNumMin || nb > compactSizeMin; !needSnap { + return + } } if snap, err := o.store.EncodedState(); err == nil { - if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) || needSnap { + if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() } else { diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index ad718dc7..b899f0e3 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -1606,7 +1606,6 @@ func TestJetStreamClusterStreamSnapshotCatchup(t *testing.T) { sendBatch(2) sl := c.streamLeader("$G", "TEST") - sl.Shutdown() c.waitOnStreamLeader("$G", "TEST") @@ -1645,9 +1644,12 @@ func TestJetStreamClusterStreamSnapshotCatchup(t *testing.T) { mset, err = sl.GlobalAccount().lookupStream("TEST") require_NoError(t, err) - if nstate := mset.stateWithDetail(true); !reflect.DeepEqual(ostate, nstate) { - t.Fatalf("States do not match after recovery: %+v vs %+v", ostate, nstate) - } + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + if nstate := mset.stateWithDetail(true); !reflect.DeepEqual(ostate, nstate) { + return fmt.Errorf("States do not match after recovery: %+v vs %+v", ostate, nstate) + } + return nil + }) } func TestJetStreamClusterDeleteMsg(t *testing.T) { diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 3b99d221..42042bed 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -2629,7 +2629,8 @@ func TestJetStreamClusterStreamCatchupNoState(t *testing.T) { t.Fatalf("Error installing snapshot: %v", err) } } - js.Publish("foo.created", []byte("REQ")) + _, err := js.Publish("foo.created", []byte("REQ")) + require_NoError(t, err) } config := 
nsl.JetStreamConfig() @@ -2668,14 +2669,14 @@ func TestJetStreamClusterStreamCatchupNoState(t *testing.T) { nc, js = jsClientConnect(t, c.randomServer()) defer nc.Close() - if _, err := js.Publish("foo.created", []byte("REQ")); err != nil { - t.Fatalf("Unexpected error: %v", err) - } + c.waitOnStreamLeader("$G", "TEST") + + _, err = js.Publish("foo.created", []byte("ZZZ")) + require_NoError(t, err) si, err := js.StreamInfo("TEST") - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } + require_NoError(t, err) + if si.State.LastSeq != 101 { t.Fatalf("bad state after restart: %+v", si.State) } diff --git a/server/raft.go b/server/raft.go index cec53ab5..1d56ca7e 100644 --- a/server/raft.go +++ b/server/raft.go @@ -849,7 +849,7 @@ func (n *raft) ResumeApply() { n.resetElectionTimeout() } -// Compact will compact our WAL. +// Compact will compact our WAL up to and including index. // This is for when we know we have our state on stable storage. // E.g. snapshots. func (n *raft) Compact(index uint64) error { @@ -859,7 +859,7 @@ func (n *raft) Compact(index uint64) error { if n.werr != nil { return n.werr } - _, err := n.wal.Compact(index) + _, err := n.wal.Compact(index + 1) if err != nil { n.setWriteErrLocked(err) } @@ -1016,7 +1016,7 @@ func (n *raft) InstallSnapshot(data []byte) error { func (n *raft) NeedSnapshot() bool { n.RLock() defer n.RUnlock() - return n.snapfile == _EMPTY_ && n.applied > 0 + return n.snapfile == _EMPTY_ && n.applied > 1 } const ( @@ -1094,7 +1094,7 @@ func (n *raft) setupLastSnapshot() { n.commit = snap.lastIndex n.applied = snap.lastIndex n.apply.push(&CommittedEntry{n.commit, []*Entry{{EntrySnapshot, snap.data}}}) - if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { + if _, err := n.wal.Compact(snap.lastIndex); err != nil { n.setWriteErrLocked(err) } } @@ -2297,9 +2297,11 @@ func (n *raft) sendSnapshotToFollower(subject string) (uint64, error) { ae.pterm, ae.pindex = snap.lastTerm, snap.lastIndex var state StreamState 
n.wal.FastState(&state) - if snap.lastIndex < state.FirstSeq && state.FirstSeq != 0 { - snap.lastIndex = state.FirstSeq - 1 - ae.pindex = snap.lastIndex + + fpIndex := state.FirstSeq - 1 + if snap.lastIndex < fpIndex && state.FirstSeq != 0 { + snap.lastIndex = fpIndex + ae.pindex = fpIndex } encoding, err := ae.encode(nil) @@ -2326,7 +2328,6 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { var state StreamState n.wal.FastState(&state) - var didSnap bool if start < state.FirstSeq || (state.Msgs == 0 && start <= state.LastSeq) { n.debug("Need to send snapshot to follower") if lastIndex, err := n.sendSnapshotToFollower(ar.reply); err != nil { @@ -2334,20 +2335,20 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { n.Unlock() return } else { + start = lastIndex + 1 // If no other entries, we can just return here. - if state.Msgs == 0 { + if state.Msgs == 0 || start > state.LastSeq { n.debug("Finished catching up") n.Unlock() return } n.debug("Snapshot sent, reset first entry to %d", lastIndex) - start, didSnap = lastIndex, true } } ae, err := n.loadEntry(start) if err != nil { - n.warn("Request from follower for index [%d] possibly beyond our last index [%d] - %v", start, state.LastSeq, err) + n.warn("Request from follower for entry at index [%d] errored for state %+v - %v", start, state, err) ae, err = n.loadFirstEntry() } if err != nil || ae == nil { @@ -2360,12 +2361,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { } // Create a queue for delivering updates from responses. indexUpdates := n.s.newIPQueue(fmt.Sprintf("[ACC:%s] RAFT '%s' indexUpdates", n.accName, n.group)) // of uint64 - // If we did send a snapshot above, that means we skipped ahead so bump entry pindex by 1 to start for catchup stream. 
- if didSnap { - indexUpdates.push(ae.pindex + 1) - } else { - indexUpdates.push(ae.pindex) - } + indexUpdates.push(ae.pindex) n.progress[ar.peer] = indexUpdates n.Unlock() @@ -2948,7 +2944,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.pindex = ae.pindex n.pterm = ae.pterm n.commit = ae.pindex - _, err := n.wal.Compact(n.pindex + 1) + _, err := n.wal.Compact(n.pindex) if err != nil { n.setWriteErrLocked(err) n.Unlock() diff --git a/server/stream.go b/server/stream.go index 4b136d99..4389a314 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4478,7 +4478,7 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { if deleteFlag { n.Delete() sa = mset.sa - } else { + } else if n.NeedSnapshot() { // Attempt snapshot on clean exit. n.InstallSnapshot(mset.stateSnapshotLocked()) n.Stop()