From 3053039ff3a8a3f21819cf1d1b7a89ef38c089c0 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 16 Dec 2021 17:20:16 -0700 Subject: [PATCH] [FIXED] JetStream: interest across gateways If the interest existed prior to the initial creation of the consumer, the gateway "watcher" would not be started, which means that interest moving across the super-cluster after that would not be detected. The watcher runs every second and not sure if this is costly or not, so we may want to go a different approach of having a separate interest change channel that would be specific to gateways. But this means adding a new sublist where the interest would be registered and that sublist would need to be updated when processing GW RSub and RUnsub? Signed-off-by: Ivan Kozlovic --- server/consumer.go | 22 ++++++++------- server/jetstream_cluster_test.go | 48 ++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 10 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index fa5ab1f3..6c03e8c0 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -786,16 +786,17 @@ func (o *consumer) setLeader(isLeader bool) { if o.isPushMode() { o.inch = make(chan bool, 8) o.acc.sl.registerNotification(o.cfg.DeliverSubject, o.cfg.DeliverGroup, o.inch) - if o.active = <-o.inch; !o.active { - // Check gateways in case they are enabled. - if s.gateway.enabled { - o.active = s.hasGatewayInterest(o.acc.Name, o.cfg.DeliverSubject) - stopAndClearTimer(&o.gwdtmr) - o.gwdtmr = time.AfterFunc(time.Second, func() { o.watchGWinterest() }) - } - } else { + if o.active = <-o.inch; o.active { o.checkQueueInterest() } + // Check gateways in case they are enabled. + if s.gateway.enabled { + if !o.active { + o.active = s.hasGatewayInterest(o.acc.Name, o.cfg.DeliverSubject) + } + stopAndClearTimer(&o.gwdtmr) + o.gwdtmr = time.AfterFunc(time.Second, func() { o.watchGWinterest() }) + } } // If we are not in ReplayInstant mode mark us as in replay state until resolved. @@ -1277,7 +1278,7 @@ func (o *consumer) updateDeliverSubjectLocked(newDeliver string) { o.forceExpirePending() } - o.acc.sl.ClearNotification(o.dsubj, o.inch) + o.acc.sl.clearNotification(o.dsubj, o.cfg.DeliverGroup, o.inch) o.dsubj, o.cfg.DeliverSubject = newDeliver, newDeliver // When we register new one it will deliver to update state loop. o.acc.sl.registerNotification(newDeliver, o.cfg.DeliverGroup, o.inch) @@ -3133,6 +3134,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { o.signalNewMessages() } n := o.node + qgroup := o.cfg.DeliverGroup o.mu.Unlock() if c != nil { @@ -3143,7 +3145,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error { } if delivery != _EMPTY_ { - a.sl.ClearNotification(delivery, o.inch) + a.sl.clearNotification(delivery, qgroup, o.inch) } mset.mu.Lock() diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 9c072073..cfe2f16a 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -9716,6 +9716,54 @@ func TestJetStreamConsumerUpdates(t *testing.T) { t.Run("Clustered", func(t *testing.T) { testConsumerUpdate(t, c.randomServer(), 2) }) } +func TestJetStreamSuperClusterPushConsumerInterest(t *testing.T) { + sc := createJetStreamSuperCluster(t, 3, 2) + defer sc.shutdown() + + for _, test := range []struct { + name string + queue string + }{ + {"non queue", _EMPTY_}, + {"queue", "queue"}, + } { + t.Run(test.name, func(t *testing.T) { + testInterest := func(s *Server) { + t.Helper() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + var sub *nats.Subscription + if test.queue != _EMPTY_ { + sub, err = js.QueueSubscribeSync("foo", test.queue) + } else { + sub, err = js.SubscribeSync("foo", nats.Durable("dur")) + } + require_NoError(t, err) + + js.Publish("foo", []byte("msg1")) + // Since the GW watcher is checking every 1sec, make sure we are + // giving it enough time for the delivery to start. + _, err = sub.NextMsg(2 * time.Second) + require_NoError(t, err) + } + + // Create the durable push consumer from cluster "0" + testInterest(sc.clusters[0].servers[0]) + + // Now "move" to a server in cluster "1" + testInterest(sc.clusters[1].servers[0]) + }) + } +} + // Support functions // Used to setup superclusters for tests.