A stream could have a complicated state with interior deletes.

This is a simpler way to determine if we need to consider a snapshot that involves much less time and CPU and memory.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-04-18 19:11:49 -07:00
parent c43c216415
commit f6195a5ee3

View File

@@ -1968,33 +1968,37 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
}
accName := acc.GetName()
// Hash of the last snapshot (fixed size in memory).
var lastSnap []byte
// Used to represent how we can detect a changed state quickly and without representing
// a complete and detailed state which could be costly in terms of memory, cpu and GC.
// This only entails how many messages, and the first and last sequence of the stream.
// This is all that is needed to detect a change, and we can get this from FilteredState()
// with and empty filter.
var lastState SimpleState
var lastSnapTime time.Time
// Highwayhash key for generating hashes.
key := make([]byte, 32)
rand.Read(key)
// Should only to be called from leader.
doSnapshot := func() {
if mset == nil || isRestore || time.Since(lastSnapTime) < minSnapDelta {
return
}
snap := mset.stateSnapshot()
ne, nb := n.Size()
hash := highwayhash.Sum(snap, key)
// Before we actually calculate the detailed state and encode it, let's check the
// simple state to detect any changes.
curState := mset.store.FilteredState(0, _EMPTY_)
// If the state hasn't changed but the log has gone way over
// the compaction size then we will want to compact anyway.
// This shouldn't happen for streams like it can for pull
// consumers on idle streams but better to be safe than sorry!
if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
if err := n.InstallSnapshot(snap); err == nil {
lastSnap, lastSnapTime = hash[:], time.Now()
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
}
ne, nb := n.Size()
if curState == lastState && ne < compactNumMin && nb < compactSizeMin {
return
}
if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil {
lastState, lastSnapTime = curState, time.Now()
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
}
}