diff --git a/server/filestore.go b/server/filestore.go index 91d17ffc..cc9ca1c3 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -904,7 +904,7 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState { } // If subj is empty or we are not tracking multiple subjects. - if subj == _EMPTY_ || subj == ">" || !fs.tms { + if subj == _EMPTY_ || subj == fwcs || !fs.tms { total := lseq - sseq + 1 if state := fs.State(); len(state.Deleted) > 0 { for _, dseq := range state.Deleted { @@ -3113,7 +3113,7 @@ func subjectsAll(a, b string) bool { } func compareFn(subject string) func(string, string) bool { - if subject == _EMPTY_ || subject == ">" { + if subject == _EMPTY_ || subject == fwcs { return subjectsAll } if subjectHasWildcard(subject) { @@ -3125,13 +3125,27 @@ func compareFn(subject string) func(string, string) bool { // PurgeEx will remove messages based on subject filters, sequence and number of messages to keep. // Will return the number of purged messages. func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) { - if subject == _EMPTY_ || subject == ">" && keep == 0 { - return fs.Purge() + if subject == _EMPTY_ || subject == fwcs { + if keep == 0 && (sequence == 0 || sequence == 1) { + return fs.Purge() + } + if sequence > 1 { + return fs.Compact(sequence) + } else if keep > 0 { + fs.mu.RLock() + msgs, lseq := fs.state.Msgs, fs.state.LastSeq + fs.mu.RUnlock() + if keep >= msgs { + return 0, nil + } + return fs.Compact(lseq - keep + 1) + } + return 0, nil } eq := compareFn(subject) if ss := fs.FilteredState(1, subject); ss.Msgs > 0 { if keep > 0 { - if keep > ss.Msgs { + if keep >= ss.Msgs { return 0, nil } ss.Msgs -= keep diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index c346fca6..cddd4127 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -7368,7 +7368,6 @@ func TestJetStreamClusterExtendedStreamPurge(t *testing.T) { } // Do manually for now. nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) - si, err := js.StreamInfo("KV") if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -7416,6 +7415,7 @@ func TestJetStreamClusterExtendedStreamPurge(t *testing.T) { shouldFail(&JSApiStreamPurgeRequest{Sequence: 10, Keep: 10}) purge := func(preq *JSApiStreamPurgeRequest, newTotal uint64) { + t.Helper() req, _ := json.Marshal(preq) resp, err := nc.Request(fmt.Sprintf(JSApiStreamPurgeT, "KV"), req, time.Second) if err != nil { @@ -7437,6 +7437,7 @@ func TestJetStreamClusterExtendedStreamPurge(t *testing.T) { } } expectLeft := func(subject string, expected uint64) { + t.Helper() ci, err := js.AddConsumer("KV", &nats.ConsumerConfig{Durable: "dlc", FilterSubject: subject, AckPolicy: nats.AckExplicitPolicy}) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -7457,6 +7458,36 @@ func TestJetStreamClusterExtendedStreamPurge(t *testing.T) { expectLeft("kv.baz", 50) purge(&JSApiStreamPurgeRequest{Subject: "kv.*"}, 0) + + // RESET + js.DeleteStream("KV") + // Do manually for now. + nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) + if _, err := js.StreamInfo("KV"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // Put in 100. + for i := 0; i < 100; i++ { + js.Publish("kv.foo", []byte("OK")) + } + purge(&JSApiStreamPurgeRequest{Subject: "kv.foo", Keep: 10}, 10) + purge(&JSApiStreamPurgeRequest{Subject: "kv.foo", Keep: 10}, 10) + expectLeft("kv.foo", 10) + + // RESET AGAIN + js.DeleteStream("KV") + // Do manually for now. + nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) + if _, err := js.StreamInfo("KV"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // Put in 100. + for i := 0; i < 100; i++ { + js.Publish("kv.foo", []byte("OK")) + } + purge(&JSApiStreamPurgeRequest{Keep: 10}, 10) + expectLeft(">", 10) + }) } diff --git a/server/memstore.go b/server/memstore.go index 7ef8e189..de0cd20c 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -268,7 +268,7 @@ func (ms *memStore) FilteredState(sseq uint64, subj string) SimpleState { } // If we want everything. - if subj == _EMPTY_ || subj == ">" { + if subj == _EMPTY_ || subj == fwcs { ss.Msgs, ss.First, ss.Last = ms.state.Msgs, ms.state.FirstSeq, ms.state.LastSeq return ss } @@ -389,13 +389,28 @@ func (ms *memStore) expireMsgs() { // PurgeEx will remove messages based on subject filters, sequence and number of messages to keep. // Will return the number of purged messages. func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) { - if subject == _EMPTY_ || subject == ">" && keep == 0 { - return ms.Purge() + if subject == _EMPTY_ || subject == fwcs { + if keep == 0 && (sequence == 0 || sequence == 1) { + return ms.Purge() + } + if sequence > 1 { + return ms.Compact(sequence) + } else if keep > 0 { + ms.mu.RLock() + msgs, lseq := ms.state.Msgs, ms.state.LastSeq + ms.mu.RUnlock() + if keep >= msgs { + return 0, nil + } + return ms.Compact(lseq - keep + 1) + } + return 0, nil + } eq := compareFn(subject) if ss := ms.FilteredState(1, subject); ss.Msgs > 0 { if keep > 0 { - if keep > ss.Msgs { + if keep >= ss.Msgs { return 0, nil } ss.Msgs -= keep diff --git a/server/sublist.go b/server/sublist.go index 83925ed3..28b5b9ff 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -30,6 +30,7 @@ import ( const ( pwc = '*' fwc = '>' + fwcs = ">" tsep = "." btsep = '.' )