From afb5086f176c30640c1d97fd1bd2ce11394da8d1 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 20 Jun 2023 18:31:08 -0600 Subject: [PATCH] [CHANGED] LeafNode: remotes from same server binding to same hub account Previously, the server would reject a second remote leafnode connection from the same server if it was binding to the same account on the hub even if the remote was using different local accounts. Signed-off-by: Ivan Kozlovic --- server/client.go | 3 + server/leafnode.go | 38 ++-- server/leafnode_test.go | 454 +++++++++++++++++++++++++++++++++++----- 3 files changed, 435 insertions(+), 60 deletions(-) diff --git a/server/client.go b/server/client.go index 5e752d40..b48bbff9 100644 --- a/server/client.go +++ b/server/client.go @@ -624,6 +624,9 @@ type ClientOpts struct { // Routes and Leafnodes only Import *SubjectPermission `json:"import,omitempty"` Export *SubjectPermission `json:"export,omitempty"` + + // Leafnodes + RemoteAccount string `json:"remote_account,omitempty"` } var defaultOpts = ClientOpts{Verbose: true, Pedantic: true, Echo: true} diff --git a/server/leafnode.go b/server/leafnode.go index 35cb186a..900a7aca 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -78,6 +78,8 @@ type leaf struct { remoteServer string // domain name of remote server remoteDomain string + // account name of remote server + remoteAccName string // Used to suppress sub and unsub interest. Same as routes but our audience // here is tied to this leaf node. This will hold all subscriptions except this // leaf nodes. This represents all the interest we want to send to the other side. @@ -348,8 +350,8 @@ func (s *Server) updateRemoteLeafNodesTLSConfig(opts *Options) { return } - s.mu.Lock() - defer s.mu.Unlock() + s.mu.RLock() + defer s.mu.RUnlock() // Changes in the list of remote leaf nodes is not supported. // However, make sure that we don't go over the arrays. @@ -764,16 +766,17 @@ var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[- func (c *client) sendLeafConnect(clusterName string, headers bool) error { // We support basic user/pass and operator based user JWT with signatures. cinfo := leafConnectInfo{ - Version: VERSION, - ID: c.srv.info.ID, - Domain: c.srv.info.Domain, - Name: c.srv.info.Name, - Hub: c.leaf.remote.Hub, - Cluster: clusterName, - Headers: headers, - JetStream: c.acc.jetStreamConfigured(), - DenyPub: c.leaf.remote.DenyImports, - Compression: c.leaf.compression, + Version: VERSION, + ID: c.srv.info.ID, + Domain: c.srv.info.Domain, + Name: c.srv.info.Name, + Hub: c.leaf.remote.Hub, + Cluster: clusterName, + Headers: headers, + JetStream: c.acc.jetStreamConfigured(), + DenyPub: c.leaf.remote.DenyImports, + Compression: c.leaf.compression, + RemoteAccount: c.acc.GetName(), } // If a signature callback is specified, this takes precedence over anything else. @@ -1310,6 +1313,8 @@ func (c *client) processLeafnodeInfo(info *Info) { // Clear deadline that was set in createLeafNode while waiting for the INFO. c.nc.SetDeadline(time.Time{}) resumeConnect = true + } else if !firstINFO && didSolicit { + c.leaf.remoteAccName = info.RemoteAccount } // Check if we have the remote account information and if so make sure it's stored. @@ -1503,6 +1508,7 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c } myRemoteDomain := c.leaf.remoteDomain mySrvName := c.leaf.remoteServer + remoteAccName := c.leaf.remoteAccName myClustName := c.leaf.remoteCluster solicited := c.leaf.remote != nil c.mu.Unlock() @@ -1518,7 +1524,8 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c // We have code for the loop detection elsewhere, which also delays // attempt to reconnect. if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName && - ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName { + ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName && + remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName { old = ol } ol.mu.Unlock() @@ -1693,6 +1700,9 @@ type leafConnectInfo struct { // Just used to detect wrong connection attempts. Gateway string `json:"gateway,omitempty"` + + // Tells the accept side which account the remote is binding to. + RemoteAccount string `json:"remote_account,omitempty"` } // processLeafNodeConnect will process the inbound connect args. @@ -1774,6 +1784,8 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro // Remember the remote server. c.leaf.remoteServer = proto.Name + // Remember the remote account name + c.leaf.remoteAccName = proto.RemoteAccount // If the other side has declared itself a hub, so we will take on the spoke role. if proto.Hub { diff --git a/server/leafnode_test.go b/server/leafnode_test.go index fb7b6ec2..7838b762 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -2185,65 +2185,80 @@ func TestLeafNodeLoopDetectedDueToReconnect(t *testing.T) { checkLeafNodeConnected(t, sl) } -func TestLeafNodeTwoRemotesBindToSameAccount(t *testing.T) { +func TestLeafNodeTwoRemotesBindToSameHubAccount(t *testing.T) { opts := DefaultOptions() opts.LeafNode.Host = "127.0.0.1" opts.LeafNode.Port = -1 s := RunServer(opts) defer s.Shutdown() - conf := ` - listen: 127.0.0.1:-1 - cluster { name: ln22, listen: 127.0.0.1:-1 } - accounts { - a { users [ {user: a, password: a} ]} - b { users [ {user: b, password: b} ]} - } - leafnodes { - remotes = [ - { - url: nats-leaf://127.0.0.1:%d - account: a + for _, test := range []struct { + name string + account string + fail bool + }{ + {"different local accounts", "b", false}, + {"same local accounts", "a", true}, + } { + t.Run(test.name, func(t *testing.T) { + conf := ` + listen: 127.0.0.1:-1 + cluster { name: ln22, listen: 127.0.0.1:-1 } + accounts { + a { users [ {user: a, password: a} ]} + b { users [ {user: b, password: b} ]} } - { - url: nats-leaf://127.0.0.1:%d - account: b + leafnodes { + remotes = [ + { + url: nats-leaf://127.0.0.1:%[1]d + account: a + } + { + url: nats-leaf://127.0.0.1:%[1]d + account: %s + } + ] } - ] - } - ` - lconf := createConfFile(t, []byte(fmt.Sprintf(conf, opts.LeafNode.Port, opts.LeafNode.Port))) + ` + lconf := createConfFile(t, []byte(fmt.Sprintf(conf, opts.LeafNode.Port, test.account))) - lopts, err := ProcessConfigFile(lconf) - if err != nil { - t.Fatalf("Error loading config file: %v", err) - } - lopts.NoLog = false - ln, err := NewServer(lopts) - if err != nil { - t.Fatalf("Error creating server: %v", err) - } - defer ln.Shutdown() - l := &captureErrorLogger{errCh: make(chan string, 10)} - ln.SetLogger(l, false, false) + lopts, err := ProcessConfigFile(lconf) + if err != nil { + t.Fatalf("Error loading config file: %v", err) + } + lopts.NoLog = false + ln, err := NewServer(lopts) + if err != nil { + t.Fatalf("Error creating server: %v", err) + } + defer ln.Shutdown() + l := &captureErrorLogger{errCh: make(chan string, 10)} + ln.SetLogger(l, false, false) - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - ln.Start() - }() + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + ln.Start() + }() - select { - case err := <-l.errCh: - if !strings.Contains(err, DuplicateRemoteLeafnodeConnection.String()) { - t.Fatalf("Unexpected error: %v", err) - } - case <-time.After(2 * time.Second): - t.Fatal("Did not get any error") + select { + case err := <-l.errCh: + if test.fail && !strings.Contains(err, DuplicateRemoteLeafnodeConnection.String()) { + t.Fatalf("Did not get expected duplicate connection error: %v", err) + } else if !test.fail && strings.Contains(err, DuplicateRemoteLeafnodeConnection.String()) { + t.Fatalf("Incorrectly detected a duplicate connection: %v", err) + } + case <-time.After(250 * time.Millisecond): + if test.fail { + t.Fatal("Did not get expected error") + } + } + ln.Shutdown() + wg.Wait() + }) } - ln.Shutdown() - wg.Wait() } func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) { @@ -6443,3 +6458,348 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t }) require_True(t, r2.Load() > r1.Load()) } + +func TestLeafNodeTwoRemotesToSameHubAccount(t *testing.T) { + conf1 := createConfFile(t, []byte(` + port: -1 + server_name: "hub" + accounts { + HA { users: [{user: ha, password: pwd}] } + } + leafnodes { + port: -1 + } + `)) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + conf2 := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + server_name: "spoke" + accounts { + A { users: [{user: A, password: pwd}] } + B { users: [{user: B, password: pwd}] } + C { users: [{user: C, password: pwd}] } + } + leafnodes { + remotes [ + { + url: "nats://ha:pwd@127.0.0.1:%d" + local: "A" + } + { + url: "nats://ha:pwd@127.0.0.1:%d" + local: "C" + } + ] + } + `, o1.LeafNode.Port, o1.LeafNode.Port))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + l := &captureErrorLogger{errCh: make(chan string, 10)} + s2.SetLogger(l, false, false) + + checkLeafNodeConnectedCount(t, s2, 2) + + // Make sure we don't get duplicate leafnode connection errors + deadline := time.NewTimer(1500 * time.Millisecond) + for done := false; !done; { + select { + case err := <-l.errCh: + if strings.Contains(err, DuplicateRemoteLeafnodeConnection.String()) { + t.Fatalf("Got error: %v", err) + } + case <-deadline.C: + done = true + } + } + + nca := natsConnect(t, s2.ClientURL(), nats.UserInfo("A", "pwd")) + defer nca.Close() + suba := natsSubSync(t, nca, "A") + ncb := natsConnect(t, s2.ClientURL(), nats.UserInfo("B", "pwd")) + defer ncb.Close() + subb := natsSubSync(t, ncb, "B") + ncc := natsConnect(t, s2.ClientURL(), nats.UserInfo("C", "pwd")) + defer ncc.Close() + subc := natsSubSync(t, ncc, "C") + subs := map[string]*nats.Subscription{"A": suba, "B": subb, "C": subc} + + for _, subj := range []string{"A", "C"} { + checkSubInterest(t, s1, "HA", subj, time.Second) + } + + nc := natsConnect(t, s1.ClientURL(), nats.UserInfo("ha", "pwd")) + defer nc.Close() + + for _, subj := range []string{"A", "B", "C"} { + natsPub(t, nc, subj, []byte("hello")) + } + + for _, subj := range []string{"A", "B", "C"} { + var expected bool + if subj != "B" { + expected = true + } + sub := subs[subj] + if expected { + natsNexMsg(t, sub, time.Second) + } else { + if _, err := sub.NextMsg(50 * time.Millisecond); err != nats.ErrTimeout { + t.Fatalf("Expected timeout error, got %v", err) + } + } + } +} + +func TestLeafNodeTwoRemotesToSameHubAccountWithClusters(t *testing.T) { + hubTmpl := ` + port: -1 + server_name: "%s" + accounts { + HA { users: [{user: HA, password: pwd}] } + } + cluster { + name: "hub" + port: -1 + %s + } + leafnodes { + port: -1 + } + ` + confH1 := createConfFile(t, []byte(fmt.Sprintf(hubTmpl, "H1", _EMPTY_))) + sh1, oh1 := RunServerWithConfig(confH1) + defer sh1.Shutdown() + + confH2 := createConfFile(t, []byte(fmt.Sprintf(hubTmpl, "H2", fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", oh1.Cluster.Port)))) + sh2, oh2 := RunServerWithConfig(confH2) + defer sh2.Shutdown() + + checkClusterFormed(t, sh1, sh2) + + spokeTmpl := ` + port: -1 + server_name: "%s" + accounts { + A { users: [{user: A, password: pwd}] } + B { users: [{user: B, password: pwd}] } + } + cluster { + name: "spoke" + port: -1 + %s + } + leafnodes { + remotes [ + { + url: "nats://HA:pwd@127.0.0.1:%d" + local: "A" + } + { + url: "nats://HA:pwd@127.0.0.1:%d" + local: "B" + } + ] + } + ` + for _, test := range []struct { + name string + sp2Leafport int + }{ + {"connect to different hub servers", oh2.LeafNode.Port}, + {"connect to same hub server", oh1.LeafNode.Port}, + } { + t.Run(test.name, func(t *testing.T) { + confSP1 := createConfFile(t, []byte(fmt.Sprintf(spokeTmpl, "SP1", _EMPTY_, oh1.LeafNode.Port, oh1.LeafNode.Port))) + sp1, osp1 := RunServerWithConfig(confSP1) + defer sp1.Shutdown() + + confSP2 := createConfFile(t, []byte(fmt.Sprintf(spokeTmpl, "SP2", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", osp1.Cluster.Port), test.sp2Leafport, test.sp2Leafport))) + sp2, _ := RunServerWithConfig(confSP2) + defer sp2.Shutdown() + + checkClusterFormed(t, sp1, sp2) + checkLeafNodeConnectedCount(t, sp1, 2) + checkLeafNodeConnectedCount(t, sp2, 2) + + var conns []*nats.Conn + createConn := func(s *Server, user string) { + t.Helper() + nc := natsConnect(t, s.ClientURL(), nats.UserInfo(user, "pwd")) + conns = append(conns, nc) + } + for _, nc := range conns { + defer nc.Close() + } + createConn(sh1, "HA") + createConn(sh2, "HA") + createConn(sp1, "A") + createConn(sp2, "A") + createConn(sp1, "B") + createConn(sp2, "B") + + check := func(subConn *nats.Conn, subj string, checkA, checkB bool) { + t.Helper() + sub := natsSubSync(t, subConn, subj) + defer sub.Unsubscribe() + + checkSubInterest(t, sh1, "HA", subj, time.Second) + checkSubInterest(t, sh2, "HA", subj, time.Second) + if checkA { + checkSubInterest(t, sp1, "A", subj, time.Second) + checkSubInterest(t, sp2, "A", subj, time.Second) + } + if checkB { + checkSubInterest(t, sp1, "B", subj, time.Second) + checkSubInterest(t, sp2, "B", subj, time.Second) + } + + for i, ncp := range conns { + // Don't publish from account "A" connections if we are + // dealing with account "B", and vice-versa. + if !checkA && i >= 2 && i <= 3 { + continue + } + if !checkB && i >= 4 { + continue + } + natsPub(t, ncp, subj, []byte("hello")) + natsNexMsg(t, sub, time.Second) + // Make sure we don't get a duplicate + if msg, err := sub.NextMsg(50 * time.Millisecond); err != nats.ErrTimeout { + t.Fatalf("Unexpected message or error: msg=%v - err=%v", msg, err) + } + } + } + check(conns[0], "HA.1", true, true) + check(conns[1], "HA.2", true, true) + check(conns[2], "SPA.1", true, false) + check(conns[3], "SPA.2", true, false) + check(conns[4], "SPB.1", false, true) + check(conns[5], "SPB.2", false, true) + }) + } +} + +func TestLeafNodeSameLocalAccountToMultipleHubs(t *testing.T) { + hub1Conf := createConfFile(t, []byte(` + port: -1 + server_name: hub1 + accounts { + hub1 { users: [{user: hub1, password: pwd}] } + } + leafnodes { + port: -1 + } + `)) + sh1, oh1 := RunServerWithConfig(hub1Conf) + defer sh1.Shutdown() + + hub2Conf := createConfFile(t, []byte(` + port: -1 + server_name: hub2 + accounts { + hub2 { users: [{user: hub2, password: pwd}] } + } + leafnodes { + port: -1 + } + `)) + sh2, oh2 := RunServerWithConfig(hub2Conf) + defer sh2.Shutdown() + + lconf := createConfFile(t, []byte(fmt.Sprintf(` + port: -1 + server_name: leaf + accounts { + A { users: [{user: A, password: pwd}] } + B { users: [{user: B, password: pwd}] } + C { users: [{user: C, password: pwd}] } + } + leafnodes { + port: -1 + remotes [ + { + url: nats://hub1:pwd@127.0.0.1:%[1]d + local: "A" + } + { + url: nats://hub1:pwd@127.0.0.1:%[1]d + local: "C" + } + { + url: nats://hub2:pwd@127.0.0.1:%[2]d + local: "A" + } + { + url: nats://hub2:pwd@127.0.0.1:%[2]d + local: "B" + } + ] + } + `, oh1.LeafNode.Port, oh2.LeafNode.Port))) + s, _ := RunServerWithConfig(lconf) + defer s.Shutdown() + + // The leafnode to hub1 should have 2 connections (A and C) + // while the one to hub2 should have 2 connections (A and B) + checkLeafNodeConnectedCount(t, sh1, 2) + checkLeafNodeConnectedCount(t, sh2, 2) + checkLeafNodeConnectedCount(t, s, 4) + + nca := natsConnect(t, s.ClientURL(), nats.UserInfo("A", "pwd")) + defer nca.Close() + suba := natsSubSync(t, nca, "A") + ncb := natsConnect(t, s.ClientURL(), nats.UserInfo("B", "pwd")) + defer ncb.Close() + subb := natsSubSync(t, ncb, "B") + ncc := natsConnect(t, s.ClientURL(), nats.UserInfo("C", "pwd")) + defer ncc.Close() + subc := natsSubSync(t, ncc, "C") + + checkSubInterest(t, sh1, "hub1", "A", time.Second) + checkSubNoInterest(t, sh1, "hub1", "B", time.Second) + checkSubInterest(t, sh1, "hub1", "C", time.Second) + + checkSubInterest(t, sh2, "hub2", "A", time.Second) + checkSubInterest(t, sh2, "hub2", "B", time.Second) + checkSubNoInterest(t, sh2, "hub2", "C", time.Second) + + nch1 := natsConnect(t, sh1.ClientURL(), nats.UserInfo("hub1", "pwd")) + defer nch1.Close() + nch2 := natsConnect(t, sh2.ClientURL(), nats.UserInfo("hub2", "pwd")) + defer nch2.Close() + + checkNoMsg := func(sub *nats.Subscription) { + t.Helper() + if msg, err := sub.NextMsg(50 * time.Millisecond); err != nats.ErrTimeout { + t.Fatalf("Unexpected message: %s", msg.Data) + } + } + + checkSub := func(sub *nats.Subscription, subj, payload string) { + t.Helper() + msg := natsNexMsg(t, sub, time.Second) + require_Equal(t, subj, msg.Subject) + require_Equal(t, payload, string(msg.Data)) + // Make sure we don't get duplicates + checkNoMsg(sub) + } + + natsPub(t, nch1, "A", []byte("msgA1")) + checkSub(suba, "A", "msgA1") + natsPub(t, nch1, "B", []byte("msgB1")) + checkNoMsg(subb) + natsPub(t, nch1, "C", []byte("msgC1")) + checkSub(subc, "C", "msgC1") + + natsPub(t, nch2, "A", []byte("msgA2")) + checkSub(suba, "A", "msgA2") + natsPub(t, nch2, "B", []byte("msgB2")) + checkSub(subb, "B", "msgB2") + natsPub(t, nch2, "C", []byte("msgC2")) + checkNoMsg(subc) +}