From 5c6731a2b3bcb687d0326442acd8634b3563a598 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Tue, 25 Aug 2020 19:59:10 -0700 Subject: [PATCH] Add support for cluster name changes from static to dynamic Signed-off-by: Waldemar Quevedo --- server/leafnode_test.go | 3 +- server/monitor.go | 1 + server/reload.go | 56 ++++- server/reload_test.go | 524 +++++++++++++++++++++++++++++++++++++++- server/route.go | 160 ++++++++++-- server/server.go | 66 +++-- 6 files changed, 770 insertions(+), 40 deletions(-) diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 0555908b..96e9d4fc 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -1635,7 +1635,8 @@ func TestLeafNodeOriginClusterInfo(t *testing.T) { // Now make sure that if we update our cluster name, simulating the settling // of dynamic cluster names between competing servers. - s.setClusterName("xyz") + s.setClusterName("xyz", false) + // Make sure we disconnect and reconnect. checkLeafNodeConnectedCount(t, s, 0) checkLeafNodeConnected(t, s) diff --git a/server/monitor.go b/server/monitor.go index ed221d7d..02d54d41 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1337,6 +1337,7 @@ func (s *Server) updateVarzRuntimeFields(v *Varz, forceUpdate bool, pcpu float64 for key, val := range s.httpReqStats { v.HTTPReqStats[key] = val } + v.Cluster.Name = s.info.Cluster // Update Gateway remote urls if applicable gw := s.gateway diff --git a/server/reload.go b/server/reload.go index acd89e69..6407864b 100644 --- a/server/reload.go +++ b/server/reload.go @@ -54,9 +54,13 @@ type option interface { // cluster permissions. IsClusterPermsChange() bool - // IsJetStreamChange inidicates a change in the servers config for JetStream. + // IsJetStreamChange indicates a change in the servers config for JetStream. // Account changes will be handled separately in reloadAuthorization. 
IsJetStreamChange() bool + + // IsClusterNameChange indicates that the cluster name has changed which might + // implicate changes in the cluster membership of the server. + IsClusterNameChange() bool } // noopOption is a base struct that provides default no-op behaviors. @@ -82,6 +86,10 @@ func (n noopOption) IsJetStreamChange() bool { return false } +func (n noopOption) IsClusterNameChange() bool { + return false +} + // loggingOption is a base struct that provides default option behaviors for // logging-related options. type loggingOption struct { @@ -292,8 +300,10 @@ func (u *nkeysOption) Apply(server *Server) { // clusterOption implements the option interface for the `cluster` setting. type clusterOption struct { authOption + oldValue ClusterOpts newValue ClusterOpts permsChanged bool + nameChanged bool } // Apply the cluster change. @@ -313,8 +323,17 @@ func (c *clusterOption) Apply(s *Server) { } s.setRouteInfoHostPortAndIP() s.mu.Unlock() - if c.newValue.Name != "" && c.newValue.Name != s.ClusterName() { - s.setClusterName(c.newValue.Name) + + // Check whether the cluster name has been removed or changed + // as that could affect a cluster membership. + switch { + case c.oldValue.Name != "" && c.newValue.Name == "": + s.setClusterName(s.defaultClusterName, true) + c.nameChanged = true + case c.newValue.Name != "" && c.oldValue.Name != c.newValue.Name: + // Use the new explicit cluster name from the config. + s.setClusterName(c.newValue.Name, false) + c.nameChanged = true } s.Noticef("Reloaded: cluster") if tlsRequired && c.newValue.TLSConfig.InsecureSkipVerify { @@ -326,6 +345,10 @@ func (c *clusterOption) IsClusterPermsChange() bool { return c.permsChanged } +func (c *clusterOption) IsClusterNameChange() bool { + return c.nameChanged +} + // routesOption implements the option interface for the cluster `routes` // setting. 
type routesOption struct { @@ -847,7 +870,7 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { return nil, err } permsChanged := !reflect.DeepEqual(newClusterOpts.Permissions, oldClusterOpts.Permissions) - diffOpts = append(diffOpts, &clusterOption{newValue: newClusterOpts, permsChanged: permsChanged}) + diffOpts = append(diffOpts, &clusterOption{oldValue: oldClusterOpts, newValue: newClusterOpts, permsChanged: permsChanged}) case "routes": add, remove := diffRoutes(oldValue.([]*url.URL), newValue.([]*url.URL)) diffOpts = append(diffOpts, &routesOption{add: add, remove: remove}) @@ -1018,6 +1041,7 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { reloadAuth = false reloadClusterPerms = false reloadClientTrcLvl = false + reloadClusterName = false ) for _, opt := range opts { opt.Apply(s) @@ -1033,6 +1057,9 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { if opt.IsClusterPermsChange() { reloadClusterPerms = true } + if opt.IsClusterNameChange() { + reloadClusterName = true + } } if reloadLogging { @@ -1047,6 +1074,9 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { if reloadClusterPerms { s.reloadClusterPermissions(ctx.oldClusterPerms) } + if reloadClusterName { + s.reloadClusterName() + } s.Noticef("Reloaded server configuration") } @@ -1100,6 +1130,24 @@ func (s *Server) reloadClientTraceLevel() { } } +// reloadClusterName detects cluster membership changes triggered +// due to a reload where the name changes. +func (s *Server) reloadClusterName() { + s.mu.Lock() + // Get all connected routes and notify the cluster rename. + infoJSON := s.routeInfoJSON + for _, route := range s.routes { + route.mu.Lock() + route.enqueueProto(infoJSON) + route.mu.Unlock() + } + + // Reloading without an explicit cluster name makes a server + // be susceptible to dynamic cluster name changes. 
+ s.susceptible = s.opts.Cluster.Name == "" + s.mu.Unlock() +} + // reloadAuthorization reconfigures the server authorization settings, // disconnects any clients who are no longer authorized, and removes any // unauthorized subscriptions. diff --git a/server/reload_test.go b/server/reload_test.go index 20d942ff..7b73274c 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -1652,7 +1652,8 @@ func TestConfigReloadClusterAdvertise(t *testing.T) { orgClusterPort := s.ClusterAddr().Port - verify := func(expectedHost string, expectedPort int, expectedIP string) { + verify := func(t *testing.T, expectedHost string, expectedPort int, expectedIP string) { + t.Helper() s.mu.Lock() routeInfo := s.routeInfo routeInfoJSON := Info{} @@ -1679,7 +1680,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) { cluster_advertise: "me:1" } `) - verify("me", 1, "nats-route://me:1/") + verify(t, "me", 1, "nats-route://me:1/") // Update config with cluster_advertise (no port specified) reloadUpdateConfig(t, s, conf, ` @@ -1689,7 +1690,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) { cluster_advertise: "me" } `) - verify("me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort)) + verify(t, "me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort)) // Update config with cluster_advertise (-1 port specified) reloadUpdateConfig(t, s, conf, ` @@ -1699,7 +1700,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) { cluster_advertise: "me:-1" } `) - verify("me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort)) + verify(t, "me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort)) // Update to remove cluster_advertise reloadUpdateConfig(t, s, conf, ` @@ -1708,7 +1709,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) { listen: "0.0.0.0:-1" } `) - verify("0.0.0.0", orgClusterPort, "") + verify(t, "0.0.0.0", orgClusterPort, "") } func TestConfigReloadClusterNoAdvertise(t *testing.T) { @@ -1787,6 
+1788,519 @@ func TestConfigReloadClusterName(t *testing.T) { } } +func TestConfigClusterMembershipReload(t *testing.T) { + // + // [A] starts in dynamic mode. + // + s1, _, conf1 := runReloadServerWithContent(t, []byte(` + listen: "0.0.0.0:-1" + server_name: "A" + + cluster: { + listen: "0.0.0.0:-1" + } + `)) + defer os.Remove(conf1) + defer s1.Shutdown() + + got := s1.ClusterName() + if got == "" { + t.Fatalf("Expected update clustername to be set dynamically") + } + + // + // [A] joins AB cluster with explicit name. + // + reloadUpdateConfig(t, s1, conf1, ` + listen: "0.0.0.0:-1" + server_name: "A" + + cluster: { + name: "AB" + listen: "0.0.0.0:-1" + } + `) + if s1.ClusterName() != "AB" { + t.Fatalf("Expected update clustername of \"AB\", got %q", s1.ClusterName()) + } + + // + // [B] joins AB cluster with explicit name. + // + template := fmt.Sprintf(` + listen: "0.0.0.0:-1" + server_name: "B" + + cluster: { + name: "AB" + listen: "0.0.0.0:-1" + routes: [ nats://localhost:%d ] # Route to A + } + `, s1.ClusterAddr().Port) + + s2, _, conf2 := runReloadServerWithContent(t, []byte(template)) + defer os.Remove(conf2) + defer s2.Shutdown() + checkClusterFormed(t, s1, s2) + + // + // [A] client sends request to [B] client and should be able to respond. 
+ // + nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%d", "127.0.0.1", s1.Addr().(*net.TCPAddr).Port)) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer nc1.Close() + + nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", "127.0.0.1", s2.Addr().(*net.TCPAddr).Port)) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer nc2.Close() + nc1.Subscribe("test", func(m *nats.Msg) { + m.Respond([]byte("pong")) + }) + nc1.Flush() + _, err = nc2.Request("test", []byte("ping"), 2*time.Second) + if err != nil { + t.Fatalf("Error making request to cluster, got: %s", err) + } + + // + // [B] leaves the cluster AB and stops soliciting from [A], + // [A] is still is part of AB cluster with explicit name at this point. + // + template = fmt.Sprintf(` + listen: "0.0.0.0:-1" + server_name: "B" + + cluster: { + # name: "AB" + listen: "0.0.0.0:-1" + # routes: [ nats://localhost:%d ] # Route to A is gone + } + `, s1.ClusterAddr().Port) + reloadUpdateConfig(t, s2, conf2, template) + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { + if numRoutes := s1.NumRoutes(); numRoutes > 0 { + return fmt.Errorf("Expected %d routes for server %q, got %d", 0, s1.ID(), numRoutes) + } + got = s2.ClusterName() + if got == "AB" || got == "" { + return fmt.Errorf("Expected update cluster name to be new, got %q", got) + } + got = s1.ClusterName() + if got != "AB" { + return fmt.Errorf("Expected update cluster name to be AB, got %q", got) + } + + // [B] leaving AB cluster and stop soliciting should dissolve the cluster. + if numRoutes := s1.NumRoutes(); numRoutes != 0 { + return fmt.Errorf("Expected no routes for server %q, got %d", s1.ID(), numRoutes) + } + if numRoutes := s2.NumRoutes(); numRoutes != 0 { + return fmt.Errorf("Expected no routes for server %q, got %d", s2.ID(), numRoutes) + } + return nil + }) + + // + // [A] leaves cluster AB and goes back to dynamic. 
+ // + reloadUpdateConfig(t, s1, conf1, ` + listen: "0.0.0.0:-1" + server_name: "A" + + cluster: { + listen: "0.0.0.0:-1" + } + `) + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { + got = s1.ClusterName() + if got == "AB" || got == "" { + return fmt.Errorf("Expected update cluster name to be new, got %q", got) + } + + return nil + }) + + // + // [B] client request fails since not part of cluster. + // + _, err = nc2.Request("test", []byte("failed ping"), 2*time.Second) + if err == nil { + t.Fatalf("Expected error making a request to cluster.") + } + + // + // [C] solicits from [A] and both form a dynamic cluster. + // + template = fmt.Sprintf(` + listen: "0.0.0.0:-1" + server_name: "C" + + cluster: { + listen: "0.0.0.0:-1" + routes: [ nats://localhost:%d ] # Route to A + } + `, s1.ClusterAddr().Port) + + s3, _, conf3 := runReloadServerWithContent(t, []byte(template)) + defer os.Remove(conf3) + defer s3.Shutdown() + checkClusterFormed(t, s1, s3) + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { + if numRoutes := s1.NumRoutes(); numRoutes < 1 { + return fmt.Errorf("Expected %d routes for server %q, got %d", 0, s1.ID(), numRoutes) + } + + if s1.ClusterName() != s3.ClusterName() { + return fmt.Errorf("Expected cluster names to be the same: %s != %s", s1.ClusterName(), s3.ClusterName()) + } + + if s2.ClusterName() == s3.ClusterName() { + return fmt.Errorf("Expected cluster names to not be the same: %s == %s", s1.ClusterName(), s3.ClusterName()) + } + return nil + }) + + // + // [C] client makes request to service from [A] which should respond OK. 
+ // + nc3, err := nats.Connect(fmt.Sprintf("nats://%s:%d", "127.0.0.1", s3.Addr().(*net.TCPAddr).Port)) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer nc3.Close() + + _, err = nc3.Request("test", []byte("ping"), 2*time.Second) + if err != nil { + t.Fatalf("Error making request to cluster, got: %s", err) + } +} + +func TestConfigSolicitClusterMembershipReload(t *testing.T) { + // + // [A] starts in dynamic mode. + // + sA, _, confA := runReloadServerWithContent(t, []byte(` + listen: "0.0.0.0:-1" + server_name: "A" + + cluster: { + listen: "0.0.0.0:-1" + } + `)) + defer os.Remove(confA) + defer sA.Shutdown() + + if sA.ClusterName() == "" { + t.Fatalf("Expected update cluster name to be set dynamically") + } + + // + // [B] starts in dynamic mode. + // + sB, _, confB := runReloadServerWithContent(t, []byte(` + listen: "0.0.0.0:-1" + server_name: "B" + + cluster: { + listen: "0.0.0.0:-1" + } + `)) + defer os.Remove(confB) + defer sB.Shutdown() + + if sB.ClusterName() == "" { + t.Fatalf("Expected update cluster name to be set dynamically") + } + + // + // [C] starts with ABC cluster name and the 2 routes. + // + clusterConf := fmt.Sprintf(` + cluster { + name: "ABC" + listen: "0.0.0.0:-1" + routes: [ + nats://localhost:%d + nats://localhost:%d + ] + } + `, sA.ClusterAddr().Port, sB.ClusterAddr().Port) + + sC, _, confC := runReloadServerWithContent(t, []byte(` + listen: "0.0.0.0:-1" + server_name: "C" + `+clusterConf)) + defer os.Remove(confC) + defer sC.Shutdown() + + if sC.ClusterName() != "ABC" { + t.Fatalf("Expected update clustername to be set dynamically") + } + + checkClusterFormed(t, sA, sB, sC) +} + +func TestConfigLeafnodeClusterMembershipReload(t *testing.T) { + // + // [GROUND] starts in dynamic mode. 
+ // + s0, _, conf0 := runReloadServerWithContent(t, []byte(` + listen: "0.0.0.0:-1" + server_name: "GROUND" + + leafnodes: { + listen: "0.0.0.0:-1" + } + `)) + defer os.Remove(conf0) + defer s0.Shutdown() + + leafConf := fmt.Sprintf(` + leafnodes: { + remotes [{ url: "nats://localhost:%d" }] + } + `, s0.LeafnodeAddr().Port) + + // + // [A] connects to [GROUND] via leafnode port + // + sA, _, confA := runReloadServerWithContent(t, []byte(` + listen: "0.0.0.0:-1" + server_name: "A" + + cluster: { + listen: "0.0.0.0:-1" + } + `+leafConf)) + defer os.Remove(confA) + defer sA.Shutdown() + + // + // [B] connects to [GROUND] via leafnode port + // + sB, _, confB := runReloadServerWithContent(t, []byte(` + listen: "0.0.0.0:-1" + server_name: "B" + + cluster: { + listen: "0.0.0.0:-1" + } + `+leafConf)) + defer os.Remove(confB) + defer sB.Shutdown() + + // + // [A] client can make requests to service at [B] + // with the request traveling through [GROUND] node. + // + ncA, err := nats.Connect(fmt.Sprintf("nats://localhost:%d", sA.Addr().(*net.TCPAddr).Port)) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer ncA.Close() + + ncB, err := nats.Connect(fmt.Sprintf("nats://localhost:%d", sB.Addr().(*net.TCPAddr).Port)) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer ncB.Close() + + ncB.Subscribe("help", func(msg *nats.Msg) { + msg.Respond([]byte("OK")) + }) + ncB.Flush() + time.Sleep(500 * time.Millisecond) + + makeRequest := func(payload []byte) error { + _, err := ncA.Request("help", payload, 5*time.Second) + if err != nil { + return err + } + return nil + } + + // + // Make roundtrip via leafnode connections. + // + // [A] <-- lid --> [GROUND] <-- lid --> [B] + // + // + err = makeRequest([]byte("VIA LEAFNODE")) + if err != nil { + t.Fatal(err) + } + + // + // [GROUND] should have 2 leaf connections and + // a single message flowing in/out. 
+ // + leafz, err := s0.Leafz(nil) + if err != nil { + t.Fatal(err) + } + if len(leafz.Leafs) != 2 { + t.Errorf("Expected 2 leafs but got %d", len(leafz.Leafs)) + } + for _, leaf := range leafz.Leafs { + got := int(leaf.InMsgs) + expected := 1 + if got != expected { + t.Errorf("Expected: %d, got: %d", expected, got) + } + + got = int(leaf.OutMsgs) + expected = 1 + if got != expected { + t.Errorf("Expected: %d, got: %d", expected, got) + } + + got = int(leaf.NumSubs) + expected = 2 + if got != expected { + t.Errorf("Expected: %d, got: %d", expected, got) + } + } + + // + // [B] solicit from [A] which will make [A] use the explicit + // cluster name AB, since [A] is in dynamic cluster name mode, + // forming a cluster together. + // + clusterConf := fmt.Sprintf(` + cluster { + name: "AB" + listen: "0.0.0.0:-1" + routes: [ nats://localhost:%d ] + } + `, sA.ClusterAddr().Port) + + reloadUpdateConfig(t, sB, confB, ` + listen: "0.0.0.0:-1" + server_name: "B" + `+clusterConf+leafConf) + + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { + if sA.ClusterName() != "AB" || sB.ClusterName() != "AB" { + return fmt.Errorf("Expected clustername of \"AB\", got %q and %q", sA.ClusterName(), sB.ClusterName()) + } + return nil + }) + checkClusterFormed(t, sA, sB) + + // + // The cluster renaming should have caused a reconnection + // so stats will be reset. 
+ // + checkNoChangeInLeafStats := func(t *testing.T) { + t.Helper() + leafz, err = s0.Leafz(nil) + if err != nil { + t.Fatal(err) + } + for _, leaf := range leafz.Leafs { + got := int(leaf.InMsgs) + expected := 0 + if got != expected { + t.Errorf("Expected: %d InMsgs, got: %d", expected, got) + } + + got = int(leaf.OutMsgs) + expected = 0 + if got != expected { + t.Errorf("Expected: %d OutMsgs, got: %d", expected, got) + } + + got = int(leaf.NumSubs) + expected = 2 + if got != expected { + t.Errorf("Expected: %d NumSubs, got: %d", expected, got) + } + } + } + checkNoChangeInLeafStats(t) + + connzB, err := sB.Connz(nil) + if err != nil { + t.Fatal(err) + } + if len(connzB.Conns) != 1 || connzB.Conns[0].InMsgs != 1 || connzB.Conns[0].OutMsgs != 1 { + t.Fatal("Expected connection to node B to receive messages.") + } + + // + // [A] to [B] roundtrip should be now via the cluster routes, + // not the leafnode connections through the [GROUND] node. + // + err = makeRequest([]byte("AFTER CLUSTER FORMED")) + if err != nil { + t.Fatal(err) + } + time.Sleep(500 * time.Millisecond) + connzB, err = sB.Connz(nil) + if err != nil { + t.Fatal(err) + } + if len(connzB.Conns) != 1 || connzB.Conns[0].InMsgs != 2 || connzB.Conns[0].OutMsgs != 2 { + t.Fatal("Expected connection to node B to receive messages.") + } + + // [B] stops soliciting routes from [A] and goes back to dynamic. + // This will cause another leafnode reconnection. + reloadUpdateConfig(t, sB, confB, ` + listen: "0.0.0.0:-1" + server_name: "B" + + cluster: { + listen: "0.0.0.0:-1" + } + `+leafConf) + + // Confirm that there are no routes to both servers and + // that the cluster name has changed. 
+ checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { + if numRoutes := sA.NumRoutes(); numRoutes != 0 { + return fmt.Errorf("Expected no routes for server, got %d", numRoutes) + } + if numRoutes := sB.NumRoutes(); numRoutes != 0 { + return fmt.Errorf("Expected no routes for server, got %d", numRoutes) + } + + nameB := sB.ClusterName() + if nameB == "AB" || nameB == "" { + return fmt.Errorf("Expected clustername to change, got %q", nameB) + } + + // Wait for leafnode connections to reconnect. + leafz, _ = s0.Leafz(nil) + if len(leafz.Leafs) < 2 { + return fmt.Errorf("Expected 2 leafnode connections, got: %d", len(leafz.Leafs)) + } + + // + // [A] cluster name should not be AB anymore after [B] went back to dynamic mode. + // + if sA.ClusterName() == "AB" { + return fmt.Errorf("Expected cluster name to not be AB!") + } + + return nil + }) + + // New request should have been through the leafnode again, + // all using a dynamic cluster name on each side. + // + // [A] <-- lid --> [GROUND] <-- lid --> [B] + // + err = makeRequest([]byte("REQUEST VIA LEAFNODE AGAIN")) + if err != nil { + t.Fatalf("Expected response via leafnode, got: %s", err) + } +} + func TestConfigReloadMaxSubsUnsupported(t *testing.T) { s, _, conf := runReloadServerWithContent(t, []byte(`max_subs: 1`)) defer os.Remove(conf) diff --git a/server/route.go b/server/route.go index c47cb70b..37ef2c46 100644 --- a/server/route.go +++ b/server/route.go @@ -81,6 +81,11 @@ type route struct { gatewayURL string leafnodeURL string hash string + + // Remember the last cluster name and cluster dynamic state + // from a route in case it has been announced to us. 
+ clusterName string + clusterDynamic bool } type connectInfo struct { @@ -491,6 +496,7 @@ func (c *client) processRouteInfo(info *Info) { supportsHeaders := c.srv.supportsHeaders() clusterName := c.srv.ClusterName() + isSusceptible := c.srv.isClusterNameSusceptibleToChanges() c.mu.Lock() // Connection can be closed at any time (by auth timeout, etc). @@ -514,13 +520,75 @@ func (c *client) processRouteInfo(info *Info) { // Detect if we have a mismatch of cluster names. if info.Cluster != "" && info.Cluster != clusterName { c.mu.Unlock() - // If we are dynamic we may update our cluster name. - // Use other if remote is non dynamic or their name is "bigger" - if s.isClusterNameDynamic() && (!info.Dynamic || (strings.Compare(clusterName, info.Cluster) < 0)) { - s.setClusterName(info.Cluster) + + isDynamic := s.isClusterNameDynamic() + remoteIsDynamic := info.Dynamic + if isDynamic && !remoteIsDynamic { + // The remote INFO sent to this node contains what should be the name of the cluster. + // Update the name marking that no longer susceptible to dynamic cluster updates since + // there is an explicit name in the cluster and cause other nodes to reconnect to + // this node using the explicit name. + s.setClusterName(info.Cluster, false) s.removeAllRoutesExcept(c) c.mu.Lock() + + c.route.clusterName = info.Cluster + c.route.clusterDynamic = info.Dynamic + } else if !isDynamic && remoteIsDynamic { + c.srv.mu.Lock() + routeInfoJSON := s.routeInfoJSON + c.srv.mu.Unlock() + + c.mu.Lock() + + // In case there is a mismatch with what we know about this route, + // we will send it another INFO message with the explicit cluster name + // so that the remote dynamic node uses our cluster name. + if c.route.clusterName != info.Cluster { + c.enqueueProto(routeInfoJSON) + } + + // Override the cluster name from this route with ours since it should win. 
+ c.route.clusterName = s.info.Cluster + c.route.clusterDynamic = info.Dynamic + } else if isDynamic && remoteIsDynamic { + if isSusceptible { + // Ignore the message unless we are susceptible to dynamic cluster updates and their + // dynamic cluster name is a better fit than what we have. If we are not susceptible, + // it means that there is an explicit name in the cluster and eventually nodes with the + // explicit cluster name will make this node use the explicit cluster name via INFO messages. + if strings.Compare(clusterName, info.Cluster) < 0 { + s.setClusterName(info.Cluster, true) + s.removeAllRoutesExcept(c) + } + } else if !s.nonDynamicClusterNameRoutePresent(info.ID) { + // If the route moved from being static to dynamic (thus also changing its name), + // then only use their name if no other static, non dynamic nodes exist and it beats + // this node's default unique cluster name. This can happen for example when the last node + // from a cluster of nodes has its config reloaded to remove the name and go back to dynamic mode. + + // Find any other route with an explicit name but the one that just went back to dynamic mode. + // NOTE: Sometimes nodes with dynamic names can also send their INFOs but due to races, + // by the time others have received it they have already changed its name to an explicit one + // due to a CONNECT message that was sent. This check also covers that scenario. + + // Use the correct name and recluster with that name. + if strings.Compare(s.defaultClusterName, info.Cluster) < 0 { + // Use theirs, let other nodes reconnect using their cluster name. + s.setClusterName(info.Cluster, true) + s.removeAllRoutesExcept(c) + } else { + // Use ours, let other nodes reconnect using our cluster name. 
+ s.setClusterName(s.defaultClusterName, true) + s.removeAllRoutesExcept(c) + } + } + + c.mu.Lock() + c.route.clusterName = info.Cluster + c.route.clusterDynamic = info.Dynamic } else { + // We have an explicit cluster name and reject explicit cluster name changes. c.closeConnection(ClusterNameConflict) return } @@ -1920,23 +1988,35 @@ func (c *client) processRouteConnect(srv *Server, arg []byte, lang string) error } clusterName := srv.ClusterName() + isSusceptible := srv.isClusterNameSusceptibleToChanges() // If we have a cluster name set, make sure it matches ours. if proto.Cluster != clusterName { - shouldReject := true // If we have a dynamic name we will do additional checks. - if srv.isClusterNameDynamic() { - if !proto.Dynamic || strings.Compare(clusterName, proto.Cluster) < 0 { - // We will take on their name since theirs is configured or higher then ours. - srv.setClusterName(proto.Cluster) - if !proto.Dynamic { - srv.getOpts().Cluster.Name = proto.Cluster - } + isDynamic := srv.isClusterNameDynamic() + remoteIsDynamic := proto.Dynamic + + if isDynamic && !remoteIsDynamic { + // c.Noticef("Updating the explicit name from the CONNECT message: ", proto.Cluster) + // Use the explicit name and mark current node as not susceptible + // to dynamic cluster name changes. + srv.setClusterName(proto.Cluster, false) + srv.removeAllRoutesExcept(c) + } else if !isDynamic && remoteIsDynamic { + // Do nothing since the remote may use this node's cluster name + // when it is sent an INFO message. Also override the cluster name + // from this route with ours since ours should win. + c.mu.Lock() + c.route.clusterName = srv.info.Cluster + c.mu.Unlock() + } else if isDynamic && remoteIsDynamic { + // If we haven't learned an explicit name, we will take on their name if theirs is higher than ours. 
+ if isSusceptible && strings.Compare(clusterName, proto.Cluster) < 0 { + srv.setClusterName(proto.Cluster, isSusceptible) srv.removeAllRoutesExcept(c) - shouldReject = false } - } - if shouldReject { + } else { + // We have an explicit cluster name and reject explicit cluster name changes. errTxt := fmt.Sprintf("Rejecting connection, cluster name %q does not match %q", proto.Cluster, srv.info.Cluster) c.Errorf(errTxt) c.sendErr(errTxt) @@ -1953,6 +2033,10 @@ func (c *client) processRouteConnect(srv *Server, arg []byte, lang string) error c.route.lnoc = proto.LNOC c.setRoutePermissions(perms) c.headers = supportsHeaders && proto.Headers + + // Remember their cluster name. + c.route.clusterName = proto.Cluster + c.route.clusterDynamic = proto.Dynamic c.mu.Unlock() return nil } @@ -1973,6 +2057,23 @@ func (s *Server) removeAllRoutesExcept(c *client) { } } +// Called when we update our cluster name during negotiations with remotes. +func (s *Server) nonDynamicClusterNameRoutePresent(remoteID string) bool { + s.mu.Lock() + defer s.mu.Unlock() + for _, r := range s.routes { + // Check all routes but this one. + if r.route.remoteID == remoteID { + continue + } + if r.route.clusterName != "" && !r.route.clusterDynamic { + return true + } + } + + return false +} + func (s *Server) removeRoute(c *client) { var rID string var lnURL string @@ -1989,6 +2090,25 @@ func (s *Server) removeRoute(c *client) { } c.mu.Unlock() s.mu.Lock() + + // Check whether we do not have any other routes that are not using explicit names, + // since in that case this node should go back to using a dynamic cluster name + // that will be negotiated among the remaining members of the cluster. 
+ var revertClusterName bool + if s.isClusterNameDynamic() && !s.susceptible { + var nonDynamicRouteFound bool + for _, r := range s.routes { + if r.route.clusterName != "" && !r.route.clusterDynamic { + nonDynamicRouteFound = true + } + } + + // Check whether there are no more routes, if so go back to use the original dynamic cluster name. + if len(s.routes) <= 1 && nonDynamicRouteFound { + revertClusterName = true + } + } + delete(s.routes, cid) if r != nil { rc, ok := s.remotes[rID] @@ -2010,4 +2130,14 @@ func (s *Server) removeRoute(c *client) { } s.removeFromTempClients(cid) s.mu.Unlock() + + // When the last route to this cluster is disconnected + // and we were in dynamic mode, then we will go back + // to using the original dynamic name that we had. + if revertClusterName { + s.mu.Lock() + name := s.defaultClusterName + s.mu.Unlock() + s.setClusterName(name, true) + } } diff --git a/server/server.go b/server/server.go index 2e9a6955..b71897ad 100644 --- a/server/server.go +++ b/server/server.go @@ -218,6 +218,14 @@ type Server struct { // Websocket structure websocket srvWebsocket + + // susceptible is to mark whether this node is susceptible + // to dynamic cluster name INFO/CONNECT proto updates. + susceptible bool + + // defaultClusterName is the default cluster name used + // in case dynamic clustering name is used. 
+ defaultClusterName string } // Make sure all are 64bits for atomic use @@ -289,17 +297,19 @@ func NewServer(opts *Options) (*Server, error) { now := time.Now() s := &Server{ - kp: kp, - configFile: opts.ConfigFile, - info: info, - prand: rand.New(rand.NewSource(time.Now().UnixNano())), - opts: opts, - done: make(chan bool, 1), - start: now, - configTime: now, - gwLeafSubs: NewSublistWithCache(), - httpBasePath: httpBasePath, - eventIds: nuid.New(), + kp: kp, + configFile: opts.ConfigFile, + info: info, + prand: rand.New(rand.NewSource(time.Now().UnixNano())), + opts: opts, + done: make(chan bool, 1), + start: now, + configTime: now, + gwLeafSubs: NewSublistWithCache(), + httpBasePath: httpBasePath, + eventIds: nuid.New(), + susceptible: opts.Cluster.Name == "", + defaultClusterName: nuid.Next(), } // Trusted root operator keys. @@ -325,9 +335,11 @@ func NewServer(opts *Options) (*Server, error) { return nil, err } - // If we have a cluster definition but do not have a cluster name, create one. + // If we have a cluster definition but do not have a cluster name, + // use the default dynamic cluster name for the node. if opts.Cluster.Port != 0 && opts.Cluster.Name == "" { - s.info.Cluster = nuid.Next() + s.info.Cluster = s.defaultClusterName + s.susceptible = true } // This is normally done in the AcceptLoop, once the @@ -449,7 +461,7 @@ func (s *Server) ClusterName() string { } // setClusterName will update the cluster name for this server. -func (s *Server) setClusterName(name string) { +func (s *Server) setClusterName(name string, susceptible bool) { s.mu.Lock() var resetCh chan struct{} if s.sys != nil && s.info.Cluster != name { @@ -458,8 +470,16 @@ func (s *Server) setClusterName(name string) { } s.info.Cluster = name s.routeInfo.Cluster = name + + // Marks whether this is a dynamic name change that is + // susceptible to changes due to INFO/CONNECT messages + // from other nodes in the cluster. 
+ s.susceptible = susceptible + s.routeInfo.Dynamic = susceptible + // Regenerate the info byte array s.generateRouteInfoJSON() + // Need to close solicited leaf nodes. The close has to be done outside of the server lock. var leafs []*client for _, c := range s.leafs { @@ -477,7 +497,6 @@ func (s *Server) setClusterName(name string) { resetCh <- struct{}{} } s.Noticef("Cluster name updated to %s", name) - } // Return whether the cluster name is dynamic. @@ -485,6 +504,13 @@ func (s *Server) isClusterNameDynamic() bool { return s.getOpts().Cluster.Name == "" } +// Return whether the cluster name is susceptible to cluster name changes. +func (s *Server) isClusterNameSusceptibleToChanges() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.susceptible +} + // ClientURL returns the URL used to connect clients. Helpful in testing // when we designate a random client port (-1). func (s *Server) ClientURL() string { @@ -2545,6 +2571,16 @@ func (s *Server) ProfilerAddr() *net.TCPAddr { return s.profiler.Addr().(*net.TCPAddr) } +// LeafnodeAddr returns the net.Addr object for the leafnode listener. +func (s *Server) LeafnodeAddr() *net.TCPAddr { + s.mu.Lock() + defer s.mu.Unlock() + if s.leafNodeListener == nil { + return nil + } + return s.leafNodeListener.Addr().(*net.TCPAddr) +} + // ReadyForConnections returns `true` if the server is ready to accept clients // and, if routing is enabled, route connections. If after the duration // `dur` the server is still not ready, returns `false`.