Make sure to process extended purge operations correctly when being replayed on a restart.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2023-06-03 17:49:45 -07:00
parent 449b429b58
commit dee532495d
2 changed files with 105 additions and 5 deletions

View File

@@ -2808,12 +2808,12 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
panic(err.Error())
}
// Ignore if we are recovering and we have already processed.
if isRecovering {
if mset.state().FirstSeq <= sp.LastSeq {
// Make sure all messages from the purge are gone.
mset.store.Compact(sp.LastSeq + 1)
if isRecovering && (sp.Request == nil || sp.Request.Sequence == 0) {
if sp.Request == nil {
sp.Request = &JSApiStreamPurgeRequest{Sequence: sp.LastSeq}
} else {
sp.Request.Sequence = sp.LastSeq
}
continue
}
s := js.server()

View File

@@ -4280,3 +4280,103 @@ func TestJetStreamClusterLeafnodePlusDaisyChainSetup(t *testing.T) {
// Each cluster hop that has the export/import mapping will add another T message copy.
checkSubsPending(t, tsub, num*4)
}
// https://github.com/nats-io/nats-server/pull/4197
func TestJetStreamClusterPurgeExReplayAfterRestart(t *testing.T) {
c := createJetStreamClusterExplicit(t, "P3F", 3)
defer c.shutdown()
// Client based API
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"TEST.>"},
Replicas: 3,
})
require_NoError(t, err)
sendStreamMsg(t, nc, "TEST.0", "OK")
sendStreamMsg(t, nc, "TEST.1", "OK")
sendStreamMsg(t, nc, "TEST.2", "OK")
runTest := func(f func(js nats.JetStreamManager)) *nats.StreamInfo {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
// install snapshot, then execute interior func, ensuring the purge will be recovered later
fsl := c.streamLeader(globalAccountName, "TEST")
fsl.JetStreamSnapshotStream(globalAccountName, "TEST")
f(js)
time.Sleep(250 * time.Millisecond)
fsl.Shutdown()
fsl.WaitForShutdown()
fsl = c.restartServer(fsl)
c.waitOnServerCurrent(fsl)
nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()
c.waitOnStreamLeader(globalAccountName, "TEST")
sl := c.streamLeader(globalAccountName, "TEST")
// keep stepping down so the stream leader matches the initial leader
// we need to check if it restored from the snapshot properly
for sl != fsl {
_, err := nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
require_NoError(t, err)
c.waitOnStreamLeader(globalAccountName, "TEST")
sl = c.streamLeader(globalAccountName, "TEST")
}
si, err := js.StreamInfo("TEST")
require_NoError(t, err)
return si
}
si := runTest(func(js nats.JetStreamManager) {
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Subject: "TEST.0"})
require_NoError(t, err)
})
if si.State.Msgs != 2 {
t.Fatalf("Expected 2 msgs after restart, got %d", si.State.Msgs)
}
if si.State.FirstSeq != 2 || si.State.LastSeq != 3 {
t.Fatalf("Expected FirstSeq=2, LastSeq=3 after restart, got FirstSeq=%d, LastSeq=%d",
si.State.FirstSeq, si.State.LastSeq)
}
si = runTest(func(js nats.JetStreamManager) {
err = js.PurgeStream("TEST")
require_NoError(t, err)
// Send 2 more messages.
sendStreamMsg(t, nc, "TEST.1", "OK")
sendStreamMsg(t, nc, "TEST.2", "OK")
})
if si.State.Msgs != 2 {
t.Fatalf("Expected 2 msgs after restart, got %d", si.State.Msgs)
}
if si.State.FirstSeq != 4 || si.State.LastSeq != 5 {
t.Fatalf("Expected FirstSeq=4, LastSeq=5 after restart, got FirstSeq=%d, LastSeq=%d",
si.State.FirstSeq, si.State.LastSeq)
}
// Now test a keep
si = runTest(func(js nats.JetStreamManager) {
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Keep: 1})
require_NoError(t, err)
// Send 3 more messages.
sendStreamMsg(t, nc, "TEST.1", "OK")
sendStreamMsg(t, nc, "TEST.2", "OK")
sendStreamMsg(t, nc, "TEST.3", "OK")
})
if si.State.Msgs != 4 {
t.Fatalf("Expected 4 msgs after restart, got %d", si.State.Msgs)
}
if si.State.FirstSeq != 5 || si.State.LastSeq != 8 {
t.Fatalf("Expected FirstSeq=5, LastSeq=8 after restart, got FirstSeq=%d, LastSeq=%d",
si.State.FirstSeq, si.State.LastSeq)
}
}