mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 02:07:59 -07:00
Fix for rollups and filtered purge
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -9084,6 +9084,74 @@ func TestJetStreamRollups(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJetStreamRollupSubjectAndWatchers(t *testing.T) {
|
||||
c := createJetStreamClusterExplicit(t, "JSC", 3)
|
||||
defer c.shutdown()
|
||||
|
||||
nc, js := jsClientConnect(t, c.randomServer())
|
||||
defer nc.Close()
|
||||
|
||||
cfg := &nats.StreamConfig{
|
||||
Name: "KVW",
|
||||
Subjects: []string{"kv.*"},
|
||||
MaxMsgsPerSubject: 10,
|
||||
Replicas: 2,
|
||||
}
|
||||
if _, err := js.AddStream(cfg); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
sub, err := js.SubscribeSync("kv.*")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
send := func(key, value string) {
|
||||
t.Helper()
|
||||
_, err := js.Publish("kv."+key, []byte(value))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
rollup := func(key, value string) {
|
||||
t.Helper()
|
||||
m := nats.NewMsg("kv." + key)
|
||||
m.Data = []byte(value)
|
||||
m.Header.Set(JSMsgRollup, JSMsgRollupSubject)
|
||||
_, err := js.PublishMsg(m)
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
expectUpdate := func(key, value string, seq uint64) {
|
||||
t.Helper()
|
||||
m, err := sub.NextMsg(time.Second)
|
||||
require_NoError(t, err)
|
||||
if m.Subject != "kv."+key {
|
||||
t.Fatalf("Keys don't match: %q vs %q", m.Subject[3:], key)
|
||||
}
|
||||
if string(m.Data) != value {
|
||||
t.Fatalf("Values don't match: %q vs %q", m.Data, value)
|
||||
}
|
||||
meta, err := m.Metadata()
|
||||
require_NoError(t, err)
|
||||
if meta.Sequence.Consumer != seq {
|
||||
t.Fatalf("Sequences don't match: %v vs %v", meta.Sequence.Consumer, value)
|
||||
}
|
||||
}
|
||||
|
||||
send("name", "derek")
|
||||
expectUpdate("name", "derek", 1)
|
||||
send("age", "22")
|
||||
expectUpdate("age", "22", 2)
|
||||
send("age", "33")
|
||||
expectUpdate("age", "33", 3)
|
||||
send("name", "ivan")
|
||||
expectUpdate("name", "ivan", 4)
|
||||
send("name", "rip")
|
||||
expectUpdate("name", "rip", 5)
|
||||
rollup("age", "50")
|
||||
expectUpdate("age", "50", 6)
|
||||
}
|
||||
|
||||
// Support functions
|
||||
|
||||
// Used to setup superclusters for tests.
|
||||
|
||||
@@ -1012,6 +1012,9 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
|
||||
var _obs [4]*consumer
|
||||
obs := _obs[:0]
|
||||
for _, o := range mset.consumers {
|
||||
if preq != nil && !o.isFilteredMatch(preq.Subject) {
|
||||
continue
|
||||
}
|
||||
obs = append(obs, o)
|
||||
}
|
||||
mset.mu.Unlock()
|
||||
@@ -1028,8 +1031,16 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
|
||||
// Purge consumers.
|
||||
var state StreamState
|
||||
mset.store.FastState(&state)
|
||||
fseq := state.FirstSeq
|
||||
|
||||
// Check for filtered purge.
|
||||
if preq != nil && preq.Subject != _EMPTY_ {
|
||||
ss := mset.store.FilteredState(state.FirstSeq, preq.Subject)
|
||||
fseq = ss.First
|
||||
}
|
||||
|
||||
for _, o := range obs {
|
||||
o.purge(state.FirstSeq)
|
||||
o.purge(fseq)
|
||||
}
|
||||
return purged, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user