diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index c642b876..153fab1c 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1882,9 +1882,17 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps } snap := mset.stateSnapshot() - if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) { + ne, nb := n.Size() + hash := highwayhash.Sum(snap, key) + // If the state hasn't changed but the log has gone way over + // the compaction size then we will want to compact anyway. + // This shouldn't happen for streams like it can for pull + // consumers on idle streams but better to be safe than sorry! + if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() + } else { + s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err) } } } @@ -4015,22 +4023,29 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { var lastSnap []byte var lastSnapTime time.Time - doSnapshot := func(force bool) { + doSnapshot := func() { // Bail if trying too fast and not in a forced situation. - if !force && time.Since(lastSnapTime) < minSnapDelta { + if time.Since(lastSnapTime) < minSnapDelta { return } // Check several things to see if we need a snapshot. - if !force || !n.NeedSnapshot() { + ne, nb := n.Size() + if !n.NeedSnapshot() { // Check if we should compact etc. based on size of log. - if ne, nb := n.Size(); ne < compactNumMin && nb < compactSizeMin { + if ne < compactNumMin && nb < compactSizeMin { return } } if snap, err := o.store.EncodedState(); err == nil { - if hash := highwayhash.Sum(snap, key); !bytes.Equal(hash[:], lastSnap) { + hash := highwayhash.Sum(snap, key) + // If the state hasn't changed but the log has gone way over + // the compaction size then we will want to compact anyway. + // This can happen for example when a pull consumer fetches a + // lot on an idle stream, log entries get distributed but the + // state never changes, therefore the log never gets compacted. + if !bytes.Equal(hash[:], lastSnap) || ne >= compactNumMin || nb >= compactSizeMin { if err := n.InstallSnapshot(snap); err == nil { lastSnap, lastSnapTime = hash[:], time.Now() } else { @@ -4076,7 +4091,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { if ce == nil { recovering = false if n.NeedSnapshot() { - doSnapshot(true) + doSnapshot() } continue } @@ -4084,7 +4099,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { ne, nb := n.Applied(ce.Index) // If we have at least min entries to compact, go ahead and snapshot/compact. if nb > 0 && ne >= compactNumMin || nb > compactSizeMin { - doSnapshot(false) + doSnapshot() } } else { s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name) @@ -4096,7 +4111,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { js.setConsumerAssignmentRecovering(ca) } if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader { - doSnapshot(true) + doSnapshot() } // We may receive a leader change after the consumer assignment which would cancel us @@ -4185,7 +4200,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { } case <-t.C: - doSnapshot(false) + doSnapshot() } } } diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index d7b4cfe2..4646bc79 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -2779,3 +2779,59 @@ func TestJetStreamClusterInterestPolicyEphemeral(t *testing.T) { }) } } + +// TestJetStreamClusterWALBuildupOnNoOpPull tests whether or not the consumer +// RAFT log is being compacted when the stream is idle but we are performing +// lots of fetches. Otherwise the disk usage just spirals out of control if +// there are no other state changes to trigger a compaction. +func TestJetStreamClusterWALBuildupOnNoOpPull(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + sub, err := js.PullSubscribe( + "foo", + "durable", + nats.ConsumerReplicas(3), + ) + require_NoError(t, err) + + for i := 0; i < 10000; i++ { + _, _ = sub.Fetch(1, nats.MaxWait(time.Microsecond)) + } + + // Needs to be at least 5 seconds, otherwise we won't hit the + // minSnapDelta that prevents us from snapshotting too often + time.Sleep(time.Second * 6) + + for i := 0; i < 1024; i++ { + _, _ = sub.Fetch(1, nats.MaxWait(time.Microsecond)) + } + + time.Sleep(time.Second) + + server := c.randomNonConsumerLeader(globalAccountName, "TEST", "durable") + + stream, err := server.globalAccount().lookupStream("TEST") + require_NoError(t, err) + + consumer := stream.lookupConsumer("durable") + require_NotNil(t, consumer) + + entries, bytes := consumer.raftNode().Size() + t.Log("new entries:", entries) + t.Log("new bytes:", bytes) + + if max := uint64(1024); entries > max { + t.Fatalf("got %d entries, expected less than %d entries", entries, max) + } +} diff --git a/server/test_test.go b/server/test_test.go index 9aa46522..a853310f 100644 --- a/server/test_test.go +++ b/server/test_test.go @@ -81,6 +81,13 @@ func require_NoError(t testing.TB, err error) { } } +func require_NotNil(t testing.TB, v any) { + t.Helper() + if v == nil { + t.Fatalf("require not nil, but got: %v", v) + } +} + func require_Contains(t *testing.T, s string, subStrs ...string) { t.Helper() for _, subStr := range subStrs {