diff --git a/server/reload.go b/server/reload.go index 3df1c10c..7834412c 100644 --- a/server/reload.go +++ b/server/reload.go @@ -25,6 +25,7 @@ import ( "time" "github.com/nats-io/jwt/v2" + "github.com/nats-io/nuid" ) // FlagSnapshot captures the server options as specified by CLI flags at @@ -54,9 +55,13 @@ type option interface { // cluster permissions. IsClusterPermsChange() bool - // IsJetStreamChange inidicates a change in the servers config for JetStream. + // IsJetStreamChange indicates a change in the servers config for JetStream. // Account changes will be handled separately in reloadAuthorization. IsJetStreamChange() bool + + // IsClusterNameChange indicates that the cluster name has changed which might + // implicate changes in the cluster membership of the server. + IsClusterNameChange() bool } // noopOption is a base struct that provides default no-op behaviors. @@ -82,6 +87,10 @@ func (n noopOption) IsJetStreamChange() bool { return false } +func (n noopOption) IsClusterNameChange() bool { + return false +} + // loggingOption is a base struct that provides default option behaviors for // logging-related options. type loggingOption struct { @@ -292,8 +301,10 @@ func (u *nkeysOption) Apply(server *Server) { // clusterOption implements the option interface for the `cluster` setting. type clusterOption struct { authOption + oldValue ClusterOpts newValue ClusterOpts permsChanged bool + nameChanged bool } // Apply the cluster change. @@ -313,10 +324,22 @@ func (c *clusterOption) Apply(s *Server) { } s.setRouteInfoHostPortAndIP() s.mu.Unlock() - if c.newValue.Name != "" && c.newValue.Name != s.ClusterName() { + + // Check whether the cluster name has been removed or changed as that could affect a cluster membership. + switch { + case c.oldValue.Name != "" && c.newValue.Name == "": + // NOTE: If this node is still part of the static routes from a node that has an explicit cluster name + // then this new generated dynamic name will be overridden by the remote one when it sends CONNECT again. + s.setClusterName(nuid.Next()) + c.nameChanged = true + case c.newValue.Name != "" && c.oldValue.Name != c.newValue.Name: + // Use the new explicit cluster name from the config. s.setClusterName(c.newValue.Name) + c.nameChanged = true } + s.Noticef("Reloaded: cluster") + if tlsRequired && c.newValue.TLSConfig.InsecureSkipVerify { s.Warnf(clusterTLSInsecureWarning) } @@ -326,6 +349,10 @@ func (c *clusterOption) IsClusterPermsChange() bool { return c.permsChanged } +func (c *clusterOption) IsClusterNameChange() bool { + return c.nameChanged +} + // routesOption implements the option interface for the cluster `routes` // setting. type routesOption struct { @@ -847,7 +874,7 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { return nil, err } permsChanged := !reflect.DeepEqual(newClusterOpts.Permissions, oldClusterOpts.Permissions) - diffOpts = append(diffOpts, &clusterOption{newValue: newClusterOpts, permsChanged: permsChanged}) + diffOpts = append(diffOpts, &clusterOption{oldValue: oldClusterOpts, newValue: newClusterOpts, permsChanged: permsChanged}) case "routes": add, remove := diffRoutes(oldValue.([]*url.URL), newValue.([]*url.URL)) diffOpts = append(diffOpts, &routesOption{add: add, remove: remove}) @@ -1018,6 +1045,7 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { reloadAuth = false reloadClusterPerms = false reloadClientTrcLvl = false + reloadClusterName = false ) for _, opt := range opts { opt.Apply(s) @@ -1033,6 +1061,9 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { if opt.IsClusterPermsChange() { reloadClusterPerms = true } + if opt.IsClusterNameChange() { + reloadClusterName = true + } } if reloadLogging { @@ -1047,6 +1078,9 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) { if reloadClusterPerms { s.reloadClusterPermissions(ctx.oldClusterPerms) } + if reloadClusterName { + s.reloadClusterName() + } s.Noticef("Reloaded server configuration") } @@ -1100,6 +1134,20 @@ func (s *Server) reloadClientTraceLevel() { } } +// reloadClusterName detects cluster membership changes triggered +// due to a reload where the name changes. +func (s *Server) reloadClusterName() { + s.mu.Lock() + // Get all connected routes and notify the cluster rename. + infoJSON := s.routeInfoJSON + for _, route := range s.routes { + route.mu.Lock() + route.enqueueProto(infoJSON) + route.mu.Unlock() + } + s.mu.Unlock() +} + // reloadAuthorization reconfigures the server authorization settings, // disconnects any clients who are no longer authorized, and removes any // unauthorized subscriptions. diff --git a/server/reload_test.go b/server/reload_test.go index 20d942ff..ec988f54 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -1652,7 +1652,8 @@ func TestConfigReloadClusterAdvertise(t *testing.T) { orgClusterPort := s.ClusterAddr().Port - verify := func(expectedHost string, expectedPort int, expectedIP string) { + verify := func(t *testing.T, expectedHost string, expectedPort int, expectedIP string) { + t.Helper() s.mu.Lock() routeInfo := s.routeInfo routeInfoJSON := Info{} @@ -1679,7 +1680,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) { cluster_advertise: "me:1" } `) - verify("me", 1, "nats-route://me:1/") + verify(t, "me", 1, "nats-route://me:1/") // Update config with cluster_advertise (no port specified) reloadUpdateConfig(t, s, conf, ` @@ -1689,7 +1690,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) { cluster_advertise: "me" } `) - verify("me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort)) + verify(t, "me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort)) // Update config with cluster_advertise (-1 port specified) reloadUpdateConfig(t, s, conf, ` @@ -1699,7 +1700,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) { cluster_advertise: "me:-1" } `) - verify("me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort)) + verify(t, "me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort)) // Update to remove cluster_advertise reloadUpdateConfig(t, s, conf, ` @@ -1708,7 +1709,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) { listen: "0.0.0.0:-1" } `) - verify("0.0.0.0", orgClusterPort, "") + verify(t, "0.0.0.0", orgClusterPort, "") } func TestConfigReloadClusterNoAdvertise(t *testing.T) { @@ -1787,6 +1788,452 @@ func TestConfigReloadClusterName(t *testing.T) { } } +func TestConfigClusterMembershipReload(t *testing.T) { + // + // [A] starts in dynamic mode. + // + s1, _, conf1 := runReloadServerWithContent(t, []byte(` + listen: "0.0.0.0:-1" + server_name: "A" + + cluster: { + listen: "0.0.0.0:-1" + } + `)) + defer os.Remove(conf1) + defer s1.Shutdown() + + got := s1.ClusterName() + if got == "" { + t.Fatalf("Expected update clustername to be set dynamically") + } + + // + // [A] joins AB cluster with explicit name. + // + reloadUpdateConfig(t, s1, conf1, ` + listen: "0.0.0.0:-1" + server_name: "A" + + cluster: { + name: "AB" + listen: "0.0.0.0:-1" + } + `) + if s1.ClusterName() != "AB" { + t.Fatalf("Expected update clustername of \"AB\", got %q", s1.ClusterName()) + } + + // + // [B] joins AB cluster with explicit name. + // + template := fmt.Sprintf(` + listen: "0.0.0.0:-1" + server_name: "B" + + cluster: { + name: "AB" + listen: "0.0.0.0:-1" + routes: [ nats://localhost:%d ] # Route to A + } + `, s1.ClusterAddr().Port) + + s2, _, conf2 := runReloadServerWithContent(t, []byte(template)) + defer os.Remove(conf2) + defer s2.Shutdown() + checkClusterFormed(t, s1, s2) + + // + // [A] client sends request to [B] client and should be able to respond. + // + nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%d", "127.0.0.1", s1.Addr().(*net.TCPAddr).Port)) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer nc1.Close() + + nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", "127.0.0.1", s2.Addr().(*net.TCPAddr).Port)) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer nc2.Close() + nc1.Subscribe("test", func(m *nats.Msg) { + m.Respond([]byte("pong")) + }) + nc1.Flush() + _, err = nc2.Request("test", []byte("ping"), 2*time.Second) + if err != nil { + t.Fatalf("Error making request to cluster, got: %s", err) + } + + // + // [B] leaves the cluster AB and stops soliciting from [A] + // [A] is still is part of AB cluster. + // + template = fmt.Sprintf(` + listen: "0.0.0.0:-1" + server_name: "B" + + cluster: { + # name: "AB" + listen: "0.0.0.0:-1" + # routes: [ nats://localhost:%d ] # Route to A + } + `, s1.ClusterAddr().Port) + reloadUpdateConfig(t, s2, conf2, template) + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { + if numRoutes := s1.NumRoutes(); numRoutes > 0 { + return fmt.Errorf("Expected %d routes for server %q, got %d", 0, s1.ID(), numRoutes) + } + got = s2.ClusterName() + if got == "AB" || got == "" { + return fmt.Errorf("Expected update cluster name to be new, got %q", got) + } + got = s1.ClusterName() + if got != "AB" { + return fmt.Errorf("Expected update cluster name to be AB, got %q", got) + } + + // [B] leaving AB cluster and stop soliciting should dissolve the cluster. + if numRoutes := s1.NumRoutes(); numRoutes != 0 { + return fmt.Errorf("Expected no routes for server %q, got %d", s1.ID(), numRoutes) + } + if numRoutes := s2.NumRoutes(); numRoutes != 0 { + return fmt.Errorf("Expected no routes for server %q, got %d", s2.ID(), numRoutes) + } + return nil + }) + + // + // [A] leaves cluster AB and goes back to dynamic. + // + reloadUpdateConfig(t, s1, conf1, ` + listen: "0.0.0.0:-1" + server_name: "A" + + cluster: { + listen: "0.0.0.0:-1" + } + `) + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { + got = s1.ClusterName() + if got == "AB" || got == "" { + return fmt.Errorf("Expected update cluster name to be new, got %q", got) + } + + return nil + }) + + // + // [B] client request fails since not part of cluster. + // + _, err = nc2.Request("test", []byte("failed ping"), 2*time.Second) + if err == nil { + t.Fatalf("Expected error making a request to cluster.") + } + + // + // [C] solicits from [A] and both form dynamic cluster. + // + template = fmt.Sprintf(` + listen: "0.0.0.0:-1" + server_name: "C" + + cluster: { + listen: "0.0.0.0:-1" + routes: [ nats://localhost:%d ] # Route to A + } + `, s1.ClusterAddr().Port) + + s3, _, conf3 := runReloadServerWithContent(t, []byte(template)) + defer os.Remove(conf3) + defer s3.Shutdown() + checkClusterFormed(t, s1, s3) + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { + if numRoutes := s1.NumRoutes(); numRoutes < 1 { + return fmt.Errorf("Expected %d routes for server %q, got %d", 0, s1.ID(), numRoutes) + } + + if s1.ClusterName() != s3.ClusterName() { + return fmt.Errorf("Expected cluster names to be the same: %s != %s", s1.ClusterName(), s3.ClusterName()) + } + + if s2.ClusterName() == s3.ClusterName() { + return fmt.Errorf("Expected cluster names to not be the same: %s == %s", s1.ClusterName(), s3.ClusterName()) + } + return nil + }) + + // + // [C] client makes request to service from [A] which should respond OK. + // + nc3, err := nats.Connect(fmt.Sprintf("nats://%s:%d", "127.0.0.1", s3.Addr().(*net.TCPAddr).Port)) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer nc3.Close() + + _, err = nc3.Request("test", []byte("ping"), 2*time.Second) + if err != nil { + t.Fatalf("Error making request to cluster, got: %s", err) + } +} + +func TestConfigLeafnodeClusterMembershipReload(t *testing.T) { + // + // [GROUND] starts in dynamic mode. + // + s0, _, conf0 := runReloadServerWithContent(t, []byte(` + listen: "0.0.0.0:-1" + server_name: "GROUND" + + leafnodes: { + listen: "0.0.0.0:-1" + } + `)) + defer os.Remove(conf0) + defer s0.Shutdown() + + leafConf := fmt.Sprintf(` + leafnodes: { + remotes [{ url: "nats://localhost:%d" }] + } + `, s0.LeafnodeAddr().Port) + + // + // [A] connects to [GROUND] via leafnode port + // + sA, _, confA := runReloadServerWithContent(t, []byte(` + listen: "0.0.0.0:-1" + server_name: "A" + + cluster: { + listen: "0.0.0.0:-1" + } + `+leafConf)) + defer os.Remove(confA) + defer sA.Shutdown() + + // + // [B] connects to [GROUND] via leafnode port + // + sB, _, confB := runReloadServerWithContent(t, []byte(` + listen: "0.0.0.0:-1" + server_name: "B" + + cluster: { + listen: "0.0.0.0:-1" + } + `+leafConf)) + defer os.Remove(confB) + defer sB.Shutdown() + + // + // [A] client can make requests to service at [B] + // with the request traveling through [GROUND] node. + // + ncA, err := nats.Connect(fmt.Sprintf("nats://localhost:%d", sA.Addr().(*net.TCPAddr).Port)) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer ncA.Close() + + ncB, err := nats.Connect(fmt.Sprintf("nats://localhost:%d", sB.Addr().(*net.TCPAddr).Port)) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer ncB.Close() + + ncB.Subscribe("help", func(msg *nats.Msg) { + msg.Respond([]byte("OK")) + }) + ncB.Flush() + time.Sleep(500 * time.Millisecond) + + makeRequest := func(payload []byte) error { + _, err := ncA.Request("help", payload, 5*time.Second) + if err != nil { + return err + } + return nil + } + + // + // Make roundtrip via leafnode connections. + // + // [A] <-- lid --> [GROUND] <-- lid --> [B] + // + // + err = makeRequest([]byte("VIA LEAFNODE")) + if err != nil { + t.Fatal(err) + } + + // + // [GROUND] should have 2 leaf connections and + // a single message flowing in/out. + // + leafz, err := s0.Leafz(nil) + if err != nil { + t.Fatal(err) + } + if len(leafz.Leafs) != 2 { + t.Errorf("Expected 2 leafs but got %d", len(leafz.Leafs)) + } + for _, leaf := range leafz.Leafs { + got := int(leaf.InMsgs) + expected := 1 + if got != expected { + t.Errorf("Expected: %d, got: %d", expected, got) + } + + got = int(leaf.OutMsgs) + expected = 1 + if got != expected { + t.Errorf("Expected: %d, got: %d", expected, got) + } + + got = int(leaf.NumSubs) + expected = 2 + if got != expected { + t.Errorf("Expected: %d, got: %d", expected, got) + } + } + + // + // [B] solicit from [A] to form together a AB cluster named. + // Since [A] is dynamic, this will make [B] use the explicit AB name. + // + clusterConf := fmt.Sprintf(` + cluster { + name: "AB" + listen: "0.0.0.0:-1" + routes: [ nats://localhost:%d ] + } + `, sA.ClusterAddr().Port) + + reloadUpdateConfig(t, sB, confB, ` + listen: "0.0.0.0:-1" + server_name: "B" + `+clusterConf+leafConf) + checkClusterFormed(t, sA, sB) + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { + if sA.ClusterName() != "AB" || sB.ClusterName() != "AB" { + return fmt.Errorf("Expected clustername of \"AB\", got %q and %q", sA.ClusterName(), sB.ClusterName()) + } + return nil + }) + + // + // The cluster renaming should have caused a reconnection + // so stats will be reset. + // + checkNoChangeInLeafStats := func(t *testing.T) { + t.Helper() + leafz, err = s0.Leafz(nil) + if err != nil { + t.Fatal(err) + } + for _, leaf := range leafz.Leafs { + got := int(leaf.InMsgs) + expected := 0 + if got != expected { + t.Errorf("Expected: %d InMsgs, got: %d", expected, got) + } + + got = int(leaf.OutMsgs) + expected = 0 + if got != expected { + t.Errorf("Expected: %d OutMsgs, got: %d", expected, got) + } + + got = int(leaf.NumSubs) + expected = 2 + if got != expected { + t.Errorf("Expected: %d NumSubs, got: %d", expected, got) + } + } + } + checkNoChangeInLeafStats(t) + + connzB, err := sB.Connz(nil) + if err != nil { + t.Fatal(err) + } + if len(connzB.Conns) != 1 || connzB.Conns[0].InMsgs != 1 || connzB.Conns[0].OutMsgs != 1 { + t.Fatal("Expected connection to node B to receive messages.") + } + + // + // [A] to [B] roundtrip should be now via the cluster routes, + // not the leafnode connections through the [GROUND] node. + // + err = makeRequest([]byte("AFTER CLUSTER FORMED")) + if err != nil { + t.Fatal(err) + } + time.Sleep(500 * time.Millisecond) + connzB, err = sB.Connz(nil) + if err != nil { + t.Fatal(err) + } + if len(connzB.Conns) != 1 || connzB.Conns[0].InMsgs != 2 || connzB.Conns[0].OutMsgs != 2 { + t.Fatal("Expected connection to node B to receive messages.") + } + + // [B] stops soliciting routes from [A] and goes back to dynamic. + // This will cause another leafnode reconnection. + reloadUpdateConfig(t, sB, confB, ` + listen: "0.0.0.0:-1" + server_name: "B" + + cluster: { + listen: "0.0.0.0:-1" + } + `+leafConf) + + // Confirm that there are no routes to both servers and + // that the cluster name has changed. + checkFor(t, 10*time.Second, 100*time.Millisecond, func() error { + if numRoutes := sA.NumRoutes(); numRoutes != 0 { + return fmt.Errorf("Expected no routes for server, got %d", numRoutes) + } + if numRoutes := sB.NumRoutes(); numRoutes != 0 { + return fmt.Errorf("Expected no routes for server, got %d", numRoutes) + } + + nameB := sB.ClusterName() + if nameB == "AB" || nameB == "" { + return fmt.Errorf("Expected clustername to change, got %q", nameB) + } + + // Wait for leafnode connections to reconnect. + leafz, _ = s0.Leafz(nil) + if len(leafz.Leafs) < 2 { + return fmt.Errorf("Expected 2 leafnode connections, got: %d", len(leafz.Leafs)) + } + + return nil + }) + + // + // [A] cluster name will still be AB even though it started with dynamic name. + // + if sA.ClusterName() != "AB" { + t.Errorf("Expected clustername to be AB, got %q", sA.ClusterName()) + } + + // New request should have been through the leafnode again, + // all using a dynamic cluster name on each side. + // + // [A] <-- lid --> [GROUND] <-- lid --> [B] + // + err = makeRequest([]byte("REQUEST VIA LEAFNODE AGAIN")) + if err != nil { + t.Fatalf("Expected response via leafnode, got: %s", err) + } +} + func TestConfigReloadMaxSubsUnsupported(t *testing.T) { s, _, conf := runReloadServerWithContent(t, []byte(`max_subs: 1`)) defer os.Remove(conf) diff --git a/server/server.go b/server/server.go index 71850eb5..113b2ca8 100644 --- a/server/server.go +++ b/server/server.go @@ -436,8 +436,10 @@ func (s *Server) setClusterName(name string) { } s.info.Cluster = name s.routeInfo.Cluster = name + // Regenerate the info byte array s.generateRouteInfoJSON() + // Need to close solicited leaf nodes. The close has to be done outside of the server lock. var leafs []*client for _, c := range s.leafs { @@ -447,6 +449,7 @@ func (s *Server) setClusterName(name string) { } c.mu.Unlock() } + s.mu.Unlock() for _, l := range leafs { l.closeConnection(ClusterNameConflict) @@ -2483,6 +2486,16 @@ func (s *Server) ProfilerAddr() *net.TCPAddr { return s.profiler.Addr().(*net.TCPAddr) } +// LeafnodeAddr returns the net.Addr object for the leafnode listener. +func (s *Server) LeafnodeAddr() *net.TCPAddr { + s.mu.Lock() + defer s.mu.Unlock() + if s.leafNodeListener == nil { + return nil + } + return s.leafNodeListener.Addr().(*net.TCPAddr) +} + // ReadyForConnections returns `true` if the server is ready to accept clients // and, if routing is enabled, route connections. If after the duration // `dur` the server is still not ready, returns `false`.