From 2e145196b89eecce02c4e8396e451f37e50bb0d1 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 22 Jun 2021 07:38:30 -0700 Subject: [PATCH] Fix for extended purge by sequence. Signed-off-by: Derek Collison --- server/filestore.go | 2 +- server/jetstream_cluster_test.go | 68 +++++++++++++++++++++++++++++++- server/memstore.go | 2 +- 3 files changed, 69 insertions(+), 3 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index cc9ca1c3..a458858d 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -3152,7 +3152,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint } last := ss.Last if sequence > 0 { - last = sequence + 1 + last = sequence - 1 } for seq := ss.First; seq <= last; seq++ { if sm, _ := fs.msgForSeq(seq); sm != nil && eq(sm.subj, subject) { diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index e930992f..6497fa69 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -7454,7 +7454,7 @@ func TestJetStreamClusterExtendedStreamPurge(t *testing.T) { purge(&JSApiStreamPurgeRequest{Subject: "kv.bar", Keep: 1}, 801) expectLeft("kv.bar", 1) - purge(&JSApiStreamPurgeRequest{Subject: "kv.baz", Sequence: 2850}, 751) + purge(&JSApiStreamPurgeRequest{Subject: "kv.baz", Sequence: 2851}, 751) expectLeft("kv.baz", 50) purge(&JSApiStreamPurgeRequest{Subject: "kv.*"}, 0) @@ -7503,7 +7503,73 @@ func TestJetStreamClusterExtendedStreamPurge(t *testing.T) { expectLeft(">", 11) }) } +} +func TestPurgeBySequence(t *testing.T) { + for _, st := range []StorageType{FileStorage, MemoryStorage} { + t.Run(st.String(), func(t *testing.T) { + + c := createJetStreamClusterExplicit(t, "JSC", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + cfg := StreamConfig{ + Name: "KV", + Subjects: []string{"kv.*.*"}, + Storage: st, + Replicas: 2, + MaxMsgsPer: 5, + } + req, err := json.Marshal(cfg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) + for i := 0; i < 20; i++ { + if _, err = js.Publish("kv.myapp.username", []byte(fmt.Sprintf("value %d", i))); err != nil { + t.Fatalf("request failed: %s", err) + } + } + for i := 0; i < 20; i++ { + if _, err = js.Publish("kv.myapp.password", []byte(fmt.Sprintf("value %d", i))); err != nil { + t.Fatalf("request failed: %s", err) + } + } + expectSequences := func(t *testing.T, subject string, seq ...int) { + sub, err := js.SubscribeSync(subject) + if err != nil { + t.Fatalf("sub failed: %s", err) + } + defer sub.Unsubscribe() + for _, i := range seq { + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("didn't get message: %s", err) + } + meta, err := msg.Metadata() + if err != nil { + t.Fatalf("didn't get metadata: %s", err) + } + if meta.Sequence.Stream != uint64(i) { + t.Fatalf("expected sequence %d got %d", i, meta.Sequence.Stream) + } + } + } + expectSequences(t, "kv.myapp.username", 16, 17, 18, 19, 20) + expectSequences(t, "kv.myapp.password", 36, 37, 38, 39, 40) + + // delete up to but not including 18 of username... + jr, _ := json.Marshal(&JSApiStreamPurgeRequest{Subject: "kv.myapp.username", Sequence: 18}) + _, err = nc.Request(fmt.Sprintf(JSApiStreamPurgeT, "KV"), jr, time.Second) + if err != nil { + t.Fatalf("request failed: %s", err) + } + // 18 should still be there + expectSequences(t, "kv.myapp.username", 18, 19, 20) + }) + } } // Support functions diff --git a/server/memstore.go b/server/memstore.go index de0cd20c..632960b4 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -417,7 +417,7 @@ func (ms *memStore) PurgeEx(subject string, sequence, keep uint64) (purged uint6 } last := ss.Last if sequence > 0 { - last = sequence + 1 + last = sequence - 1 } ms.mu.Lock() for seq := ss.First; seq <= last; seq++ {