mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Install snapshot and compact when WAL grows, even when no state changes occur
This commit is contained in:
@@ -1882,9 +1882,17 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
|
||||
}
|
||||
|
||||
snap := mset.stateSnapshot()
|
||||
if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) {
|
||||
ne, nb := n.Size()
|
||||
hash := highwayhash.Sum(snap, key)
|
||||
// If the state hasn't changed but the log has gone way over
|
||||
// the compaction size then we will want to compact anyway.
|
||||
// This shouldn't happen for streams like it can for pull
|
||||
// consumers on idle streams but better to be safe than sorry!
|
||||
if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
|
||||
if err := n.InstallSnapshot(snap); err == nil {
|
||||
lastSnap, lastSnapTime = hash[:], time.Now()
|
||||
} else {
|
||||
s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4015,22 +4023,29 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
|
||||
var lastSnap []byte
|
||||
var lastSnapTime time.Time
|
||||
|
||||
doSnapshot := func(force bool) {
|
||||
doSnapshot := func() {
|
||||
// Bail if trying too fast and not in a forced situation.
|
||||
if !force && time.Since(lastSnapTime) < minSnapDelta {
|
||||
if time.Since(lastSnapTime) < minSnapDelta {
|
||||
return
|
||||
}
|
||||
|
||||
// Check several things to see if we need a snapshot.
|
||||
if !force || !n.NeedSnapshot() {
|
||||
ne, nb := n.Size()
|
||||
if !n.NeedSnapshot() {
|
||||
// Check if we should compact etc. based on size of log.
|
||||
if ne, nb := n.Size(); ne < compactNumMin && nb < compactSizeMin {
|
||||
if ne < compactNumMin && nb < compactSizeMin {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if snap, err := o.store.EncodedState(); err == nil {
|
||||
if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) {
|
||||
hash := highwayhash.Sum(snap, key)
|
||||
// If the state hasn't changed but the log has gone way over
|
||||
// the compaction size then we will want to compact anyway.
|
||||
// This can happen for example when a pull consumer fetches a
|
||||
// lot on an idle stream, log entries get distributed but the
|
||||
// state never changes, therefore the log never gets compacted.
|
||||
if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin {
|
||||
if err := n.InstallSnapshot(snap); err == nil {
|
||||
lastSnap, lastSnapTime = hash[:], time.Now()
|
||||
} else {
|
||||
@@ -4076,7 +4091,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
|
||||
if ce == nil {
|
||||
recovering = false
|
||||
if n.NeedSnapshot() {
|
||||
doSnapshot(true)
|
||||
doSnapshot()
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -4084,7 +4099,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
|
||||
ne, nb := n.Applied(ce.Index)
|
||||
// If we have at least min entries to compact, go ahead and snapshot/compact.
|
||||
if nb > 0 && ne >= compactNumMin || nb > compactSizeMin {
|
||||
doSnapshot(false)
|
||||
doSnapshot()
|
||||
}
|
||||
} else {
|
||||
s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name)
|
||||
@@ -4096,7 +4111,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
|
||||
js.setConsumerAssignmentRecovering(ca)
|
||||
}
|
||||
if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader {
|
||||
doSnapshot(true)
|
||||
doSnapshot()
|
||||
}
|
||||
|
||||
// We may receive a leader change after the consumer assignment which would cancel us
|
||||
@@ -4185,7 +4200,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
|
||||
}
|
||||
|
||||
case <-t.C:
|
||||
doSnapshot(false)
|
||||
doSnapshot()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2779,3 +2779,59 @@ func TestJetStreamClusterInterestPolicyEphemeral(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestJetStreamClusterWALBuildupOnNoOpPull tests whether or not the consumer
|
||||
// RAFT log is being compacted when the stream is idle but we are performing
|
||||
// lots of fetches. Otherwise the disk usage just spirals out of control if
|
||||
// there are no other state changes to trigger a compaction.
|
||||
func TestJetStreamClusterWALBuildupOnNoOpPull(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "R3S", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
_, err := js.AddStream(&nats.StreamConfig{
|
||||
Name: "TEST",
|
||||
Subjects: []string{"foo"},
|
||||
Replicas: 3,
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
sub, err := js.PullSubscribe(
|
||||
"foo",
|
||||
"durable",
|
||||
nats.ConsumerReplicas(3),
|
||||
)
|
||||
require_NoError(t, err)
|
||||
|
||||
for i := 0; i < 10000; i++ {
|
||||
_, _ = sub.Fetch(1, nats.MaxWait(time.Microsecond))
|
||||
}
|
||||
|
||||
// Needs to be at least 5 seconds, otherwise we won't hit the
|
||||
// minSnapDelta that prevents us from snapshotting too often
|
||||
time.Sleep(time.Second * 6)
|
||||
|
||||
for i := 0; i < 1024; i++ {
|
||||
_, _ = sub.Fetch(1, nats.MaxWait(time.Microsecond))
|
||||
}
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
server := c.randomNonConsumerLeader(globalAccountName, "TEST", "durable")
|
||||
|
||||
stream, err := server.globalAccount().lookupStream("TEST")
|
||||
require_NoError(t, err)
|
||||
|
||||
consumer := stream.lookupConsumer("durable")
|
||||
require_NotNil(t, consumer)
|
||||
|
||||
entries, bytes := consumer.raftNode().Size()
|
||||
t.Log("new entries:", entries)
|
||||
t.Log("new bytes:", bytes)
|
||||
|
||||
if max := uint64(1024); entries > max {
|
||||
t.Fatalf("got %d entries, expected less than %d entries", entries, max)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,6 +81,13 @@ func require_NoError(t testing.TB, err error) {
|
||||
}
|
||||
}
|
||||
|
||||
// require_NotNil fails the test immediately (via t.Fatalf) when v is nil.
//
// A bare `v == nil` comparison is only true for an untyped nil. Once a nil
// pointer, map, slice, channel or func is stored in an `any`, the interface
// value carries a type and no longer compares equal to nil. Callers commonly
// pass such typed values (for example the result of a lookup), so after the
// cheap interface check the underlying value is inspected with reflection
// for every kind that can hold nil. Values of any other kind (ints, strings,
// structs, ...) can never be nil and always pass.
func require_NotNil(t testing.TB, v any) {
	t.Helper()
	if v == nil {
		t.Fatalf("require not nil, but got: %v", v)
		return
	}
	switch rv := reflect.ValueOf(v); rv.Kind() {
	case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map,
		reflect.Pointer, reflect.Slice, reflect.UnsafePointer:
		// IsNil is only legal on these kinds; it panics on anything else.
		if rv.IsNil() {
			t.Fatalf("require not nil, but got: %v", v)
		}
	}
}
|
||||
|
||||
func require_Contains(t *testing.T, s string, subStrs ...string) {
|
||||
t.Helper()
|
||||
for _, subStr := range subStrs {
|
||||
|
||||
Reference in New Issue
Block a user