diff --git a/server/filestore.go b/server/filestore.go index 8fd7b20d..be722616 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5108,10 +5108,6 @@ func compareFn(subject string) func(string, string) bool { // PurgeEx will remove messages based on subject filters, sequence and number of messages to keep. // Will return the number of purged messages. func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) { - if sequence > 1 && keep > 0 { - return 0, ErrPurgeArgMismatch - } - if subject == _EMPTY_ || subject == fwcs { if keep == 0 && (sequence == 0 || sequence == 1) { return fs.Purge() diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index df109c33..3e4cd530 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -4367,16 +4367,34 @@ func TestJetStreamClusterPurgeExReplayAfterRestart(t *testing.T) { si = runTest(func(js nats.JetStreamManager) { err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Keep: 1}) require_NoError(t, err) + // Send 4 more messages. + sendStreamMsg(t, nc, "TEST.1", "OK") + sendStreamMsg(t, nc, "TEST.2", "OK") + sendStreamMsg(t, nc, "TEST.3", "OK") + sendStreamMsg(t, nc, "TEST.1", "OK") + }) + if si.State.Msgs != 5 { + t.Fatalf("Expected 5 msgs after restart, got %d", si.State.Msgs) + } + if si.State.FirstSeq != 5 || si.State.LastSeq != 9 { + t.Fatalf("Expected FirstSeq=5, LastSeq=9 after restart, got FirstSeq=%d, LastSeq=%d", + si.State.FirstSeq, si.State.LastSeq) + } + + // Now test a keep on a subject + si = runTest(func(js nats.JetStreamManager) { + err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Subject: "TEST.1", Keep: 1}) + require_NoError(t, err) // Send 3 more messages. sendStreamMsg(t, nc, "TEST.1", "OK") sendStreamMsg(t, nc, "TEST.2", "OK") sendStreamMsg(t, nc, "TEST.3", "OK") }) - if si.State.Msgs != 4 { - t.Fatalf("Expected 4 msgs after restart, got %d", si.State.Msgs) + if si.State.Msgs != 7 { + t.Fatalf("Expected 7 msgs after restart, got %d", si.State.Msgs) } - if si.State.FirstSeq != 5 || si.State.LastSeq != 8 { - t.Fatalf("Expected FirstSeq=5, LastSeq=8 after restart, got FirstSeq=%d, LastSeq=%d", + if si.State.FirstSeq != 5 || si.State.LastSeq != 12 { + t.Fatalf("Expected FirstSeq=5, LastSeq=12 after restart, got FirstSeq=%d, LastSeq=%d", si.State.FirstSeq, si.State.LastSeq) } } diff --git a/server/memstore.go b/server/memstore.go index e6f13f04..1482fc33 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -614,10 +614,6 @@ func (ms *memStore) expireMsgs() { // PurgeEx will remove messages based on subject filters, sequence and number of messages to keep. // Will return the number of purged messages. func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) { - if sequence > 1 && keep > 0 { - return 0, ErrPurgeArgMismatch - } - if subject == _EMPTY_ || subject == fwcs { if keep == 0 && (sequence == 0 || sequence == 1) { return ms.Purge() diff --git a/server/store.go b/server/store.go index a08e92c6..9a1b4792 100644 --- a/server/store.go +++ b/server/store.go @@ -61,8 +61,6 @@ var ( ErrInvalidSequence = errors.New("invalid sequence") // ErrSequenceMismatch is returned when storing a raw message and the expected sequence is wrong. ErrSequenceMismatch = errors.New("expected sequence does not match store") - // ErrPurgeArgMismatch is returned when PurgeEx is called with sequence > 1 and keep > 0. - ErrPurgeArgMismatch = errors.New("sequence > 1 && keep > 0 not allowed") ) // StoreMsg is the stored message format for messages that are retained by the Store layer.