mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
The meta layer should snapshot if any oustanding entries are present, regardless of hash.
Fixes this test [TestJetStreamClusterDeleteAndRestoreAndRestart] which would flap since it would not snapshot since hash was same but had entries that would erase stream data. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1041,7 +1041,7 @@ func (js *jetStream) monitorCluster() {
|
||||
// Make sure to stop the raft group on exit to prevent accidental memory bloat.
|
||||
defer n.Stop()
|
||||
|
||||
const compactInterval = 2 * time.Minute
|
||||
const compactInterval = time.Minute
|
||||
t := time.NewTicker(compactInterval)
|
||||
defer t.Stop()
|
||||
|
||||
@@ -1051,10 +1051,10 @@ func (js *jetStream) monitorCluster() {
|
||||
defer lt.Stop()
|
||||
|
||||
var (
|
||||
isLeader bool
|
||||
lastSnap []byte
|
||||
lastSnapTime time.Time
|
||||
minSnapDelta = 10 * time.Second
|
||||
isLeader bool
|
||||
lastSnapTime time.Time
|
||||
compactSizeMin = uint64(8 * 1024 * 1024) // 8MB
|
||||
minSnapDelta = 10 * time.Second
|
||||
)
|
||||
|
||||
// Highwayhash key for generating hashes.
|
||||
@@ -1070,10 +1070,10 @@ func (js *jetStream) monitorCluster() {
|
||||
if js.isMetaRecovering() {
|
||||
return
|
||||
}
|
||||
snap := js.metaSnapshot()
|
||||
if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) {
|
||||
if err := n.InstallSnapshot(snap); err == nil {
|
||||
lastSnap, lastSnapTime = hash[:], time.Now()
|
||||
// For the meta layer we want to snapshot when asked if we need one or have any entries that we can compact.
|
||||
if ne, _ := n.Size(); ne > 0 || n.NeedSnapshot() {
|
||||
if err := n.InstallSnapshot(js.metaSnapshot()); err == nil {
|
||||
lastSnapTime = time.Now()
|
||||
} else if err != errNoSnapAvailable && err != errNodeClosed {
|
||||
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
|
||||
}
|
||||
@@ -1129,13 +1129,11 @@ func (js *jetStream) monitorCluster() {
|
||||
// FIXME(dlc) - Deal with errors.
|
||||
if didSnap, didStreamRemoval, didConsumerRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
|
||||
_, nb := n.Applied(ce.Index)
|
||||
if js.hasPeerEntries(ce.Entries) || didSnap || didStreamRemoval {
|
||||
// Since we received one make sure we have our own since we do not store
|
||||
// our meta state outside of raft.
|
||||
if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) {
|
||||
doSnapshot()
|
||||
} else if didConsumerRemoval && time.Since(lastSnapTime) > minSnapDelta/2 {
|
||||
doSnapshot()
|
||||
} else if lls := len(lastSnap); nb > uint64(lls*8) && lls > 0 && time.Since(lastSnapTime) > minSnapDelta {
|
||||
} else if nb > compactSizeMin && time.Since(lastSnapTime) > minSnapDelta {
|
||||
doSnapshot()
|
||||
}
|
||||
ce.ReturnToPool()
|
||||
|
||||
Reference in New Issue
Block a user