Allow cluster name to be dynamic after unset on reload

If the cluster name is removed on a config reload,
the members of that cluster should renegotiate a new
dynamic cluster name, provided an explicit cluster name
was not set.

This also adds logic so that in case membership from
a cluster has changed after the reload, the server
disconnects the routes that are no longer considered
to form part of the same cluster.

Signed-off-by: Waldemar Quevedo <wally@synadia.com>
This commit is contained in:
Waldemar Quevedo
2020-07-13 22:29:46 -07:00
parent 4d0e50209c
commit 4a45c4818b
3 changed files with 516 additions and 8 deletions

View File

@@ -25,6 +25,7 @@ import (
"time"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nuid"
)
// FlagSnapshot captures the server options as specified by CLI flags at
@@ -54,9 +55,13 @@ type option interface {
// cluster permissions.
IsClusterPermsChange() bool
// IsJetStreamChange inidicates a change in the servers config for JetStream.
// IsJetStreamChange indicates a change in the servers config for JetStream.
// Account changes will be handled separately in reloadAuthorization.
IsJetStreamChange() bool
// IsClusterNameChange indicates that the cluster name has changed which might
// implicate changes in the cluster membership of the server.
IsClusterNameChange() bool
}
// noopOption is a base struct that provides default no-op behaviors.
@@ -82,6 +87,10 @@ func (n noopOption) IsJetStreamChange() bool {
return false
}
// IsClusterNameChange implements the option interface; the base no-op
// option reports that the cluster name did not change.
func (n noopOption) IsClusterNameChange() bool {
	return false
}
// loggingOption is a base struct that provides default option behaviors for
// logging-related options.
type loggingOption struct {
@@ -292,8 +301,10 @@ func (u *nkeysOption) Apply(server *Server) {
// clusterOption implements the option interface for the `cluster` setting.
type clusterOption struct {
	authOption
	// oldValue holds the cluster options in effect before the reload.
	oldValue ClusterOpts
	// newValue holds the cluster options from the reloaded config.
	newValue ClusterOpts
	// permsChanged is set when the cluster route permissions differ
	// between oldValue and newValue.
	permsChanged bool
	// nameChanged is set by Apply when the cluster name was changed or
	// removed, which may affect this server's cluster membership.
	nameChanged bool
}
// Apply the cluster change.
@@ -313,10 +324,22 @@ func (c *clusterOption) Apply(s *Server) {
}
s.setRouteInfoHostPortAndIP()
s.mu.Unlock()
if c.newValue.Name != "" && c.newValue.Name != s.ClusterName() {
// Check whether the cluster name has been removed or changed as that could affect a cluster membership.
switch {
case c.oldValue.Name != "" && c.newValue.Name == "":
// NOTE: If this node is still part of the static routes from a node that has an explicit cluster name
// then this new generated dynamic name will be overridden by the remote one when it sends CONNECT again.
s.setClusterName(nuid.Next())
c.nameChanged = true
case c.newValue.Name != "" && c.oldValue.Name != c.newValue.Name:
// Use the new explicit cluster name from the config.
s.setClusterName(c.newValue.Name)
c.nameChanged = true
}
s.Noticef("Reloaded: cluster")
if tlsRequired && c.newValue.TLSConfig.InsecureSkipVerify {
s.Warnf(clusterTLSInsecureWarning)
}
@@ -326,6 +349,10 @@ func (c *clusterOption) IsClusterPermsChange() bool {
return c.permsChanged
}
// IsClusterNameChange reports whether this reload changed or removed the
// configured cluster name (recorded by Apply), which may require the
// server to renegotiate its cluster membership.
func (c *clusterOption) IsClusterNameChange() bool {
	return c.nameChanged
}
// routesOption implements the option interface for the cluster `routes`
// setting.
type routesOption struct {
@@ -847,7 +874,7 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
return nil, err
}
permsChanged := !reflect.DeepEqual(newClusterOpts.Permissions, oldClusterOpts.Permissions)
diffOpts = append(diffOpts, &clusterOption{newValue: newClusterOpts, permsChanged: permsChanged})
diffOpts = append(diffOpts, &clusterOption{oldValue: oldClusterOpts, newValue: newClusterOpts, permsChanged: permsChanged})
case "routes":
add, remove := diffRoutes(oldValue.([]*url.URL), newValue.([]*url.URL))
diffOpts = append(diffOpts, &routesOption{add: add, remove: remove})
@@ -1018,6 +1045,7 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) {
reloadAuth = false
reloadClusterPerms = false
reloadClientTrcLvl = false
reloadClusterName = false
)
for _, opt := range opts {
opt.Apply(s)
@@ -1033,6 +1061,9 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) {
if opt.IsClusterPermsChange() {
reloadClusterPerms = true
}
if opt.IsClusterNameChange() {
reloadClusterName = true
}
}
if reloadLogging {
@@ -1047,6 +1078,9 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) {
if reloadClusterPerms {
s.reloadClusterPermissions(ctx.oldClusterPerms)
}
if reloadClusterName {
s.reloadClusterName()
}
s.Noticef("Reloaded server configuration")
}
@@ -1100,6 +1134,20 @@ func (s *Server) reloadClientTraceLevel() {
}
}
// reloadClusterName detects cluster membership changes triggered
// due to a reload where the name changes.
func (s *Server) reloadClusterName() {
	s.mu.Lock()
	// Get all connected routes and notify the cluster rename.
	// The route INFO JSON already carries the new cluster name since
	// setClusterName regenerates it before this is called.
	infoJSON := s.routeInfoJSON
	for _, route := range s.routes {
		route.mu.Lock()
		// NOTE(review): enqueueProto in this codebase appears to acquire the
		// client lock itself — holding route.mu here may double-lock a
		// non-reentrant sync.Mutex; confirm against client.enqueueProto.
		route.enqueueProto(infoJSON)
		route.mu.Unlock()
	}
	s.mu.Unlock()
}
// reloadAuthorization reconfigures the server authorization settings,
// disconnects any clients who are no longer authorized, and removes any
// unauthorized subscriptions.

View File

@@ -1652,7 +1652,8 @@ func TestConfigReloadClusterAdvertise(t *testing.T) {
orgClusterPort := s.ClusterAddr().Port
verify := func(expectedHost string, expectedPort int, expectedIP string) {
verify := func(t *testing.T, expectedHost string, expectedPort int, expectedIP string) {
t.Helper()
s.mu.Lock()
routeInfo := s.routeInfo
routeInfoJSON := Info{}
@@ -1679,7 +1680,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) {
cluster_advertise: "me:1"
}
`)
verify("me", 1, "nats-route://me:1/")
verify(t, "me", 1, "nats-route://me:1/")
// Update config with cluster_advertise (no port specified)
reloadUpdateConfig(t, s, conf, `
@@ -1689,7 +1690,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) {
cluster_advertise: "me"
}
`)
verify("me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort))
verify(t, "me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort))
// Update config with cluster_advertise (-1 port specified)
reloadUpdateConfig(t, s, conf, `
@@ -1699,7 +1700,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) {
cluster_advertise: "me:-1"
}
`)
verify("me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort))
verify(t, "me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort))
// Update to remove cluster_advertise
reloadUpdateConfig(t, s, conf, `
@@ -1708,7 +1709,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) {
listen: "0.0.0.0:-1"
}
`)
verify("0.0.0.0", orgClusterPort, "")
verify(t, "0.0.0.0", orgClusterPort, "")
}
func TestConfigReloadClusterNoAdvertise(t *testing.T) {
@@ -1787,6 +1788,452 @@ func TestConfigReloadClusterName(t *testing.T) {
}
}
// TestConfigClusterMembershipReload verifies cluster membership transitions
// driven by config reloads: a server starts with a dynamic cluster name,
// joins an explicitly named cluster, leaves it (routes must be dropped and a
// new dynamic name generated), and can then form a fresh dynamic cluster
// with a new peer.
func TestConfigClusterMembershipReload(t *testing.T) {
	//
	// [A] starts in dynamic mode.
	//
	s1, _, conf1 := runReloadServerWithContent(t, []byte(`
