Fix PurgeEx replay with sequence & keep succeeds

This commit is contained in:
Maurice van Veen
2023-06-04 11:56:28 +02:00
parent e1f8064e9e
commit 132567de39
4 changed files with 22 additions and 14 deletions

View File

@@ -5108,10 +5108,6 @@ func compareFn(subject string) func(string, string) bool {
// PurgeEx will remove messages based on subject filters, sequence and number of messages to keep.
// Will return the number of purged messages.
func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
if sequence > 1 && keep > 0 {
return 0, ErrPurgeArgMismatch
}
if subject == _EMPTY_ || subject == fwcs {
if keep == 0 && (sequence == 0 || sequence == 1) {
return fs.Purge()

View File

@@ -4367,16 +4367,34 @@ func TestJetStreamClusterPurgeExReplayAfterRestart(t *testing.T) {
si = runTest(func(js nats.JetStreamManager) {
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Keep: 1})
require_NoError(t, err)
// Send 4 more messages.
sendStreamMsg(t, nc, "TEST.1", "OK")
sendStreamMsg(t, nc, "TEST.2", "OK")
sendStreamMsg(t, nc, "TEST.3", "OK")
sendStreamMsg(t, nc, "TEST.1", "OK")
})
if si.State.Msgs != 5 {
t.Fatalf("Expected 5 msgs after restart, got %d", si.State.Msgs)
}
if si.State.FirstSeq != 5 || si.State.LastSeq != 9 {
t.Fatalf("Expected FirstSeq=5, LastSeq=9 after restart, got FirstSeq=%d, LastSeq=%d",
si.State.FirstSeq, si.State.LastSeq)
}
// Now test a keep on a subject
si = runTest(func(js nats.JetStreamManager) {
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Subject: "TEST.1", Keep: 1})
require_NoError(t, err)
// Send 3 more messages.
sendStreamMsg(t, nc, "TEST.1", "OK")
sendStreamMsg(t, nc, "TEST.2", "OK")
sendStreamMsg(t, nc, "TEST.3", "OK")
})
if si.State.Msgs != 4 {
t.Fatalf("Expected 4 msgs after restart, got %d", si.State.Msgs)
if si.State.Msgs != 7 {
t.Fatalf("Expected 7 msgs after restart, got %d", si.State.Msgs)
}
if si.State.FirstSeq != 5 || si.State.LastSeq != 8 {
t.Fatalf("Expected FirstSeq=5, LastSeq=8 after restart, got FirstSeq=%d, LastSeq=%d",
if si.State.FirstSeq != 5 || si.State.LastSeq != 12 {
t.Fatalf("Expected FirstSeq=5, LastSeq=12 after restart, got FirstSeq=%d, LastSeq=%d",
si.State.FirstSeq, si.State.LastSeq)
}
}

View File

@@ -614,10 +614,6 @@ func (ms *memStore) expireMsgs() {
// PurgeEx will remove messages based on subject filters, sequence and number of messages to keep.
// Will return the number of purged messages.
func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
if sequence > 1 && keep > 0 {
return 0, ErrPurgeArgMismatch
}
if subject == _EMPTY_ || subject == fwcs {
if keep == 0 && (sequence == 0 || sequence == 1) {
return ms.Purge()

View File

@@ -61,8 +61,6 @@ var (
ErrInvalidSequence = errors.New("invalid sequence")
// ErrSequenceMismatch is returned when storing a raw message and the expected sequence is wrong.
ErrSequenceMismatch = errors.New("expected sequence does not match store")
// ErrPurgeArgMismatch is returned when PurgeEx is called with sequence > 1 and keep > 0.
ErrPurgeArgMismatch = errors.New("sequence > 1 && keep > 0 not allowed")
)
// StoreMsg is the stored message format for messages that are retained by the Store layer.