Do not let !NeedSnapshot() avoid snapshots and compaction.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-02-01 22:05:25 -07:00
parent d40c4c6fb3
commit e9a983c802
3 changed files with 95 additions and 25 deletions

View File

@@ -1879,7 +1879,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Should only to be called from leader.
doSnapshot := func() {
if mset == nil || isRestore || time.Since(lastSnapTime) < minSnapDelta || !n.NeedSnapshot() {
if mset == nil || isRestore || time.Since(lastSnapTime) < minSnapDelta {
return
}
@@ -4005,10 +4005,9 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
}
// Check several things to see if we need a snapshot.
if needSnap := force && n.NeedSnapshot(); !needSnap {
if !force && !n.NeedSnapshot() {
// Check if we should compact etc. based on size of log.
ne, nb := n.Size()
if needSnap = nb > 0 && ne >= compactNumMin || nb > compactSizeMin; !needSnap {
if ne, nb := n.Size(); ne < compactNumMin && nb < compactSizeMin {
return
}
}

View File

@@ -6050,3 +6050,92 @@ func TestNoRaceJetStreamEndToEndLatency(t *testing.T) {
t.Fatalf("Expected max latency to be < 250ms, got %v", max)
}
}
// TestNoRaceJetStreamClusterEnsureWALCompact verifies that both the stream
// and the consumer raft WALs are snapshotted and compacted while entries are
// being applied, i.e. that the WAL does not grow unbounded to the number of
// published messages.
func TestNoRaceJetStreamClusterEnsureWALCompact(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:        "dlc",
		DeliverSubject: "zz",
		Replicas:       3,
	})
	require_NoError(t, err)

	// Force a snapshot on the stream leader so compaction is measured
	// from a known baseline.
	sl := c.streamLeader(globalAccountName, "TEST")
	mset, err := sl.GlobalAccount().lookupStream("TEST")
	require_NoError(t, err)
	node := mset.raftNode()
	require_True(t, node != nil)
	err = node.InstallSnapshot(mset.stateSnapshot())
	require_NoError(t, err)

	// Now publish more than should be needed to cause an additional snapshot.
	ns := 75_000
	for i := 0; i <= ns; i++ {
		_, err := js.Publish("foo", []byte("bar"))
		require_NoError(t, err)
	}

	// Grab progress and use that to look into WAL entries.
	_, _, applied := node.Progress()
	// If ne == ns that means snapshots and compacts were not happening when
	// they should have been.
	if ne, _ := node.Applied(applied); ne >= uint64(ns) {
		t.Fatalf("Did not snapshot and compact the raft WAL, entries == %d", ne)
	}

	// Now check consumer.
	// Force snapshot on consumer leader.
	cl := c.consumerLeader(globalAccountName, "TEST", "dlc")
	mset, err = cl.GlobalAccount().lookupStream("TEST")
	require_NoError(t, err)
	o := mset.lookupConsumer("dlc")
	require_True(t, o != nil)
	node = o.raftNode()
	require_True(t, node != nil)
	snap, err := o.store.EncodedState()
	require_NoError(t, err)
	err = node.InstallSnapshot(snap)
	require_NoError(t, err)

	// Signal exactly once (== rather than >=) and buffer the channel so the
	// callback can never block on a second send: ns+1 messages are published
	// above (loop runs i <= ns), so a >= check would fire more than once.
	received, done := 0, make(chan bool, 1)
	_, err = nc.Subscribe("zz", func(m *nats.Msg) {
		received++
		if received == ns {
			done <- true
		}
		m.Ack()
	})
	require_NoError(t, err)

	select {
	case <-done:
		// Fall through: we still need to verify the consumer WAL below.
		// (A return here would make the final compaction check dead code.)
	case <-time.After(10 * time.Second):
		t.Fatalf("Did not receive all %d msgs, only %d", ns, received)
	}

	// Do same trick and check that WAL was compacted.
	// Grab progress and use that to look into WAL entries.
	_, _, applied = node.Progress()
	// If ne == ns that means snapshots and compacts were not happening when
	// they should have been.
	if ne, _ := node.Applied(applied); ne >= uint64(ns) {
		t.Fatalf("Did not snapshot and compact the raft WAL, entries == %d", ne)
	}
}

View File

@@ -40,7 +40,6 @@ type RaftNode interface {
SendSnapshot(snap []byte) error
NeedSnapshot() bool
Applied(index uint64) (entries uint64, bytes uint64)
Compact(index uint64) error
State() RaftState
Size() (entries, bytes uint64)
Progress() (index, commit, applied uint64)
@@ -849,23 +848,6 @@ func (n *raft) ResumeApply() {
n.resetElectionTimeout()
}
// Compact truncates our WAL up to and including the given index.
// Callers use this once the corresponding state is known to be on stable
// storage (e.g. after writing a snapshot), so the entries are recoverable.
func (n *raft) Compact(index uint64) error {
	n.Lock()
	defer n.Unlock()

	// Refuse further WAL operations once a prior write has failed.
	if err := n.werr; err != nil {
		return err
	}

	// The WAL keeps entries from the given sequence onward, so pass
	// index+1 to drop everything up to and including index.
	if _, err := n.wal.Compact(index + 1); err != nil {
		n.setWriteErrLocked(err)
		return err
	}
	return nil
}
// Applied is to be called when the FSM has applied the committed entries.
// Applied will return the number of entries and an estimation of the
// byte size that could be removed with a snapshot/compact.
@@ -995,8 +977,8 @@ func (n *raft) InstallSnapshot(data []byte) error {
n.snapfile = sfile
if _, err := n.wal.Compact(snap.lastIndex); err != nil {
n.setWriteErrLocked(err)
n.Unlock()
n.setWriteErr(err)
return err
}
n.Unlock()
@@ -2944,8 +2926,8 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.pindex = ae.pindex
n.pterm = ae.pterm
n.commit = ae.pindex
_, err := n.wal.Compact(n.pindex)
if err != nil {
if _, err := n.wal.Compact(n.pindex); err != nil {
n.setWriteErrLocked(err)
n.Unlock()
return