diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 5fc58336..9c59a5f9 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1061,7 +1061,7 @@ func (js *jetStream) monitorCluster() { // FIXME(dlc) - Deal with errors. if didSnap, didRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil { _, nb := n.Applied(ce.Index) - if js.hasPeerEntries(ce.Entries) || didSnap || (didRemoval && time.Since(lastSnapTime) > 2*time.Second) { + if js.hasPeerEntries(ce.Entries) || didSnap || (didRemoval && time.Since(lastSnapTime) > 5*time.Second) { // Since we received one make sure we have our own since we do not store // our meta state outside of raft. doSnapshot() @@ -1849,6 +1849,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps compactInterval = 2 * time.Minute compactSizeMin = 8 * 1024 * 1024 compactNumMin = 65536 + minSnapDelta = 5 * time.Second ) // Spread these out for large numbers on server restart. @@ -1870,6 +1871,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // Hash of the last snapshot (fixed size in memory). var lastSnap []byte + var lastSnapTime time.Time // Highwayhash key for generating hashes. key := make([]byte, 32) @@ -1877,13 +1879,13 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // Should only to be called from leader. doSnapshot := func() { - if mset == nil || isRestore { + if mset == nil || isRestore || time.Since(lastSnapTime) < minSnapDelta { return } snap := mset.stateSnapshot() if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) { if err := n.InstallSnapshot(snap); err == nil { - lastSnap = hash[:] + lastSnap, lastSnapTime = hash[:], time.Now() } } } @@ -1948,6 +1950,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps case <-qch: return case <-aq.ch: + var ne, nb uint64 ces := aq.pop() for _, cei := range ces { // No special processing needed for when we are caught up on restart. @@ -1962,11 +1965,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps ce := cei.(*CommittedEntry) // Apply our entries. if err := js.applyStreamEntries(mset, ce, isRecovering); err == nil { - ne, nb := n.Applied(ce.Index) - // If we have at least min entries to compact, go ahead and snapshot/compact. - if ne >= compactNumMin || nb > compactSizeMin { - doSnapshot() - } + // Update our applied. + ne, nb = n.Applied(ce.Index) } else { s.Warnf("Error applying entries to '%s > %s': %v", accName, sa.Config.Name, err) if isClusterResetErr(err) { @@ -1986,6 +1986,13 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps } } aq.recycle(&ces) + + // Check about snapshotting + // If we have at least min entries to compact, go ahead and try to snapshot/compact. + if ne >= compactNumMin || nb > compactSizeMin { + doSnapshot() + } + case isLeader = <-lch: if isLeader { if sendSnapshot && mset != nil && n != nil { @@ -2542,6 +2549,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco if err := json.Unmarshal(e.Data, &snap); err != nil { return err } + mset.mu.Lock() mset.clfs = snap.Failed mset.mu.Unlock() @@ -3973,7 +3981,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { compactInterval = 2 * time.Minute compactSizeMin = 64 * 1024 // What is stored here is always small for consumers. compactNumMin = 1024 - minSnapDelta = 2 * time.Second + minSnapDelta = 5 * time.Second ) // Spread these out for large numbers on server restart. @@ -6921,7 +6929,8 @@ func (mset *stream) calculateSyncRequest(state *StreamState, snap *streamSnapsho // processSnapshotDeletes will update our current store based on the snapshot // but only processing deletes and new FirstSeq / purges. func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) { - state := mset.state() + var state StreamState + mset.store.FastState(&state) // Always adjust if FirstSeq has moved beyond our state. if snap.FirstSeq > state.FirstSeq { @@ -6931,7 +6940,7 @@ func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) { } // Range the deleted and delete if applicable. for _, dseq := range snap.Deleted { - if dseq <= state.LastSeq { + if dseq > state.FirstSeq && dseq <= state.LastSeq { mset.store.RemoveMsg(dseq) } }