From df2147bc7c70d3ea7801383de2994824b591320a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 4 Oct 2021 09:00:18 -0700 Subject: [PATCH] Fix for rollups and filtered purge Signed-off-by: Derek Collison --- server/jetstream_cluster_test.go | 68 ++++++++++++++++++++++++++++++++ server/stream.go | 13 +++++- 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 2c57a728..8de8a98c 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -9084,6 +9084,74 @@ func TestJetStreamRollups(t *testing.T) { } } +func TestJetStreamRollupSubjectAndWatchers(t *testing.T) { + c := createJetStreamClusterExplicit(t, "JSC", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + cfg := &nats.StreamConfig{ + Name: "KVW", + Subjects: []string{"kv.*"}, + MaxMsgsPerSubject: 10, + Replicas: 2, + } + if _, err := js.AddStream(cfg); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sub, err := js.SubscribeSync("kv.*") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + send := func(key, value string) { + t.Helper() + _, err := js.Publish("kv."+key, []byte(value)) + require_NoError(t, err) + } + + rollup := func(key, value string) { + t.Helper() + m := nats.NewMsg("kv." + key) + m.Data = []byte(value) + m.Header.Set(JSMsgRollup, JSMsgRollupSubject) + _, err := js.PublishMsg(m) + require_NoError(t, err) + } + + expectUpdate := func(key, value string, seq uint64) { + t.Helper() + m, err := sub.NextMsg(time.Second) + require_NoError(t, err) + if m.Subject != "kv."+key { + t.Fatalf("Keys don't match: %q vs %q", m.Subject[3:], key) + } + if string(m.Data) != value { + t.Fatalf("Values don't match: %q vs %q", m.Data, value) + } + meta, err := m.Metadata() + require_NoError(t, err) + if meta.Sequence.Consumer != seq { + t.Fatalf("Sequences don't match: %v vs %v", meta.Sequence.Consumer, value) + } + } + + send("name", "derek") + expectUpdate("name", "derek", 1) + send("age", "22") + expectUpdate("age", "22", 2) + send("age", "33") + expectUpdate("age", "33", 3) + send("name", "ivan") + expectUpdate("name", "ivan", 4) + send("name", "rip") + expectUpdate("name", "rip", 5) + rollup("age", "50") + expectUpdate("age", "50", 6) +} + // Support functions // Used to setup superclusters for tests. diff --git a/server/stream.go b/server/stream.go index 21a0fe76..0712cefb 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1012,6 +1012,9 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err var _obs [4]*consumer obs := _obs[:0] for _, o := range mset.consumers { + if preq != nil && !o.isFilteredMatch(preq.Subject) { + continue + } obs = append(obs, o) } mset.mu.Unlock() @@ -1028,8 +1031,16 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err // Purge consumers. var state StreamState mset.store.FastState(&state) + fseq := state.FirstSeq + + // Check for filtered purge. + if preq != nil && preq.Subject != _EMPTY_ { + ss := mset.store.FilteredState(state.FirstSeq, preq.Subject) + fseq = ss.First + } + for _, o := range obs { - o.purge(state.FirstSeq) + o.purge(fseq) } return purged, nil }