[FIXED] JetStream: Restarted queue subscriptions may not receive msgs

The server was not properly handling queue subscriptions internal
notifications which could lead to messages not being delivered
to applications using a queue group to consume from a JetStream
consumer after they were restarted.

Resolves #1066

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2022-09-05 13:30:26 -06:00
parent 434bc34713
commit 16eb2af0ec
2 changed files with 51 additions and 4 deletions

View File

@@ -427,7 +427,6 @@ func (s *Sublist) Insert(sub *subscription) error {
} else {
if n.qsubs == nil {
n.qsubs = make(map[string]map[*subscription]*subscription)
isnew = true
}
qname := string(sub.queue)
// This is a queue subscription
@@ -435,6 +434,7 @@ func (s *Sublist) Insert(sub *subscription) error {
if !ok {
subs = make(map[*subscription]*subscription)
n.qsubs[qname] = subs
isnew = true
}
subs[sub] = sub
}
@@ -892,8 +892,10 @@ func (s *Sublist) removeFromNode(n *node, sub *subscription) (found, last bool)
_, found = qsub[sub]
delete(qsub, sub)
if len(qsub) == 0 {
// This is the last queue subscription interest when len(qsub) == 0, not
// when n.qsubs is empty.
last = true
delete(n.qsubs, string(sub.queue))
last = len(n.qsubs) == 0
}
return found, last
}

View File

@@ -1125,7 +1125,7 @@ func TestSublistRegisterInterestNotification(t *testing.T) {
}
tt := time.NewTimer(time.Second)
expectBool := func(b bool) {
expectBoolWithCh := func(ch chan bool, b bool) {
t.Helper()
tt.Reset(time.Second)
defer tt.Stop()
@@ -1138,6 +1138,10 @@ func TestSublistRegisterInterestNotification(t *testing.T) {
t.Fatalf("Timeout waiting for expected value")
}
}
expectBool := func(b bool) {
t.Helper()
expectBoolWithCh(ch, b)
}
expectFalse := func() {
t.Helper()
expectBool(false)
@@ -1152,12 +1156,16 @@ func TestSublistRegisterInterestNotification(t *testing.T) {
t.Fatalf("Expected no notifications, had %d and first was %v", lch, <-ch)
}
}
expectOne := func() {
expectOneWithCh := func(ch chan bool) {
t.Helper()
if len(ch) != 1 {
t.Fatalf("Expected 1 notification")
}
}
expectOne := func() {
t.Helper()
expectOneWithCh(ch)
}
expectOne()
expectFalse()
@@ -1348,6 +1356,43 @@ func TestSublistRegisterInterestNotification(t *testing.T) {
t.Fatalf("Expected to return true")
}
if err := s.RegisterQueueNotification("some.subject", "queue1", ch); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
expectOne()
expectFalse()
qsub1 = newQSub("some.subject", "queue1")
s.Insert(qsub1)
expectTrue()
// Create a second channel for this other queue
ch2 := make(chan bool, 1)
if err := s.RegisterQueueNotification("some.subject", "queue2", ch2); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
expectOneWithCh(ch2)
expectBoolWithCh(ch2, false)
qsub2 = newQSub("some.subject", "queue2")
s.Insert(qsub2)
expectBoolWithCh(ch2, true)
// But we should not get notification on queue1
expectNone()
s.Remove(qsub1)
expectFalse()
s.Remove(qsub2)
expectBoolWithCh(ch2, false)
if !s.ClearQueueNotification("some.subject", "queue1", ch) {
t.Fatalf("Expected to return true")
}
if !s.ClearQueueNotification("some.subject", "queue2", ch2) {
t.Fatalf("Expected to return true")
}
// Test non-blocking notifications.
if err := s.RegisterNotification("bar", ch); err != nil {
t.Fatalf("Unexpected error: %v", err)