From 730d8921e4349300bce99d3355f1154abc649f9b Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 20 Apr 2022 08:03:34 -0600 Subject: [PATCH] [FIXED] LeafNode: propagation interest issue after a config reload When a configuration reload is done, the account's leaf node connections were not transfered to the new instance of the account, causing the interest to not be propagated until a leafnode reconnect or a server restart. Resolves #3009 Signed-off-by: Ivan Kozlovic --- server/leafnode_test.go | 111 ++++++++++++++++++++++++++++++++++++++++ server/reload.go | 2 + test/leafnode_test.go | 99 ----------------------------------- 3 files changed, 113 insertions(+), 99 deletions(-) diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 257f7294..4013c95f 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -5419,3 +5419,114 @@ func TestLeafNodeMinVersion(t *testing.T) { // OK } } + +func TestLeafNodeStreamAndShadowSubs(t *testing.T) { + hubConf := createConfFile(t, []byte(` + port: -1 + leafnodes { + port: -1 + authorization: { + user: leaf + password: leaf + account: B + } + } + accounts: { + A: { + users = [{user: usrA, password: usrA}] + exports: [{stream: foo.*.>}] + } + B: { + imports: [{stream: {account: A, subject: foo.*.>}}] + } + } + `)) + defer removeFile(t, hubConf) + hub, hubo := RunServerWithConfig(hubConf) + defer hub.Shutdown() + + leafConfContet := fmt.Sprintf(` + port: -1 + leafnodes { + remotes = [ + { + url: "nats-leaf://leaf:leaf@127.0.0.1:%d" + account: B + } + ] + } + accounts: { + B: { + exports: [{stream: foo.*.>}] + } + C: { + users: [{user: usrC, password: usrC}] + imports: [{stream: {account: B, subject: foo.bar.>}}] + } + } + `, hubo.LeafNode.Port) + leafConf := createConfFile(t, []byte(leafConfContet)) + defer removeFile(t, leafConf) + leafo := LoadConfig(leafConf) + leafo.LeafNode.ReconnectInterval = 50 * time.Millisecond + leaf := RunServer(leafo) + defer leaf.Shutdown() + + checkLeafNodeConnected(t, hub) + checkLeafNodeConnected(t, leaf) + + subPubAndCheck := func() { + t.Helper() + + ncl, err := nats.Connect(leaf.ClientURL(), nats.UserInfo("usrC", "usrC")) + if err != nil { + t.Fatalf("Error connecting: %v", err) + } + defer ncl.Close() + + fmt.Printf("@@IK: ------ sub to foo.*.baz -----\n") + // This will send an LS+ to the "hub" server. + sub, err := ncl.SubscribeSync("foo.*.baz") + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + ncl.Flush() + + ncm, err := nats.Connect(hub.ClientURL(), nats.UserInfo("usrA", "usrA")) + if err != nil { + t.Fatalf("Error connecting: %v", err) + } + defer ncm.Close() + + // Try a few times in case subject interest has not propagated yet + for i := 0; i < 5; i++ { + ncm.Publish("foo.bar.baz", []byte("msg")) + if _, err := sub.NextMsg(time.Second); err == nil { + // OK, done! + return + } + } + t.Fatal("Message was not received") + } + subPubAndCheck() + + // Now cause a restart of the accepting side so that the leaf connection + // is recreated. + hub.Shutdown() + hub = RunServer(hubo) + defer hub.Shutdown() + + checkLeafNodeConnected(t, hub) + checkLeafNodeConnected(t, leaf) + + subPubAndCheck() + + // Issue a config reload even though we make no modification. There was + // a defect that caused the interest propagation to break. + // Set the ReconnectInterval to the default value so that reload does not complain. + leaf.getOpts().LeafNode.ReconnectInterval = DEFAULT_LEAF_NODE_RECONNECT + reloadUpdateConfig(t, leaf, leafConf, leafConfContet) + + // Check again + subPubAndCheck() +} diff --git a/server/reload.go b/server/reload.go index 8d019ca8..da344c34 100644 --- a/server/reload.go +++ b/server/reload.go @@ -1482,6 +1482,8 @@ func (s *Server) reloadAuthorization() { newAcc.clients[c] = struct{}{} } } + // Same for leafnodes + newAcc.lleafs = append([]*client(nil), acc.lleafs...) newAcc.sl = acc.sl newAcc.rm = acc.rm diff --git a/test/leafnode_test.go b/test/leafnode_test.go index b00331d2..7e49715b 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -4422,105 +4422,6 @@ func TestLeafNodeAndGatewaysStreamAndShadowSubs(t *testing.T) { } } -func TestLeafNodeStreamAndShadowSubs(t *testing.T) { - hubConf := createConfFile(t, []byte(` - port: -1 - leafnodes { - port: -1 - authorization: { - user: leaf - password: leaf - account: B - } - } - accounts: { - A: { - users = [{user: usrA, password: usrA}] - exports: [{stream: foo.*.>}] - } - B: { - imports: [{stream: {account: A, subject: foo.*.>}}] - } - } - `)) - defer removeFile(t, hubConf) - hub, hubo := RunServerWithConfig(hubConf) - defer hub.Shutdown() - - leafConf := createConfFile(t, []byte(fmt.Sprintf(` - port: -1 - leafnodes { - remotes = [ - { - url: "nats-leaf://leaf:leaf@127.0.0.1:%d" - account: B - } - ] - } - accounts: { - B: { - exports: [{stream: foo.*.>}] - } - C: { - users: [{user: usrC, password: usrC}] - imports: [{stream: {account: B, subject: foo.bar.>}}] - } - } - `, hubo.LeafNode.Port))) - defer removeFile(t, leafConf) - leafo := LoadConfig(leafConf) - leafo.LeafNode.ReconnectInterval = 50 * time.Millisecond - leaf := RunServer(leafo) - defer leaf.Shutdown() - - checkLeafNodeConnected(t, hub) - checkLeafNodeConnected(t, leaf) - - ncl, err := nats.Connect(leaf.ClientURL(), nats.UserInfo("usrC", "usrC")) - if err != nil { - t.Fatalf("Error connecting: %v", err) - } - defer ncl.Close() - - // This will send an LS+ to the "hub" server. - sub, err := ncl.SubscribeSync("foo.*.baz") - if err != nil { - t.Fatalf("Error subscribing: %v", err) - } - ncl.Flush() - - pubAndCheck := func() { - t.Helper() - ncm, err := nats.Connect(hub.ClientURL(), nats.UserInfo("usrA", "usrA")) - if err != nil { - t.Fatalf("Error connecting: %v", err) - } - defer ncm.Close() - - // Try a few times in case subject interest has not propagated yet - for i := 0; i < 5; i++ { - ncm.Publish("foo.bar.baz", []byte("msg")) - if _, err := sub.NextMsg(time.Second); err == nil { - // OK, done! - return - } - } - t.Fatal("Message was not received") - } - pubAndCheck() - - // Now cause a restart of the accepting side so that the leaf connection - // is recreated. - hub.Shutdown() - hub = RunServer(hubo) - defer hub.Shutdown() - - checkLeafNodeConnected(t, hub) - checkLeafNodeConnected(t, leaf) - - pubAndCheck() -} - func TestLeafnodeHeaders(t *testing.T) { srv, opts := runLeafServer() defer srv.Shutdown()