Fix for extended purge by sequence.

Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
Derek Collison
2021-06-22 07:38:30 -07:00
parent 4d76a0d55c
commit 2e145196b8
3 changed files with 69 additions and 3 deletions

View File

@@ -3152,7 +3152,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
}
last := ss.Last
if sequence > 0 {
last = sequence + 1
last = sequence - 1
}
for seq := ss.First; seq <= last; seq++ {
if sm, _ := fs.msgForSeq(seq); sm != nil && eq(sm.subj, subject) {

View File

@@ -7454,7 +7454,7 @@ func TestJetStreamClusterExtendedStreamPurge(t *testing.T) {
purge(&JSApiStreamPurgeRequest{Subject: "kv.bar", Keep: 1}, 801)
expectLeft("kv.bar", 1)
purge(&JSApiStreamPurgeRequest{Subject: "kv.baz", Sequence: 2850}, 751)
purge(&JSApiStreamPurgeRequest{Subject: "kv.baz", Sequence: 2851}, 751)
expectLeft("kv.baz", 50)
purge(&JSApiStreamPurgeRequest{Subject: "kv.*"}, 0)
@@ -7503,7 +7503,73 @@ func TestJetStreamClusterExtendedStreamPurge(t *testing.T) {
expectLeft(">", 11)
})
}
}
func TestPurgeBySequence(t *testing.T) {
for _, st := range []StorageType{FileStorage, MemoryStorage} {
t.Run(st.String(), func(t *testing.T) {
c := createJetStreamClusterExplicit(t, "JSC", 3)
defer c.shutdown()
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()
cfg := StreamConfig{
Name: "KV",
Subjects: []string{"kv.*.*"},
Storage: st,
Replicas: 2,
MaxMsgsPer: 5,
}
req, err := json.Marshal(cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second)
for i := 0; i < 20; i++ {
if _, err = js.Publish("kv.myapp.username", []byte(fmt.Sprintf("value %d", i))); err != nil {
t.Fatalf("request failed: %s", err)
}
}
for i := 0; i < 20; i++ {
if _, err = js.Publish("kv.myapp.password", []byte(fmt.Sprintf("value %d", i))); err != nil {
t.Fatalf("request failed: %s", err)
}
}
expectSequences := func(t *testing.T, subject string, seq ...int) {
sub, err := js.SubscribeSync(subject)
if err != nil {
t.Fatalf("sub failed: %s", err)
}
defer sub.Unsubscribe()
for _, i := range seq {
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("didn't get message: %s", err)
}
meta, err := msg.Metadata()
if err != nil {
t.Fatalf("didn't get metadata: %s", err)
}
if meta.Sequence.Stream != uint64(i) {
t.Fatalf("expected sequence %d got %d", i, meta.Sequence.Stream)
}
}
}
expectSequences(t, "kv.myapp.username", 16, 17, 18, 19, 20)
expectSequences(t, "kv.myapp.password", 36, 37, 38, 39, 40)
// delete up to but not including 18 of username...
jr, _ := json.Marshal(&JSApiStreamPurgeRequest{Subject: "kv.myapp.username", Sequence: 18})
_, err = nc.Request(fmt.Sprintf(JSApiStreamPurgeT, "KV"), jr, time.Second)
if err != nil {
t.Fatalf("request failed: %s", err)
}
// 18 should still be there
expectSequences(t, "kv.myapp.username", 18, 19, 20)
})
}
}
// Support functions

View File

@@ -417,7 +417,7 @@ func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint6
}
last := ss.Last
if sequence > 0 {
last = sequence + 1
last = sequence - 1
}
ms.mu.Lock()
for seq := ss.First; seq <= last; seq++ {