Merge pull request #2464 from nats-io/consumer-pending

Consumer num pending fixes for multiple matches and merging.
This commit is contained in:
Derek Collison
2021-08-24 08:17:19 -07:00
committed by GitHub
3 changed files with 78 additions and 8 deletions

View File

@@ -1270,13 +1270,15 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
for _, mb := range fs.blks {
mb.mu.RLock()
for subj, ss := range mb.fss {
oss := fss[subj]
if oss.First == 0 { // New
fss[subj] = *ss
} else {
// Merge here.
oss.Last, oss.Msgs = ss.Last, oss.Msgs+ss.Msgs
fss[subj] = oss
if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
oss := fss[subj]
if oss.First == 0 { // New
fss[subj] = *ss
} else {
// Merge here.
oss.Last, oss.Msgs = ss.Last, oss.Msgs+ss.Msgs
fss[subj] = oss
}
}
}
mb.mu.RUnlock()

View File

@@ -12910,6 +12910,67 @@ func TestJetStreamLongStreamNamesAndPubAck(t *testing.T) {
js.Publish("foo", []byte("HELLO"))
}
func TestJetStreamPerSubjectPending(t *testing.T) {
for _, st := range []nats.StorageType{nats.FileStorage, nats.MemoryStorage} {
t.Run(st.String(), func(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
config := s.JetStreamConfig()
if config != nil {
defer removeDir(t, config.StoreDir)
}
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "KV_X",
Subjects: []string{"$KV.X.>"},
MaxMsgsPerSubject: 5,
Storage: st,
})
if err != nil {
t.Fatalf("add stream failed: %s", err)
}
// the message we will care for
_, err = js.Publish("$KV.X.x.y.z", []byte("hello world"))
if err != nil {
t.Fatalf("publish failed: %s", err)
}
// make sure there's some unrelated message after
_, err = js.Publish("$KV.X.1", []byte("hello world"))
if err != nil {
t.Fatalf("publish failed: %s", err)
}
// we expect the wildcard filter subject to match only the one message and so pending will be 0
sub, err := js.SubscribeSync("$KV.X.x.>", nats.DeliverLastPerSubject())
if err != nil {
t.Fatalf("subscribe failed: %s", err)
}
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("next failed: %s", err)
}
meta, err := msg.Metadata()
if err != nil {
t.Fatalf("meta failed: %s", err)
}
// with DeliverLastPerSubject set this is never 0, but without setting that its 0 correctly
if meta.NumPending != 0 {
t.Fatalf("expected numpending 0 got %d", meta.NumPending)
}
})
}
}
///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////

View File

@@ -328,7 +328,14 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
fss := make(map[string]SimpleState)
for subj, ss := range ms.fss {
if subject == _EMPTY_ || subject == fwcs || subjectIsSubsetMatch(subj, subject) {
fss[subj] = *ss
oss := fss[subj]
if oss.First == 0 { // New
fss[subj] = *ss
} else {
// Merge here.
oss.Last, oss.Msgs = ss.Last, oss.Msgs+ss.Msgs
fss[subj] = oss
}
}
}
return fss