Merge pull request #1705 from nats-io/interest

Make interest notifications explicit match only
This commit is contained in:
Derek Collison
2020-11-12 08:29:28 -08:00
committed by GitHub
4 changed files with 100 additions and 60 deletions

View File

@@ -1651,7 +1651,7 @@ func (o *Consumer) didNotDeliver(seq uint64) {
if o.isPushMode() {
o.active = false
} else if o.pending != nil {
// push mode and we have pending.
// pull mode and we have pending.
if _, ok := o.pending[seq]; ok {
// We found this messsage on pending, we need
// to queue it up for immediate redelivery since

View File

@@ -138,9 +138,10 @@ func (s *Sublist) CacheEnabled() bool {
// RegisterNotification will register for notifications when interest for the given
// subject changes. The subject must be a literal publish type subject. The
// notification is true for when the first interest for a subject is inserted,
// and false when all interest in the subject is removed. The sublist will not
// block when trying to send the notification. Its up to the caller to make sure
// the channel send will not block.
// and false when all interest in the subject is removed. Note that this interest
// needs to be exact and that wildcards will not trigger the notifications. The sublist
// will not block when trying to send the notification. Its up to the caller to make
// sure the channel send will not block.
func (s *Sublist) RegisterNotification(subject string, notify chan<- bool) error {
if subjectHasWildcard(subject) {
return ErrInvalidSubject
@@ -149,8 +150,24 @@ func (s *Sublist) RegisterNotification(subject string, notify chan<- bool) error
return ErrNilChan
}
var hasInterest bool
r := s.Match(subject)
hasInterest := len(r.psubs)+len(r.qsubs) > 0
if len(r.psubs)+len(r.qsubs) > 0 {
for _, sub := range r.psubs {
if string(sub.subject) == subject {
hasInterest = true
break
}
}
for _, qsub := range r.qsubs {
qs := qsub[0]
if string(qs.subject) == subject {
hasInterest = true
break
}
}
}
s.Lock()
if s.notify == nil {
@@ -243,52 +260,48 @@ func (s *Sublist) addNotify(m map[string][]chan<- bool, subject string, notify c
// chkForInsertNotification will check to see if we need to notify on this subject.
// Write lock should be held.
func (s *Sublist) chkForInsertNotification(subject string, isLiteral bool) {
// If we are a literal, all notify subjects are also literal so just do a
// hash lookup here.
if isLiteral {
chs := s.notify.insert[subject]
if len(chs) > 0 {
for _, ch := range chs {
sendNotification(ch, true)
}
// Move from the insert map to the remove map.
s.notify.remove[subject] = append(s.notify.remove[subject], chs...)
delete(s.notify.insert, subject)
}
return
}
// We are not a literal, so we may match any subject that we want.
// Note we could be smarter here and try to make the list smaller, but probably not worth it TBH.
for target, chs := range s.notify.insert {
r := s.matchNoLock(target)
if len(r.psubs)+len(r.qsubs) > 0 {
for _, ch := range chs {
sendNotification(ch, true)
}
// Move from the insert map to the remove map.
s.notify.remove[target] = append(s.notify.remove[target], chs...)
delete(s.notify.insert, target)
break
func (s *Sublist) chkForInsertNotification(subject string) {
// All notify subjects are also literal so just do a hash lookup here.
if chs := s.notify.insert[subject]; len(chs) > 0 {
for _, ch := range chs {
sendNotification(ch, true)
}
// Move from the insert map to the remove map.
s.notify.remove[subject] = append(s.notify.remove[subject], chs...)
delete(s.notify.insert, subject)
}
}
// chkForRemoveNotification will check to see if we need to notify on this subject.
// Write lock should be held.
func (s *Sublist) chkForRemoveNotification(subject string, isLiteral bool) {
for target, chs := range s.notify.remove {
func (s *Sublist) chkForRemoveNotification(subject string) {
if chs := s.notify.remove[subject]; len(chs) > 0 {
// We need to always check that we have no interest anymore.
r := s.matchNoLock(target)
if len(r.psubs)+len(r.qsubs) == 0 {
var hasInterest bool
r := s.matchNoLock(subject)
if len(r.psubs)+len(r.qsubs) > 0 {
for _, sub := range r.psubs {
if string(sub.subject) == subject {
hasInterest = true
break
}
}
for _, qsub := range r.qsubs {
qs := qsub[0]
if string(qs.subject) == subject {
hasInterest = true
break
}
}
}
if !hasInterest {
for _, ch := range chs {
sendNotification(ch, false)
}
// Move from the remove map to the insert map.
s.notify.insert[target] = append(s.notify.insert[target], chs...)
delete(s.notify.remove, target)
break
s.notify.insert[subject] = append(s.notify.insert[subject], chs...)
delete(s.notify.remove, subject)
}
}
}
@@ -387,8 +400,8 @@ func (s *Sublist) Insert(sub *subscription) error {
s.addToCache(subject, sub)
atomic.AddUint64(&s.genid, 1)
if s.notify != nil && isnew && len(s.notify.insert) > 0 {
s.chkForInsertNotification(subject, !haswc)
if s.notify != nil && isnew && !haswc && len(s.notify.insert) > 0 {
s.chkForInsertNotification(subject)
}
s.Unlock()
@@ -728,8 +741,8 @@ func (s *Sublist) remove(sub *subscription, shouldLock bool, doCacheUpdates bool
atomic.AddUint64(&s.genid, 1)
}
if s.notify != nil && last && len(s.notify.remove) > 0 {
s.chkForRemoveNotification(subject, !haswc)
if s.notify != nil && last && !haswc && len(s.notify.remove) > 0 {
s.chkForRemoveNotification(subject)
}
return nil

View File

@@ -1085,7 +1085,7 @@ func TestSublistRegisterInterestNotification(t *testing.T) {
t.Fatalf("Expected to return false on non-existent notification entry")
}
// This should work.
// This should work properly.
if err := s.RegisterNotification("foo", ch); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -1176,27 +1176,28 @@ func TestSublistRegisterInterestNotification(t *testing.T) {
}
// Let's do some wildcard checks.
// Wildcards will not trigger interest.
subpwc := newSub("*")
s.Insert(subpwc)
expectNone()
if err := s.RegisterNotification("foo", ch); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
expectFalse()
subpwc := newSub("*")
s.Insert(subpwc)
s.Insert(sub)
expectTrue()
s.Insert(sub)
expectNone()
s.Remove(sub)
expectNone()
expectFalse()
s.Remove(subpwc)
expectFalse()
expectNone()
subfwc := newSub(">")
s.Insert(subfwc)
expectTrue()
expectNone()
s.Insert(subpwc)
expectNone()
@@ -1205,7 +1206,7 @@ func TestSublistRegisterInterestNotification(t *testing.T) {
expectNone()
s.Remove(subfwc)
expectFalse()
expectNone()
// Test batch
subs := []*subscription{sub, sub2, sub3, sub4, subpwc, subfwc}
@@ -1218,6 +1219,29 @@ func TestSublistRegisterInterestNotification(t *testing.T) {
expectOne()
expectFalse()
// Test queue subs
qsub := newQSub("foo.bar.baz", "1")
s.Insert(qsub)
expectNone()
if err := s.RegisterNotification("foo.bar.baz", ch); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
expectTrue()
wcqsub := newQSub("foo.bar.>", "1")
s.Insert(wcqsub)
expectNone()
s.Remove(qsub)
expectFalse()
s.Remove(wcqsub)
expectNone()
s.Insert(wcqsub)
expectNone()
// Test non-blocking notifications.
if err := s.RegisterNotification("bar", ch); err != nil {
t.Fatalf("Unexpected error: %v", err)

View File

@@ -7480,13 +7480,14 @@ func TestJetStreamAPIConsumerListPaging(t *testing.T) {
nc := clientConnectToServer(t, s)
defer nc.Close()
sub, _ := nc.SubscribeSync("d.*")
defer sub.Unsubscribe()
nc.Flush()
consumersNum := server.JSApiNamesLimit
for i := 1; i <= consumersNum; i++ {
_, err := mset.AddConsumer(&server.ConsumerConfig{DeliverSubject: fmt.Sprintf("d.%d", i)})
dsubj := fmt.Sprintf("d.%d", i)
sub, _ := nc.SubscribeSync(dsubj)
defer sub.Unsubscribe()
nc.Flush()
_, err := mset.AddConsumer(&server.ConsumerConfig{DeliverSubject: dsubj})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -8905,6 +8906,7 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) {
sub2, _ := nc2.SubscribeSync(nats.NewInbox())
defer sub2.Unsubscribe()
nc2.Flush()
o2, err := mset.AddConsumer(&server.ConsumerConfig{
Durable: "dur2",
@@ -8945,6 +8947,7 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) {
// Now close the 2nd subscription...
sub2.Unsubscribe()
nc2.Flush()
// Send 2 more new messages
for i := 0; i < toSend; i++ {
@@ -8959,7 +8962,7 @@ func TestJetStreamAckExplicitMsgRemoval(t *testing.T) {
for i := 0; i < toSend; i++ {
m, err := sub1.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error acking message: %v", err)
t.Fatalf("Error getting message to ack: %v", err)
}
m.Respond(nil)
}