Merge pull request #2750 from nats-io/fix_2745

[FIXED] JetStream: interest across gateways
This commit is contained in:
Ivan Kozlovic
2021-12-20 09:45:50 -07:00
committed by GitHub
2 changed files with 60 additions and 10 deletions

View File

@@ -786,16 +786,17 @@ func (o *consumer) setLeader(isLeader bool) {
if o.isPushMode() {
o.inch = make(chan bool, 8)
o.acc.sl.registerNotification(o.cfg.DeliverSubject, o.cfg.DeliverGroup, o.inch)
if o.active = <-o.inch; !o.active {
// Check gateways in case they are enabled.
if s.gateway.enabled {
o.active = s.hasGatewayInterest(o.acc.Name, o.cfg.DeliverSubject)
stopAndClearTimer(&o.gwdtmr)
o.gwdtmr = time.AfterFunc(time.Second, func() { o.watchGWinterest() })
}
} else {
if o.active = <-o.inch; o.active {
o.checkQueueInterest()
}
// Check gateways in case they are enabled.
if s.gateway.enabled {
if !o.active {
o.active = s.hasGatewayInterest(o.acc.Name, o.cfg.DeliverSubject)
}
stopAndClearTimer(&o.gwdtmr)
o.gwdtmr = time.AfterFunc(time.Second, func() { o.watchGWinterest() })
}
}
// If we are not in ReplayInstant mode mark us as in replay state until resolved.
@@ -1277,7 +1278,7 @@ func (o *consumer) updateDeliverSubjectLocked(newDeliver string) {
o.forceExpirePending()
}
o.acc.sl.ClearNotification(o.dsubj, o.inch)
o.acc.sl.clearNotification(o.dsubj, o.cfg.DeliverGroup, o.inch)
o.dsubj, o.cfg.DeliverSubject = newDeliver, newDeliver
// When we register new one it will deliver to update state loop.
o.acc.sl.registerNotification(newDeliver, o.cfg.DeliverGroup, o.inch)
@@ -3133,6 +3134,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
o.signalNewMessages()
}
n := o.node
qgroup := o.cfg.DeliverGroup
o.mu.Unlock()
if c != nil {
@@ -3143,7 +3145,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
}
if delivery != _EMPTY_ {
a.sl.ClearNotification(delivery, o.inch)
a.sl.clearNotification(delivery, qgroup, o.inch)
}
mset.mu.Lock()

View File

@@ -9716,6 +9716,54 @@ func TestJetStreamConsumerUpdates(t *testing.T) {
t.Run("Clustered", func(t *testing.T) { testConsumerUpdate(t, c.randomServer(), 2) })
}
func TestJetStreamSuperClusterPushConsumerInterest(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 2)
defer sc.shutdown()
for _, test := range []struct {
name string
queue string
}{
{"non queue", _EMPTY_},
{"queue", "queue"},
} {
t.Run(test.name, func(t *testing.T) {
testInterest := func(s *Server) {
t.Helper()
nc, js := jsClientConnect(t, s)
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
var sub *nats.Subscription
if test.queue != _EMPTY_ {
sub, err = js.QueueSubscribeSync("foo", test.queue)
} else {
sub, err = js.SubscribeSync("foo", nats.Durable("dur"))
}
require_NoError(t, err)
js.Publish("foo", []byte("msg1"))
// Since the GW watcher is checking every 1sec, make sure we are
// giving it enough time for the delivery to start.
_, err = sub.NextMsg(2 * time.Second)
require_NoError(t, err)
}
// Create the durable push consumer from cluster "0"
testInterest(sc.clusters[0].servers[0])
// Now "move" to a server in cluster "1"
testInterest(sc.clusters[1].servers[0])
})
}
}
// Support functions
// Used to setup superclusters for tests.