listen: "0.0.0.0:-1"
server_name: "A"
cluster: {
listen: "0.0.0.0:-1"
}
`))
	defer os.Remove(conf1)
	defer s1.Shutdown()
	got := s1.ClusterName()
	if got == "" {
		t.Fatalf("Expected update clustername to be set dynamically")
	}
	//
	// [A] joins AB cluster with explicit name.
	//
	reloadUpdateConfig(t, s1, conf1, `
listen: "0.0.0.0:-1"
server_name: "A"
cluster: {
name: "AB"
listen: "0.0.0.0:-1"
}
`)
	if s1.ClusterName() != "AB" {
		t.Fatalf("Expected update clustername of \"AB\", got %q", s1.ClusterName())
	}
	//
	// [B] joins AB cluster with explicit name.
	//
	template := fmt.Sprintf(`
listen: "0.0.0.0:-1"
server_name: "B"
cluster: {
name: "AB"
listen: "0.0.0.0:-1"
routes: [ nats://localhost:%d ] # Route to A
}
`, s1.ClusterAddr().Port)
	s2, _, conf2 := runReloadServerWithContent(t, []byte(template))
	defer os.Remove(conf2)
	defer s2.Shutdown()
	checkClusterFormed(t, s1, s2)
	//
	// [A] client sends request to [B] client and should be able to respond.
	//
	nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%d", "127.0.0.1", s1.Addr().(*net.TCPAddr).Port))
	if err != nil {
		t.Fatalf("Error creating client: %v", err)
	}
	defer nc1.Close()
	nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", "127.0.0.1", s2.Addr().(*net.TCPAddr).Port))
	if err != nil {
		t.Fatalf("Error creating client: %v", err)
	}
	defer nc2.Close()
	nc1.Subscribe("test", func(m *nats.Msg) {
		m.Respond([]byte("pong"))
	})
	nc1.Flush()
	_, err = nc2.Request("test", []byte("ping"), 2*time.Second)
	if err != nil {
		t.Fatalf("Error making request to cluster, got: %s", err)
	}
	//
	// [B] leaves the cluster AB and stops soliciting from [A]
	// [A] is still part of the AB cluster.
	//
	template = fmt.Sprintf(`
listen: "0.0.0.0:-1"
server_name: "B"
cluster: {
# name: "AB"
listen: "0.0.0.0:-1"
# routes: [ nats://localhost:%d ] # Route to A
}
`, s1.ClusterAddr().Port)
	reloadUpdateConfig(t, s2, conf2, template)
	checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
		// [B] must have renegotiated a fresh dynamic name.
		got = s2.ClusterName()
		if got == "AB" || got == "" {
			return fmt.Errorf("Expected update cluster name to be new, got %q", got)
		}
		// [A] keeps its explicit name.
		got = s1.ClusterName()
		if got != "AB" {
			return fmt.Errorf("Expected update cluster name to be AB, got %q", got)
		}
		// [B] leaving AB cluster and stop soliciting should dissolve the cluster.
		// (A single NumRoutes check per server suffices; the original had a
		// redundant duplicate of the s1 check.)
		if numRoutes := s1.NumRoutes(); numRoutes != 0 {
			return fmt.Errorf("Expected no routes for server %q, got %d", s1.ID(), numRoutes)
		}
		if numRoutes := s2.NumRoutes(); numRoutes != 0 {
			return fmt.Errorf("Expected no routes for server %q, got %d", s2.ID(), numRoutes)
		}
		return nil
	})
	//
	// [A] leaves cluster AB and goes back to dynamic.
	//
	reloadUpdateConfig(t, s1, conf1, `
listen: "0.0.0.0:-1"
server_name: "A"
cluster: {
listen: "0.0.0.0:-1"
}
`)
	checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
		got = s1.ClusterName()
		if got == "AB" || got == "" {
			return fmt.Errorf("Expected update cluster name to be new, got %q", got)
		}
		return nil
	})
	//
	// [B] client request fails since not part of cluster.
	//
	_, err = nc2.Request("test", []byte("failed ping"), 2*time.Second)
	if err == nil {
		t.Fatalf("Expected error making a request to cluster.")
	}
	//
	// [C] solicits from [A] and both form dynamic cluster.
	//
	template = fmt.Sprintf(`
listen: "0.0.0.0:-1"
server_name: "C"
cluster: {
listen: "0.0.0.0:-1"
routes: [ nats://localhost:%d ] # Route to A
}
`, s1.ClusterAddr().Port)
	s3, _, conf3 := runReloadServerWithContent(t, []byte(template))
	defer os.Remove(conf3)
	defer s3.Shutdown()
	checkClusterFormed(t, s1, s3)
	checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
		if numRoutes := s1.NumRoutes(); numRoutes < 1 {
			// FIX: message now matches the condition (at least one route
			// expected); it previously claimed "Expected 0 routes".
			return fmt.Errorf("Expected at least 1 route for server %q, got %d", s1.ID(), numRoutes)
		}
		if s1.ClusterName() != s3.ClusterName() {
			return fmt.Errorf("Expected cluster names to be the same: %s != %s", s1.ClusterName(), s3.ClusterName())
		}
		if s2.ClusterName() == s3.ClusterName() {
			// FIX: report the values actually compared (s2 vs s3); the
			// original printed s1's name here.
			return fmt.Errorf("Expected cluster names to not be the same: %s == %s", s2.ClusterName(), s3.ClusterName())
		}
		return nil
	})
	//
	// [C] client makes request to service from [A] which should respond OK.
	//
	nc3, err := nats.Connect(fmt.Sprintf("nats://%s:%d", "127.0.0.1", s3.Addr().(*net.TCPAddr).Port))
	if err != nil {
		t.Fatalf("Error creating client: %v", err)
	}
	defer nc3.Close()
	_, err = nc3.Request("test", []byte("ping"), 2*time.Second)
	if err != nil {
		t.Fatalf("Error making request to cluster, got: %s", err)
	}
}
// TestConfigLeafnodeClusterMembershipReload exercises cluster name changes
// via config reload in a topology where two servers are also connected to a
// hub server over leafnodes:
//
//	[A] <-- leaf --> [GROUND] <-- leaf --> [B]
//
// It checks that requests flow over the leafnodes while A and B are not
// clustered, over routes once they form the AB cluster, and over the
// leafnodes again after the cluster is dissolved by a reload.
func TestConfigLeafnodeClusterMembershipReload(t *testing.T) {
	//
	// [GROUND] starts in dynamic mode.
	//
	s0, _, conf0 := runReloadServerWithContent(t, []byte(`
listen: "0.0.0.0:-1"
server_name: "GROUND"
leafnodes: {
listen: "0.0.0.0:-1"
}
`))
	defer os.Remove(conf0)
	defer s0.Shutdown()
	// Config snippet shared by [A] and [B]: both solicit a leafnode
	// connection to [GROUND].
	leafConf := fmt.Sprintf(`
leafnodes: {
remotes [{ url: "nats://localhost:%d" }]
}
`, s0.LeafnodeAddr().Port)
	//
	// [A] connects to [GROUND] via leafnode port
	//
	sA, _, confA := runReloadServerWithContent(t, []byte(`
listen: "0.0.0.0:-1"
server_name: "A"
cluster: {
listen: "0.0.0.0:-1"
}
`+leafConf))
	defer os.Remove(confA)
	defer sA.Shutdown()
	//
	// [B] connects to [GROUND] via leafnode port
	//
	sB, _, confB := runReloadServerWithContent(t, []byte(`
listen: "0.0.0.0:-1"
server_name: "B"
cluster: {
listen: "0.0.0.0:-1"
}
`+leafConf))
	defer os.Remove(confB)
	defer sB.Shutdown()
	//
	// [A] client can make requests to service at [B]
	// with the request traveling through [GROUND] node.
	//
	ncA, err := nats.Connect(fmt.Sprintf("nats://localhost:%d", sA.Addr().(*net.TCPAddr).Port))
	if err != nil {
		t.Fatalf("Error creating client: %v", err)
	}
	defer ncA.Close()
	ncB, err := nats.Connect(fmt.Sprintf("nats://localhost:%d", sB.Addr().(*net.TCPAddr).Port))
	if err != nil {
		t.Fatalf("Error creating client: %v", err)
	}
	defer ncB.Close()
	ncB.Subscribe("help", func(msg *nats.Msg) {
		msg.Respond([]byte("OK"))
	})
	ncB.Flush()
	// NOTE(review): fixed sleep to let the subscription interest propagate
	// across the leafnodes; polling with checkFor would be less flaky.
	time.Sleep(500 * time.Millisecond)
	// makeRequest sends a request from [A]'s client and returns any error.
	makeRequest := func(payload []byte) error {
		_, err := ncA.Request("help", payload, 5*time.Second)
		if err != nil {
			return err
		}
		return nil
	}
	//
	// Make roundtrip via leafnode connections.
	//
	// [A] <-- lid --> [GROUND] <-- lid --> [B]
	//
	//
	err = makeRequest([]byte("VIA LEAFNODE"))
	if err != nil {
		t.Fatal(err)
	}
	//
	// [GROUND] should have 2 leaf connections and
	// a single message flowing in/out.
	//
	leafz, err := s0.Leafz(nil)
	if err != nil {
		t.Fatal(err)
	}
	if len(leafz.Leafs) != 2 {
		t.Errorf("Expected 2 leafs but got %d", len(leafz.Leafs))
	}
	for _, leaf := range leafz.Leafs {
		// Each leaf should have carried exactly the one request/response pair.
		got := int(leaf.InMsgs)
		expected := 1
		if got != expected {
			t.Errorf("Expected: %d, got: %d", expected, got)
		}
		got = int(leaf.OutMsgs)
		expected = 1
		if got != expected {
			t.Errorf("Expected: %d, got: %d", expected, got)
		}
		got = int(leaf.NumSubs)
		expected = 2
		if got != expected {
			t.Errorf("Expected: %d, got: %d", expected, got)
		}
	}
	//
	// [B] solicit from [A] to form together a AB cluster named.
	// Since [A] is dynamic, this will make [B] use the explicit AB name.
	//
	clusterConf := fmt.Sprintf(`
cluster {
name: "AB"
listen: "0.0.0.0:-1"
routes: [ nats://localhost:%d ]
}
`, sA.ClusterAddr().Port)
	reloadUpdateConfig(t, sB, confB, `
listen: "0.0.0.0:-1"
server_name: "B"
`+clusterConf+leafConf)
	checkClusterFormed(t, sA, sB)
	checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
		if sA.ClusterName() != "AB" || sB.ClusterName() != "AB" {
			return fmt.Errorf("Expected clustername of \"AB\", got %q and %q", sA.ClusterName(), sB.ClusterName())
		}
		return nil
	})
	//
	// The cluster renaming should have caused a reconnection
	// so stats will be reset.
	//
	// This helper asserts the leaf stats are at their post-reconnect
	// baseline: zero messages in/out and only the two baseline subs.
	// Note it reuses the outer leafz/err variables.
	checkNoChangeInLeafStats := func(t *testing.T) {
		t.Helper()
		leafz, err = s0.Leafz(nil)
		if err != nil {
			t.Fatal(err)
		}
		for _, leaf := range leafz.Leafs {
			got := int(leaf.InMsgs)
			expected := 0
			if got != expected {
				t.Errorf("Expected: %d InMsgs, got: %d", expected, got)
			}
			got = int(leaf.OutMsgs)
			expected = 0
			if got != expected {
				t.Errorf("Expected: %d OutMsgs, got: %d", expected, got)
			}
			got = int(leaf.NumSubs)
			expected = 2
			if got != expected {
				t.Errorf("Expected: %d NumSubs, got: %d", expected, got)
			}
		}
	}
	checkNoChangeInLeafStats(t)
	connzB, err := sB.Connz(nil)
	if err != nil {
		t.Fatal(err)
	}
	// [B]'s single client connection still shows the first roundtrip.
	if len(connzB.Conns) != 1 || connzB.Conns[0].InMsgs != 1 || connzB.Conns[0].OutMsgs != 1 {
		t.Fatal("Expected connection to node B to receive messages.")
	}
	//
	// [A] to [B] roundtrip should be now via the cluster routes,
	// not the leafnode connections through the [GROUND] node.
	//
	err = makeRequest([]byte("AFTER CLUSTER FORMED"))
	if err != nil {
		t.Fatal(err)
	}
	// NOTE(review): fixed sleep to let stats settle; could poll instead.
	time.Sleep(500 * time.Millisecond)
	connzB, err = sB.Connz(nil)
	if err != nil {
		t.Fatal(err)
	}
	if len(connzB.Conns) != 1 || connzB.Conns[0].InMsgs != 2 || connzB.Conns[0].OutMsgs != 2 {
		t.Fatal("Expected connection to node B to receive messages.")
	}
	// [B] stops soliciting routes from [A] and goes back to dynamic.
	// This will cause another leafnode reconnection.
	reloadUpdateConfig(t, sB, confB, `
listen: "0.0.0.0:-1"
server_name: "B"
cluster: {
listen: "0.0.0.0:-1"
}
`+leafConf)
	// Confirm that there are no routes to both servers and
	// that the cluster name has changed.
	checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
		if numRoutes := sA.NumRoutes(); numRoutes != 0 {
			return fmt.Errorf("Expected no routes for server, got %d", numRoutes)
		}
		if numRoutes := sB.NumRoutes(); numRoutes != 0 {
			return fmt.Errorf("Expected no routes for server, got %d", numRoutes)
		}
		nameB := sB.ClusterName()
		if nameB == "AB" || nameB == "" {
			return fmt.Errorf("Expected clustername to change, got %q", nameB)
		}
		// Wait for leafnode connections to reconnect.
		leafz, _ = s0.Leafz(nil)
		if len(leafz.Leafs) < 2 {
			return fmt.Errorf("Expected 2 leafnode connections, got: %d", len(leafz.Leafs))
		}
		return nil
	})
	//
	// [A] cluster name will still be AB even though it started with dynamic name.
	//
	if sA.ClusterName() != "AB" {
		t.Errorf("Expected clustername to be AB, got %q", sA.ClusterName())
	}
	// New request should have been through the leafnode again,
	// all using a dynamic cluster name on each side.
	//
	// [A] <-- lid --> [GROUND] <-- lid --> [B]
	//
	err = makeRequest([]byte("REQUEST VIA LEAFNODE AGAIN"))
	if err != nil {
		t.Fatalf("Expected response via leafnode, got: %s", err)
	}
}
func TestConfigReloadMaxSubsUnsupported(t *testing.T) {
s, _, conf := runReloadServerWithContent(t, []byte(`max_subs: 1`))
defer os.Remove(conf)

View File

@@ -436,8 +436,10 @@ func (s *Server) setClusterName(name string) {
}
s.info.Cluster = name
s.routeInfo.Cluster = name
// Regenerate the info byte array
s.generateRouteInfoJSON()
// Need to close solicited leaf nodes. The close has to be done outside of the server lock.
var leafs []*client
for _, c := range s.leafs {
@@ -447,6 +449,7 @@ func (s *Server) setClusterName(name string) {
}
c.mu.Unlock()
}
s.mu.Unlock()
for _, l := range leafs {
l.closeConnection(ClusterNameConflict)
@@ -2483,6 +2486,16 @@ func (s *Server) ProfilerAddr() *net.TCPAddr {
return s.profiler.Addr().(*net.TCPAddr)
}
// LeafnodeAddr returns the net.Addr object for the leafnode listener.
// It returns nil if the leafnode listener has not been started.
// The server lock is held to read the listener field safely.
func (s *Server) LeafnodeAddr() *net.TCPAddr {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.leafNodeListener == nil {
		return nil
	}
	return s.leafNodeListener.Addr().(*net.TCPAddr)
}
// ReadyForConnections returns `true` if the server is ready to accept clients
// and, if routing is enabled, route connections. If after the duration
// `dur` the server is still not ready, returns `false`.