From 52a78c035246bbd24379da923dcdcf75c8a3a57b Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sat, 28 Jan 2023 12:00:03 -0800 Subject: [PATCH] Small optimizations. 1. Only snapshot with minSnap time window like consumers and meta. Make it consistent for all to 5s. 2. Only snapshot at the end of processing all entries pending vs inside the loop. 3. Use fast state when calculating sync request, do not need deleted details there. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 5fc58336..9c59a5f9 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1061,7 +1061,7 @@ func (js *jetStream) monitorCluster() { // FIXME(dlc) - Deal with errors. if didSnap, didRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil { _, nb := n.Applied(ce.Index) - if js.hasPeerEntries(ce.Entries) || didSnap || (didRemoval && time.Since(lastSnapTime) > 2*time.Second) { + if js.hasPeerEntries(ce.Entries) || didSnap || (didRemoval && time.Since(lastSnapTime) > 5*time.Second) { // Since we received one make sure we have our own since we do not store // our meta state outside of raft. doSnapshot() @@ -1849,6 +1849,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps compactInterval = 2 * time.Minute compactSizeMin = 8 * 1024 * 1024 compactNumMin = 65536 + minSnapDelta = 5 * time.Second ) // Spread these out for large numbers on server restart. @@ -1870,6 +1871,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // Hash of the last snapshot (fixed size in memory). var lastSnap []byte + var lastSnapTime time.Time // Highwayhash key for generating hashes. key := make([]byte, 32) @@ -1877,13 +1879,13 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // Should only to be called from leader. doSnapshot := func() { - if mset == nil || isRestore { + if mset == nil || isRestore || time.Since(lastSnapTime) < minSnapDelta { return } snap := mset.stateSnapshot() if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) { if err := n.InstallSnapshot(snap); err == nil { - lastSnap = hash[:] + lastSnap, lastSnapTime = hash[:], time.Now() } } } @@ -1948,6 +1950,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps case <-qch: return case <-aq.ch: + var ne, nb uint64 ces := aq.pop() for _, cei := range ces { // No special processing needed for when we are caught up on restart. @@ -1962,11 +1965,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps ce := cei.(*CommittedEntry) // Apply our entries. if err := js.applyStreamEntries(mset, ce, isRecovering); err == nil { - ne, nb := n.Applied(ce.Index) - // If we have at least min entries to compact, go ahead and snapshot/compact. - if ne >= compactNumMin || nb > compactSizeMin { - doSnapshot() - } + // Update our applied. + ne, nb = n.Applied(ce.Index) } else { s.Warnf("Error applying entries to '%s > %s': %v", accName, sa.Config.Name, err) if isClusterResetErr(err) { @@ -1986,6 +1986,13 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps } } aq.recycle(&ces) + + // Check about snapshotting + // If we have at least min entries to compact, go ahead and try to snapshot/compact. + if ne >= compactNumMin || nb > compactSizeMin { + doSnapshot() + } + case isLeader = <-lch: if isLeader { if sendSnapshot && mset != nil && n != nil { @@ -2542,6 +2549,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco if err := json.Unmarshal(e.Data, &snap); err != nil { return err } + mset.mu.Lock() mset.clfs = snap.Failed mset.mu.Unlock() @@ -3973,7 +3981,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { compactInterval = 2 * time.Minute compactSizeMin = 64 * 1024 // What is stored here is always small for consumers. compactNumMin = 1024 - minSnapDelta = 2 * time.Second + minSnapDelta = 5 * time.Second ) // Spread these out for large numbers on server restart. @@ -6921,7 +6929,8 @@ func (mset *stream) calculateSyncRequest(state *StreamState, snap *streamSnapsho // processSnapshotDeletes will update our current store based on the snapshot // but only processing deletes and new FirstSeq / purges. func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) { - state := mset.state() + var state StreamState + mset.store.FastState(&state) // Always adjust if FirstSeq has moved beyond our state. if snap.FirstSeq > state.FirstSeq { @@ -6931,7 +6940,7 @@ func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) { } // Range the deleted and delete if applicable. for _, dseq := range snap.Deleted { - if dseq <= state.LastSeq { + if dseq > state.FirstSeq && dseq <= state.LastSeq { mset.store.RemoveMsg(dseq) } }