diff --git a/server/consumer.go b/server/consumer.go index c04b9a8b..26c6d983 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1651,7 +1651,7 @@ func (o *Consumer) didNotDeliver(seq uint64) { if o.isPushMode() { o.active = false } else if o.pending != nil { - // push mode and we have pending. + // pull mode and we have pending. if _, ok := o.pending[seq]; ok { // We found this messsage on pending, we need // to queue it up for immediate redelivery since diff --git a/server/sublist.go b/server/sublist.go index 7ae9a3b8..37ea486e 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -138,9 +138,10 @@ func (s *Sublist) CacheEnabled() bool { // RegisterNotification will register for notifications when interest for the given // subject changes. The subject must be a literal publish type subject. The // notification is true for when the first interest for a subject is inserted, -// and false when all interest in the subject is removed. The sublist will not -// block when trying to send the notification. Its up to the caller to make sure -// the channel send will not block. +// and false when all interest in the subject is removed. Note that this interest +// needs to be exact and that wildcards will not trigger the notifications. The sublist +// will not block when trying to send the notification. Its up to the caller to make +// sure the channel send will not block. func (s *Sublist) RegisterNotification(subject string, notify chan<- bool) error { if subjectHasWildcard(subject) { return ErrInvalidSubject @@ -149,8 +150,24 @@ func (s *Sublist) RegisterNotification(subject string, notify chan<- bool) error return ErrNilChan } + var hasInterest bool r := s.Match(subject) - hasInterest := len(r.psubs)+len(r.qsubs) > 0 + + if len(r.psubs)+len(r.qsubs) > 0 { + for _, sub := range r.psubs { + if string(sub.subject) == subject { + hasInterest = true + break + } + } + for _, qsub := range r.qsubs { + qs := qsub[0] + if string(qs.subject) == subject { + hasInterest = true + break + } + } + } s.Lock() if s.notify == nil { @@ -243,52 +260,48 @@ func (s *Sublist) addNotify(m map[string][]chan<- bool, subject string, notify c // chkForInsertNotification will check to see if we need to notify on this subject. // Write lock should be held. -func (s *Sublist) chkForInsertNotification(subject string, isLiteral bool) { - // If we are a literal, all notify subjects are also literal so just do a - // hash lookup here. - if isLiteral { - chs := s.notify.insert[subject] - if len(chs) > 0 { - for _, ch := range chs { - sendNotification(ch, true) - } - // Move from the insert map to the remove map. - s.notify.remove[subject] = append(s.notify.remove[subject], chs...) - delete(s.notify.insert, subject) - } - return - } - - // We are not a literal, so we may match any subject that we want. - // Note we could be smarter here and try to make the list smaller, but probably not worth it TBH. - for target, chs := range s.notify.insert { - r := s.matchNoLock(target) - if len(r.psubs)+len(r.qsubs) > 0 { - for _, ch := range chs { - sendNotification(ch, true) - } - // Move from the insert map to the remove map. - s.notify.remove[target] = append(s.notify.remove[target], chs...) - delete(s.notify.insert, target) - break +func (s *Sublist) chkForInsertNotification(subject string) { + // All notify subjects are also literal so just do a hash lookup here. + if chs := s.notify.insert[subject]; len(chs) > 0 { + for _, ch := range chs { + sendNotification(ch, true) } + // Move from the insert map to the remove map. + s.notify.remove[subject] = append(s.notify.remove[subject], chs...) + delete(s.notify.insert, subject) } } // chkForRemoveNotification will check to see if we need to notify on this subject. // Write lock should be held. -func (s *Sublist) chkForRemoveNotification(subject string, isLiteral bool) { - for target, chs := range s.notify.remove { +func (s *Sublist) chkForRemoveNotification(subject string) { + if chs := s.notify.remove[subject]; len(chs) > 0 { // We need to always check that we have no interest anymore. - r := s.matchNoLock(target) - if len(r.psubs)+len(r.qsubs) == 0 { + var hasInterest bool + r := s.matchNoLock(subject) + + if len(r.psubs)+len(r.qsubs) > 0 { + for _, sub := range r.psubs { + if string(sub.subject) == subject { + hasInterest = true + break + } + } + for _, qsub := range r.qsubs { + qs := qsub[0] + if string(qs.subject) == subject { + hasInterest = true + break + } + } + } + if !hasInterest { for _, ch := range chs { sendNotification(ch, false) } // Move from the remove map to the insert map. - s.notify.insert[target] = append(s.notify.insert[target], chs...) - delete(s.notify.remove, target) - break + s.notify.insert[subject] = append(s.notify.insert[subject], chs...) + delete(s.notify.remove, subject) } } } @@ -387,8 +400,8 @@ func (s *Sublist) Insert(sub *subscription) error { s.addToCache(subject, sub) atomic.AddUint64(&s.genid, 1) - if s.notify != nil && isnew && len(s.notify.insert) > 0 { - s.chkForInsertNotification(subject, !haswc) + if s.notify != nil && isnew && !haswc && len(s.notify.insert) > 0 { + s.chkForInsertNotification(subject) } s.Unlock() @@ -728,8 +741,8 @@ func (s *Sublist) remove(sub *subscription, shouldLock bool, doCacheUpdates bool atomic.AddUint64(&s.genid, 1) } - if s.notify != nil && last && len(s.notify.remove) > 0 { - s.chkForRemoveNotification(subject, !haswc) + if s.notify != nil && last && !haswc && len(s.notify.remove) > 0 { + s.chkForRemoveNotification(subject) } return nil diff --git a/server/sublist_test.go b/server/sublist_test.go index 4920fc80..43a2ae84 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -1085,7 +1085,7 @@ func TestSublistRegisterInterestNotification(t *testing.T) { t.Fatalf("Expected to return false on non-existent notification entry") } - // This should work. + // This should work properly. if err := s.RegisterNotification("foo", ch); err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1176,27 +1176,28 @@ func TestSublistRegisterInterestNotification(t *testing.T) { } // Let's do some wildcard checks. + // Wildcards will not trigger interest. + subpwc := newSub("*") + s.Insert(subpwc) + expectNone() + if err := s.RegisterNotification("foo", ch); err != nil { t.Fatalf("Unexpected error: %v", err) } expectFalse() - subpwc := newSub("*") - s.Insert(subpwc) + s.Insert(sub) expectTrue() - s.Insert(sub) - expectNone() - s.Remove(sub) - expectNone() + expectFalse() s.Remove(subpwc) - expectFalse() + expectNone() subfwc := newSub(">") s.Insert(subfwc) - expectTrue() + expectNone() s.Insert(subpwc) expectNone() @@ -1205,7 +1206,7 @@ func TestSublistRegisterInterestNotification(t *testing.T) { expectNone() s.Remove(subfwc) - expectFalse() + expectNone() // Test batch subs := []*subscription{sub, sub2, sub3, sub4, subpwc, subfwc} @@ -1218,6 +1219,29 @@ func TestSublistRegisterInterestNotification(t *testing.T) { expectOne() expectFalse() + // Test queue subs + qsub := newQSub("foo.bar.baz", "1") + s.Insert(qsub) + expectNone() + + if err := s.RegisterNotification("foo.bar.baz", ch); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + expectTrue() + + wcqsub := newQSub("foo.bar.>", "1") + s.Insert(wcqsub) + expectNone() + + s.Remove(qsub) + expectFalse() + + s.Remove(wcqsub) + expectNone() + + s.Insert(wcqsub) + expectNone() + // Test non-blocking notifications. if err := s.RegisterNotification("bar", ch); err != nil { t.Fatalf("Unexpected error: %v", err) diff --git a/test/jetstream_test.go b/test/jetstream_test.go index 4ba362e3..176e8bdd 100644 --- a/test/jetstream_test.go +++ b/test/jetstream_test.go @@ -7480,13 +7480,14 @@ func TestJetStreamAPIConsumerListPaging(t *testing.T) { nc := clientConnectToServer(t, s) defer nc.Close() - sub, _ := nc.SubscribeSync("d.*") - defer sub.Unsubscribe() - nc.Flush() - consumersNum := server.JSApiNamesLimit for i := 1; i <= consumersNum; i++ { - _, err := mset.AddConsumer(&server.ConsumerConfig{DeliverSubject: fmt.Sprintf("d.%d", i)}) + dsubj := fmt.Sprintf("d.%d", i) + sub, _ := nc.SubscribeSync(dsubj) + defer sub.Unsubscribe() + nc.Flush() + + _, err := mset.AddConsumer(&server.ConsumerConfig{DeliverSubject: dsubj}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -8905,6 +8906,7 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { sub2, _ := nc2.SubscribeSync(nats.NewInbox()) defer sub2.Unsubscribe() + nc2.Flush() o2, err := mset.AddConsumer(&server.ConsumerConfig{ Durable: "dur2", @@ -8945,6 +8947,7 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { // Now close the 2nd subscription... sub2.Unsubscribe() + nc2.Flush() // Send 2 more new messages for i := 0; i < toSend; i++ { @@ -8959,7 +8962,7 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) { for i := 0; i < toSend; i++ { m, err := sub1.NextMsg(time.Second) if err != nil { - t.Fatalf("Error acking message: %v", err) + t.Fatalf("Error getting message to ack: %v", err) } m.Respond(nil) }