diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 170e881a..9ec4dc29 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1968,33 +1968,37 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps } accName := acc.GetName() - // Hash of the last snapshot (fixed size in memory). - var lastSnap []byte + // Used to represent how we can detect a changed state quickly and without representing + // a complete and detailed state which could be costly in terms of memory, cpu and GC. + // This only entails how many messages, and the first and last sequence of the stream. + // This is all that is needed to detect a change, and we can get this from FilteredState() + // with and empty filter. + var lastState SimpleState var lastSnapTime time.Time - // Highwayhash key for generating hashes. - key := make([]byte, 32) - rand.Read(key) - // Should only to be called from leader. doSnapshot := func() { if mset == nil || isRestore || time.Since(lastSnapTime) < minSnapDelta { return } - snap := mset.stateSnapshot() - ne, nb := n.Size() - hash := highwayhash.Sum(snap, key) + // Before we actually calculate the detailed state and encode it, let's check the + // simple state to detect any changes. + curState := mset.store.FilteredState(0, _EMPTY_) + // If the state hasn't changed but the log has gone way over // the compaction size then we will want to compact anyway. // This shouldn't happen for streams like it can for pull // consumers on idle streams but better to be safe than sorry! - if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin { - if err := n.InstallSnapshot(snap); err == nil { - lastSnap, lastSnapTime = hash[:], time.Now() - } else if err != errNoSnapAvailable && err != errNodeClosed { - s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err) - } + ne, nb := n.Size() + if curState == lastState && ne < compactNumMin && nb < compactSizeMin { + return + } + + if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil { + lastState, lastSnapTime = curState, time.Now() + } else if err != errNoSnapAvailable && err != errNodeClosed { + s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err) } }