Updates and fixes to PurgeEx

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-06-20 10:34:27 -07:00
parent 60bd178ab1
commit 89d930fd0f
4 changed files with 71 additions and 10 deletions

View File

@@ -904,7 +904,7 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState {
}
// If subj is empty or we are not tracking multiple subjects.
if subj == _EMPTY_ || subj == ">" || !fs.tms {
if subj == _EMPTY_ || subj == fwcs || !fs.tms {
total := lseq - sseq + 1
if state := fs.State(); len(state.Deleted) > 0 {
for _, dseq := range state.Deleted {
@@ -3113,7 +3113,7 @@ func subjectsAll(a, b string) bool {
}
func compareFn(subject string) func(string, string) bool {
if subject == _EMPTY_ || subject == ">" {
if subject == _EMPTY_ || subject == fwcs {
return subjectsAll
}
if subjectHasWildcard(subject) {
@@ -3125,13 +3125,27 @@ func compareFn(subject string) func(string, string) bool {
// PurgeEx will remove messages based on subject filters, sequence and number of messages to keep.
// Will return the number of purged messages.
func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
if subject == _EMPTY_ || subject == ">" && keep == 0 {
return fs.Purge()
if subject == _EMPTY_ || subject == fwcs {
if keep == 0 && (sequence == 0 || sequence == 1) {
return fs.Purge()
}
if sequence > 1 {
return fs.Compact(sequence)
} else if keep > 0 {
fs.mu.RLock()
msgs, lseq := fs.state.Msgs, fs.state.LastSeq
fs.mu.RUnlock()
if keep >= msgs {
return 0, nil
}
return fs.Compact(lseq - keep + 1)
}
return 0, nil
}
eq := compareFn(subject)
if ss := fs.FilteredState(1, subject); ss.Msgs > 0 {
if keep > 0 {
if keep > ss.Msgs {
if keep >= ss.Msgs {
return 0, nil
}
ss.Msgs -= keep

View File

@@ -7368,7 +7368,6 @@ func TestJetStreamClusterExtendedStreamPurge(t *testing.T) {
}
// Do manually for now.
nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
si, err := js.StreamInfo("KV")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -7416,6 +7415,7 @@ func TestJetStreamClusterExtendedStreamPurge(t *testing.T) {
shouldFail(&JSApiStreamPurgeRequest{Sequence: 10, Keep: 10})
purge := func(preq *JSApiStreamPurgeRequest, newTotal uint64) {
t.Helper()
req, _ := json.Marshal(preq)
resp, err := nc.Request(fmt.Sprintf(JSApiStreamPurgeT, "KV"), req, time.Second)
if err != nil {
@@ -7437,6 +7437,7 @@ func TestJetStreamClusterExtendedStreamPurge(t *testing.T) {
}
}
expectLeft := func(subject string, expected uint64) {
t.Helper()
ci, err := js.AddConsumer("KV", &nats.ConsumerConfig{Durable: "dlc", FilterSubject: subject, AckPolicy: nats.AckExplicitPolicy})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@@ -7457,6 +7458,36 @@ func TestJetStreamClusterExtendedStreamPurge(t *testing.T) {
expectLeft("kv.baz", 50)
purge(&JSApiStreamPurgeRequest{Subject: "kv.*"}, 0)
// RESET
js.DeleteStream("KV")
// Do manually for now.
nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
if _, err := js.StreamInfo("KV"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Put in 100.
for i := 0; i < 100; i++ {
js.Publish("kv.foo", []byte("OK"))
}
purge(&JSApiStreamPurgeRequest{Subject: "kv.foo", Keep: 10}, 10)
purge(&JSApiStreamPurgeRequest{Subject: "kv.foo", Keep: 10}, 10)
expectLeft("kv.foo", 10)
// RESET AGAIN
js.DeleteStream("KV")
// Do manually for now.
nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
if _, err := js.StreamInfo("KV"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Put in 100.
for i := 0; i < 100; i++ {
js.Publish("kv.foo", []byte("OK"))
}
purge(&JSApiStreamPurgeRequest{Keep: 10}, 10)
expectLeft(">", 10)
})
}

View File

@@ -268,7 +268,7 @@ func (ms *memStore) FilteredState(sseq uint64, subj string) SimpleState {
}
// If we want everything.
if subj == _EMPTY_ || subj == ">" {
if subj == _EMPTY_ || subj == fwcs {
ss.Msgs, ss.First, ss.Last = ms.state.Msgs, ms.state.FirstSeq, ms.state.LastSeq
return ss
}
@@ -389,13 +389,28 @@ func (ms *memStore) expireMsgs() {
// PurgeEx will remove messages based on subject filters, sequence and number of messages to keep.
// Will return the number of purged messages.
func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint64, err error) {
if subject == _EMPTY_ || subject == ">" && keep == 0 {
return ms.Purge()
if subject == _EMPTY_ || subject == fwcs {
if keep == 0 && (sequence == 0 || sequence == 1) {
return ms.Purge()
}
if sequence > 1 {
return ms.Compact(sequence)
} else if keep > 0 {
ms.mu.RLock()
msgs, lseq := ms.state.Msgs, ms.state.LastSeq
ms.mu.RUnlock()
if keep >= msgs {
return 0, nil
}
return ms.Compact(lseq - keep + 1)
}
return 0, nil
}
eq := compareFn(subject)
if ss := ms.FilteredState(1, subject); ss.Msgs > 0 {
if keep > 0 {
if keep > ss.Msgs {
if keep >= ss.Msgs {
return 0, nil
}
ss.Msgs -= keep

View File

@@ -30,6 +30,7 @@ import (
const (
pwc = '*'
fwc = '>'
fwcs = ">"
tsep = "."
btsep = '.'
)