From e2e3de997789e5a1e5ea58ac7dc0cab742ee5ffb Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 28 Apr 2021 17:11:51 -0600 Subject: [PATCH] [FIXED] Message loop with cluster, leaf nodes and queue subs In a setup with a cluster of servers to which 2 different leaf nodes attach to, and queue subs are attached to one of the leaf, if the leaf server is restarted and reconnects to another server in the cluster, there was a risk for an infinite message loop between some servers in the "hub" cluster. Signed-off-by: Ivan Kozlovic --- server/client.go | 29 +++++--- server/leafnode.go | 8 +- server/leafnode_test.go | 157 ++++++++++++++++++++++++++++++++++++++++ server/route.go | 6 +- 4 files changed, 186 insertions(+), 14 deletions(-) diff --git a/server/client.go b/server/client.go index 802b5d37..4adda8e9 100644 --- a/server/client.go +++ b/server/client.go @@ -4114,7 +4114,10 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, for i := 0; i < len(qsubs); i++ { sub = qsubs[i] if sub.client.kind == LEAF || sub.client.kind == ROUTER { - if rsub == nil { + // If we have assigned an rsub already, replace if the destination is LEAF + // since we want to favor that compared to a ROUTER. We could make sure that + // we override only if previous was a ROUTER and not a LEAF, but we don't have to. + if rsub == nil || sub.client.kind == LEAF { rsub = sub } } else { @@ -4545,6 +4548,7 @@ func (c *client) closeConnection(reason ClosedState) { srv = c.srv noReconnect = c.flags.isSet(noReconnect) acc = c.acc + spoke bool ) // Snapshot for use if we are a client connection. @@ -4560,6 +4564,7 @@ func (c *client) closeConnection(reason ClosedState) { sub.close() subs = append(subs, sub) } + spoke = c.isSpokeLeafNode() } if c.route != nil { @@ -4593,7 +4598,6 @@ func (c *client) closeConnection(reason ClosedState) { // Unregister srv.removeClient(c) - notSpoke := !(kind == LEAF && c.isSpokeLeafNode()) // Update remote subscriptions. if acc != nil && (kind == CLIENT || kind == LEAF) { qsubs := map[string]*qsub{} @@ -4602,29 +4606,36 @@ func (c *client) closeConnection(reason ClosedState) { c.unsubscribe(acc, sub, true, false) // Update route as normal for a normal subscriber. if sub.queue == nil { - if notSpoke { + if !spoke { srv.updateRouteSubscriptionMap(acc, sub, -1) + if srv.gateway.enabled { + srv.gatewayUpdateSubInterest(acc.Name, sub, -1) + } } srv.updateLeafNodes(acc, sub, -1) } else { // We handle queue subscribers special in case we // have a bunch we can just send one update to the // connected routes. + num := int32(1) + if kind == LEAF { + num = sub.qw + } key := string(sub.subject) + " " + string(sub.queue) if esub, ok := qsubs[key]; ok { - esub.n++ + esub.n += num } else { - qsubs[key] = &qsub{sub, 1} + qsubs[key] = &qsub{sub, num} } } - if srv.gateway.enabled && notSpoke { - srv.gatewayUpdateSubInterest(acc.Name, sub, -1) - } } // Process any qsubs here. for _, esub := range qsubs { - if notSpoke { + if !spoke { srv.updateRouteSubscriptionMap(acc, esub.sub, -(esub.n)) + if srv.gateway.enabled { + srv.gatewayUpdateSubInterest(acc.Name, esub.sub, -(esub.n)) + } } srv.updateLeafNodes(acc, esub.sub, -(esub.n)) } diff --git a/server/leafnode.go b/server/leafnode.go index 3f253372..15abe091 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -1592,6 +1592,7 @@ func (c *client) processLeafSub(argo []byte) (err error) { key := string(sub.sid) osub := c.subs[key] updateGWs := false + delta := int32(1) if osub == nil { c.subs[key] = sub // Now place into the account sl. @@ -1605,6 +1606,7 @@ func (c *client) processLeafSub(argo []byte) (err error) { updateGWs = srv.gateway.enabled } else if sub.queue != nil { // For a queue we need to update the weight. + delta = sub.qw - atomic.LoadInt32(&osub.qw) atomic.StoreInt32(&osub.qw, sub.qw) acc.sl.UpdateRemoteQSub(osub) } @@ -1620,14 +1622,14 @@ func (c *client) processLeafSub(argo []byte) (err error) { // other leaf nodes as needed. if !spoke { // If we are routing add to the route map for the associated account. - srv.updateRouteSubscriptionMap(acc, sub, 1) + srv.updateRouteSubscriptionMap(acc, sub, delta) if updateGWs { - srv.gatewayUpdateSubInterest(acc.Name, sub, 1) + srv.gatewayUpdateSubInterest(acc.Name, sub, delta) } } // Now check on leafnode updates for other leaf nodes. We understand solicited // and non-solicited state in this call so we will do the right thing. - srv.updateLeafNodes(acc, sub, 1) + srv.updateLeafNodes(acc, sub, delta) return nil } diff --git a/server/leafnode_test.go b/server/leafnode_test.go index d4559226..7da76420 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -3513,3 +3513,160 @@ func TestLeafNodeNoPingBeforeConnect(t *testing.T) { } } } + +func TestLeafNodeNoMsgLoop(t *testing.T) { + hubConf := ` + listen: "127.0.0.1:-1" + accounts { + FOO { + users [ + {username: leaf, password: pass} + {username: user, password: pass} + ] + } + } + cluster { + name: "hub" + listen: "127.0.0.1:-1" + %s + } + leafnodes { + listen: "127.0.0.1:-1" + authorization { + account: FOO + } + } + ` + configS1 := createConfFile(t, []byte(fmt.Sprintf(hubConf, ""))) + defer removeFile(t, configS1) + s1, o1 := RunServerWithConfig(configS1) + defer s1.Shutdown() + + configS2S3 := createConfFile(t, []byte(fmt.Sprintf(hubConf, fmt.Sprintf(`routes: ["nats://127.0.0.1:%d"]`, o1.Cluster.Port)))) + defer removeFile(t, configS2S3) + s2, o2 := RunServerWithConfig(configS2S3) + defer s2.Shutdown() + + s3, _ := RunServerWithConfig(configS2S3) + defer s3.Shutdown() + + checkClusterFormed(t, s1, s2, s3) + + contentLN := ` + listen: "127.0.0.1:%d" + accounts { + FOO { + users [ + {username: leaf, password: pass} + {username: user, password: pass} + ] + } + } + leafnodes { + remotes = [ + { + url: "nats://leaf:pass@127.0.0.1:%d" + account: FOO + } + ] + } + ` + lnconf := createConfFile(t, []byte(fmt.Sprintf(contentLN, -1, o1.LeafNode.Port))) + defer removeFile(t, lnconf) + sl1, slo1 := RunServerWithConfig(lnconf) + defer sl1.Shutdown() + + sl2, slo2 := RunServerWithConfig(lnconf) + defer sl2.Shutdown() + + checkLeafNodeConnected(t, sl1) + checkLeafNodeConnected(t, sl2) + + // Create users on each leafnode + nc1, err := nats.Connect(fmt.Sprintf("nats://user:pass@127.0.0.1:%d", slo1.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc1.Close() + + rch := make(chan struct{}, 1) + nc2, err := nats.Connect( + fmt.Sprintf("nats://user:pass@127.0.0.1:%d", slo2.Port), + nats.ReconnectWait(50*time.Millisecond), + nats.ReconnectHandler(func(_ *nats.Conn) { + rch <- struct{}{} + }), + ) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc2.Close() + + // Create queue subs on sl2 + nc2.QueueSubscribe("foo", "bar", func(_ *nats.Msg) {}) + nc2.QueueSubscribe("foo", "bar", func(_ *nats.Msg) {}) + nc2.Flush() + + // Wait for interest to propagate to sl1 + checkSubInterest(t, sl1, "FOO", "foo", 250*time.Millisecond) + + // Create sub on sl1 + ch := make(chan *nats.Msg, 10) + nc1.Subscribe("foo", func(m *nats.Msg) { + select { + case ch <- m: + default: + } + }) + nc1.Flush() + + checkSubInterest(t, sl2, "FOO", "foo", 250*time.Millisecond) + + // Produce from sl1 + nc1.Publish("foo", []byte("msg1")) + + // Check message is received by plain sub + select { + case <-ch: + case <-time.After(time.Second): + t.Fatalf("Did not receive message") + } + + // Restart leaf node, this time make sure we connect to 2nd server. + sl2.Shutdown() + + // Use config file but this time reuse the client port and set the 2nd server for + // the remote leaf node port. + lnconf = createConfFile(t, []byte(fmt.Sprintf(contentLN, slo2.Port, o2.LeafNode.Port))) + defer removeFile(t, lnconf) + sl2, _ = RunServerWithConfig(lnconf) + defer sl2.Shutdown() + + checkLeafNodeConnected(t, sl2) + + // Wait for client to reconnect + select { + case <-rch: + case <-time.After(time.Second): + t.Fatalf("Did not reconnect") + } + + // Produce a new messages + for i := 0; i < 10; i++ { + nc1.Publish("foo", []byte(fmt.Sprintf("msg%d", 2+i))) + + // Check sub receives 1 message + select { + case <-ch: + case <-time.After(time.Second): + t.Fatalf("Did not receive message") + } + // Check that there is no more... + select { + case m := <-ch: + t.Fatalf("Loop: received second message %s", m.Data) + case <-time.After(50 * time.Millisecond): + // OK + } + } +} diff --git a/server/route.go b/server/route.go index d9e4614b..fa2c0a69 100644 --- a/server/route.go +++ b/server/route.go @@ -1087,6 +1087,7 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { osub := c.subs[key] updateGWs := false + delta := int32(1) if osub == nil { c.subs[key] = sub // Now place into the account sl. @@ -1100,17 +1101,18 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { updateGWs = srv.gateway.enabled } else if sub.queue != nil { // For a queue we need to update the weight. + delta = sub.qw - atomic.LoadInt32(&osub.qw) atomic.StoreInt32(&osub.qw, sub.qw) acc.sl.UpdateRemoteQSub(osub) } c.mu.Unlock() if updateGWs { - srv.gatewayUpdateSubInterest(acc.Name, sub, 1) + srv.gatewayUpdateSubInterest(acc.Name, sub, delta) } // Now check on leafnode updates. - srv.updateLeafNodes(acc, sub, 1) + srv.updateLeafNodes(acc, sub, delta) if c.opts.Verbose { c.sendOK()