diff --git a/server/sublist.go b/server/sublist.go index bb204ad5..ef0b283b 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -427,7 +427,6 @@ func (s *Sublist) Insert(sub *subscription) error { } else { if n.qsubs == nil { n.qsubs = make(map[string]map[*subscription]*subscription) - isnew = true } qname := string(sub.queue) // This is a queue subscription @@ -435,6 +434,7 @@ func (s *Sublist) Insert(sub *subscription) error { if !ok { subs = make(map[*subscription]*subscription) n.qsubs[qname] = subs + isnew = true } subs[sub] = sub } @@ -892,8 +892,10 @@ func (s *Sublist) removeFromNode(n *node, sub *subscription) (found, last bool) _, found = qsub[sub] delete(qsub, sub) if len(qsub) == 0 { + // This is the last queue subscription interest when len(qsub) == 0, not + // when n.qsubs is empty. + last = true delete(n.qsubs, string(sub.queue)) - last = len(n.qsubs) == 0 } return found, last } diff --git a/server/sublist_test.go b/server/sublist_test.go index 668fcaaa..ed954a78 100644 --- a/server/sublist_test.go +++ b/server/sublist_test.go @@ -1125,7 +1125,7 @@ func TestSublistRegisterInterestNotification(t *testing.T) { } tt := time.NewTimer(time.Second) - expectBool := func(b bool) { + expectBoolWithCh := func(ch chan bool, b bool) { t.Helper() tt.Reset(time.Second) defer tt.Stop() @@ -1138,6 +1138,10 @@ func TestSublistRegisterInterestNotification(t *testing.T) { t.Fatalf("Timeout waiting for expected value") } } + expectBool := func(b bool) { + t.Helper() + expectBoolWithCh(ch, b) + } expectFalse := func() { t.Helper() expectBool(false) @@ -1152,12 +1156,16 @@ func TestSublistRegisterInterestNotification(t *testing.T) { t.Fatalf("Expected no notifications, had %d and first was %v", lch, <-ch) } } - expectOne := func() { + expectOneWithCh := func(ch chan bool) { t.Helper() if len(ch) != 1 { t.Fatalf("Expected 1 notification") } } + expectOne := func() { + t.Helper() + expectOneWithCh(ch) + } expectOne() expectFalse() @@ -1348,6 +1356,43 @@ func TestSublistRegisterInterestNotification(t *testing.T) { t.Fatalf("Expected to return true") } + if err := s.RegisterQueueNotification("some.subject", "queue1", ch); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + expectOne() + expectFalse() + + qsub1 = newQSub("some.subject", "queue1") + s.Insert(qsub1) + expectTrue() + + // Create a second channel for this other queue + ch2 := make(chan bool, 1) + if err := s.RegisterQueueNotification("some.subject", "queue2", ch2); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + expectOneWithCh(ch2) + expectBoolWithCh(ch2, false) + + qsub2 = newQSub("some.subject", "queue2") + s.Insert(qsub2) + expectBoolWithCh(ch2, true) + + // But we should not get notification on queue1 + expectNone() + + s.Remove(qsub1) + expectFalse() + s.Remove(qsub2) + expectBoolWithCh(ch2, false) + + if !s.ClearQueueNotification("some.subject", "queue1", ch) { + t.Fatalf("Expected to return true") + } + if !s.ClearQueueNotification("some.subject", "queue2", ch2) { + t.Fatalf("Expected to return true") + } + // Test non-blocking notifications. if err := s.RegisterNotification("bar", ch); err != nil { t.Fatalf("Unexpected error: %v", err)