Small optimizations.

1. Only snapshot with minSnap time window like consumers and meta. Make it consistent for all to 5s.
2. Only snapshot at the end of processing all entries pending vs inside the loop.
3. Use fast state when calculating sync request, do not need deleted details there.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-01-28 12:00:03 -08:00
parent e1a2da8d85
commit 52a78c0352

View File

@@ -1061,7 +1061,7 @@ func (js *jetStream) monitorCluster() {
// FIXME(dlc) - Deal with errors.
if didSnap, didRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
_, nb := n.Applied(ce.Index)
if js.hasPeerEntries(ce.Entries) || didSnap || (didRemoval && time.Since(lastSnapTime) > 2*time.Second) {
if js.hasPeerEntries(ce.Entries) || didSnap || (didRemoval && time.Since(lastSnapTime) > 5*time.Second) {
// Since we received one make sure we have our own since we do not store
// our meta state outside of raft.
doSnapshot()
@@ -1849,6 +1849,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
compactInterval = 2 * time.Minute
compactSizeMin = 8 * 1024 * 1024
compactNumMin = 65536
minSnapDelta = 5 * time.Second
)
// Spread these out for large numbers on server restart.
@@ -1870,6 +1871,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Hash of the last snapshot (fixed size in memory).
var lastSnap []byte
var lastSnapTime time.Time
// Highwayhash key for generating hashes.
key := make([]byte, 32)
@@ -1877,13 +1879,13 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Should only to be called from leader.
doSnapshot := func() {
if mset == nil || isRestore {
if mset == nil || isRestore || time.Since(lastSnapTime) < minSnapDelta {
return
}
snap := mset.stateSnapshot()
if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap = hash[:]
lastSnap, lastSnapTime = hash[:], time.Now()
}
}
}
@@ -1948,6 +1950,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
case <-qch:
return
case <-aq.ch:
var ne, nb uint64
ces := aq.pop()
for _, cei := range ces {
// No special processing needed for when we are caught up on restart.
@@ -1962,11 +1965,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
ce := cei.(*CommittedEntry)
// Apply our entries.
if err := js.applyStreamEntries(mset, ce, isRecovering); err == nil {
ne, nb := n.Applied(ce.Index)
// If we have at least min entries to compact, go ahead and snapshot/compact.
if ne >= compactNumMin || nb > compactSizeMin {
doSnapshot()
}
// Update our applied.
ne, nb = n.Applied(ce.Index)
} else {
s.Warnf("Error applying entries to '%s > %s': %v", accName, sa.Config.Name, err)
if isClusterResetErr(err) {
@@ -1986,6 +1986,13 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
}
}
aq.recycle(&ces)
// Check about snapshotting
// If we have at least min entries to compact, go ahead and try to snapshot/compact.
if ne >= compactNumMin || nb > compactSizeMin {
doSnapshot()
}
case isLeader = <-lch:
if isLeader {
if sendSnapshot && mset != nil && n != nil {
@@ -2542,6 +2549,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
if err := json.Unmarshal(e.Data, &snap); err != nil {
return err
}
mset.mu.Lock()
mset.clfs = snap.Failed
mset.mu.Unlock()
@@ -3973,7 +3981,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
compactInterval = 2 * time.Minute
compactSizeMin = 64 * 1024 // What is stored here is always small for consumers.
compactNumMin = 1024
minSnapDelta = 2 * time.Second
minSnapDelta = 5 * time.Second
)
// Spread these out for large numbers on server restart.
@@ -6921,7 +6929,8 @@ func (mset *stream) calculateSyncRequest(state *StreamState, snap *streamSnapsho
// processSnapshotDeletes will update our current store based on the snapshot
// but only processing deletes and new FirstSeq / purges.
func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) {
state := mset.state()
var state StreamState
mset.store.FastState(&state)
// Always adjust if FirstSeq has moved beyond our state.
if snap.FirstSeq > state.FirstSeq {
@@ -6931,7 +6940,7 @@ func (mset *stream) processSnapshotDeletes(snap *streamSnapshot) {
}
// Range the deleted and delete if applicable.
for _, dseq := range snap.Deleted {
if dseq <= state.LastSeq {
if dseq > state.FirstSeq && dseq <= state.LastSeq {
mset.store.RemoveMsg(dseq)
}
}