Add support for cluster name changes from static to dynamic

Signed-off-by: Waldemar Quevedo <wally@synadia.com>
This commit is contained in:
Waldemar Quevedo
2020-08-25 19:59:10 -07:00
parent 6d06e393ee
commit 5c6731a2b3
6 changed files with 770 additions and 40 deletions

View File

@@ -1635,7 +1635,8 @@ func TestLeafNodeOriginClusterInfo(t *testing.T) {
// Now make sure that if we update our cluster name, simulating the settling
// of dynamic cluster names between competing servers.
s.setClusterName("xyz")
s.setClusterName("xyz", false)
// Make sure we disconnect and reconnect.
checkLeafNodeConnectedCount(t, s, 0)
checkLeafNodeConnected(t, s)

View File

@@ -1337,6 +1337,7 @@ func (s *Server) updateVarzRuntimeFields(v *Varz, forceUpdate bool, pcpu float64
for key, val := range s.httpReqStats {
v.HTTPReqStats[key] = val
}
v.Cluster.Name = s.info.Cluster
// Update Gateway remote urls if applicable
gw := s.gateway

View File

@@ -54,9 +54,13 @@ type option interface {
// cluster permissions.
IsClusterPermsChange() bool
// IsJetStreamChange inidicates a change in the servers config for JetStream.
// IsJetStreamChange indicates a change in the servers config for JetStream.
// Account changes will be handled separately in reloadAuthorization.
IsJetStreamChange() bool
// IsClusterNameChange indicates that the cluster name has changed, which might
// imply changes in the cluster membership of the server.
IsClusterNameChange() bool
}
// noopOption is a base struct that provides default no-op behaviors.
@@ -82,6 +86,10 @@ func (n noopOption) IsJetStreamChange() bool {
return false
}
func (n noopOption) IsClusterNameChange() bool {
return false
}
// loggingOption is a base struct that provides default option behaviors for
// logging-related options.
type loggingOption struct {
@@ -292,8 +300,10 @@ func (u *nkeysOption) Apply(server *Server) {
// clusterOption implements the option interface for the `cluster` setting.
type clusterOption struct {
authOption
oldValue ClusterOpts
newValue ClusterOpts
permsChanged bool
nameChanged bool
}
// Apply the cluster change.
@@ -313,8 +323,17 @@ func (c *clusterOption) Apply(s *Server) {
}
s.setRouteInfoHostPortAndIP()
s.mu.Unlock()
if c.newValue.Name != "" && c.newValue.Name != s.ClusterName() {
s.setClusterName(c.newValue.Name)
// Check whether the cluster name has been removed or changed
// as that could affect a cluster membership.
switch {
case c.oldValue.Name != "" && c.newValue.Name == "":
s.setClusterName(s.defaultClusterName, true)
c.nameChanged = true
case c.newValue.Name != "" && c.oldValue.Name != c.newValue.Name:
// Use the new explicit cluster name from the config.
s.setClusterName(c.newValue.Name, false)
c.nameChanged = true
}
s.Noticef("Reloaded: cluster")
if tlsRequired && c.newValue.TLSConfig.InsecureSkipVerify {
@@ -326,6 +345,10 @@ func (c *clusterOption) IsClusterPermsChange() bool {
return c.permsChanged
}
func (c *clusterOption) IsClusterNameChange() bool {
return c.nameChanged
}
// routesOption implements the option interface for the cluster `routes`
// setting.
type routesOption struct {
@@ -847,7 +870,7 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
return nil, err
}
permsChanged := !reflect.DeepEqual(newClusterOpts.Permissions, oldClusterOpts.Permissions)
diffOpts = append(diffOpts, &clusterOption{newValue: newClusterOpts, permsChanged: permsChanged})
diffOpts = append(diffOpts, &clusterOption{oldValue: oldClusterOpts, newValue: newClusterOpts, permsChanged: permsChanged})
case "routes":
add, remove := diffRoutes(oldValue.([]*url.URL), newValue.([]*url.URL))
diffOpts = append(diffOpts, &routesOption{add: add, remove: remove})
@@ -1018,6 +1041,7 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) {
reloadAuth = false
reloadClusterPerms = false
reloadClientTrcLvl = false
reloadClusterName = false
)
for _, opt := range opts {
opt.Apply(s)
@@ -1033,6 +1057,9 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) {
if opt.IsClusterPermsChange() {
reloadClusterPerms = true
}
if opt.IsClusterNameChange() {
reloadClusterName = true
}
}
if reloadLogging {
@@ -1047,6 +1074,9 @@ func (s *Server) applyOptions(ctx *reloadContext, opts []option) {
if reloadClusterPerms {
s.reloadClusterPermissions(ctx.oldClusterPerms)
}
if reloadClusterName {
s.reloadClusterName()
}
s.Noticef("Reloaded server configuration")
}
@@ -1100,6 +1130,24 @@ func (s *Server) reloadClientTraceLevel() {
}
}
// reloadClusterName detects cluster membership changes triggered
// by a reload where the cluster name changes.
func (s *Server) reloadClusterName() {
s.mu.Lock()
// Get all connected routes and notify the cluster rename.
infoJSON := s.routeInfoJSON
for _, route := range s.routes {
route.mu.Lock()
route.enqueueProto(infoJSON)
route.mu.Unlock()
}
// Reloading without an explicit cluster name makes a server
// susceptible to dynamic cluster name changes.
s.susceptible = s.opts.Cluster.Name == ""
s.mu.Unlock()
}
// reloadAuthorization reconfigures the server authorization settings,
// disconnects any clients who are no longer authorized, and removes any
// unauthorized subscriptions.

View File

@@ -1652,7 +1652,8 @@ func TestConfigReloadClusterAdvertise(t *testing.T) {
orgClusterPort := s.ClusterAddr().Port
verify := func(expectedHost string, expectedPort int, expectedIP string) {
verify := func(t *testing.T, expectedHost string, expectedPort int, expectedIP string) {
t.Helper()
s.mu.Lock()
routeInfo := s.routeInfo
routeInfoJSON := Info{}
@@ -1679,7 +1680,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) {
cluster_advertise: "me:1"
}
`)
verify("me", 1, "nats-route://me:1/")
verify(t, "me", 1, "nats-route://me:1/")
// Update config with cluster_advertise (no port specified)
reloadUpdateConfig(t, s, conf, `
@@ -1689,7 +1690,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) {
cluster_advertise: "me"
}
`)
verify("me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort))
verify(t, "me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort))
// Update config with cluster_advertise (-1 port specified)
reloadUpdateConfig(t, s, conf, `
@@ -1699,7 +1700,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) {
cluster_advertise: "me:-1"
}
`)
verify("me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort))
verify(t, "me", orgClusterPort, fmt.Sprintf("nats-route://me:%d/", orgClusterPort))
// Update to remove cluster_advertise
reloadUpdateConfig(t, s, conf, `
@@ -1708,7 +1709,7 @@ func TestConfigReloadClusterAdvertise(t *testing.T) {
listen: "0.0.0.0:-1"
}
`)
verify("0.0.0.0", orgClusterPort, "")
verify(t, "0.0.0.0", orgClusterPort, "")
}
func TestConfigReloadClusterNoAdvertise(t *testing.T) {
@@ -1787,6 +1788,519 @@ func TestConfigReloadClusterName(t *testing.T) {
}
}
func TestConfigClusterMembershipReload(t *testing.T) {
//
// [A] starts in dynamic mode.
//
s1, _, conf1 := runReloadServerWithContent(t, []byte(`
listen: "0.0.0.0:-1"
server_name: "A"
cluster: {
listen: "0.0.0.0:-1"
}
`))
defer os.Remove(conf1)
defer s1.Shutdown()
got := s1.ClusterName()
if got == "" {
t.Fatalf("Expected update clustername to be set dynamically")
}
//
// [A] joins AB cluster with explicit name.
//
reloadUpdateConfig(t, s1, conf1, `
listen: "0.0.0.0:-1"
server_name: "A"
cluster: {
name: "AB"
listen: "0.0.0.0:-1"
}
`)
if s1.ClusterName() != "AB" {
t.Fatalf("Expected update clustername of \"AB\", got %q", s1.ClusterName())
}
//
// [B] joins AB cluster with explicit name.
//
template := fmt.Sprintf(`
listen: "0.0.0.0:-1"
server_name: "B"
cluster: {
name: "AB"
listen: "0.0.0.0:-1"
routes: [ nats://localhost:%d ] # Route to A
}
`, s1.ClusterAddr().Port)
s2, _, conf2 := runReloadServerWithContent(t, []byte(template))
defer os.Remove(conf2)
defer s2.Shutdown()
checkClusterFormed(t, s1, s2)
//
// [A] client sends request to [B] client and should be able to respond.
//
nc1, err := nats.Connect(fmt.Sprintf("nats://%s:%d", "127.0.0.1", s1.Addr().(*net.TCPAddr).Port))
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer nc1.Close()
nc2, err := nats.Connect(fmt.Sprintf("nats://%s:%d", "127.0.0.1", s2.Addr().(*net.TCPAddr).Port))
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer nc2.Close()
nc1.Subscribe("test", func(m *nats.Msg) {
m.Respond([]byte("pong"))
})
nc1.Flush()
_, err = nc2.Request("test", []byte("ping"), 2*time.Second)
if err != nil {
t.Fatalf("Error making request to cluster, got: %s", err)
}
//
// [B] leaves the cluster AB and stops soliciting from [A],
// [A] is still is part of AB cluster with explicit name at this point.
//
template = fmt.Sprintf(`
listen: "0.0.0.0:-1"
server_name: "B"
cluster: {
# name: "AB"
listen: "0.0.0.0:-1"
# routes: [ nats://localhost:%d ] # Route to A is gone
}
`, s1.ClusterAddr().Port)
reloadUpdateConfig(t, s2, conf2, template)
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
if numRoutes := s1.NumRoutes(); numRoutes > 0 {
return fmt.Errorf("Expected %d routes for server %q, got %d", 0, s1.ID(), numRoutes)
}
got = s2.ClusterName()
if got == "AB" || got == "" {
return fmt.Errorf("Expected update cluster name to be new, got %q", got)
}
got = s1.ClusterName()
if got != "AB" {
return fmt.Errorf("Expected update cluster name to be AB, got %q", got)
}
// [B] leaving AB cluster and stop soliciting should dissolve the cluster.
if numRoutes := s1.NumRoutes(); numRoutes != 0 {
return fmt.Errorf("Expected no routes for server %q, got %d", s1.ID(), numRoutes)
}
if numRoutes := s2.NumRoutes(); numRoutes != 0 {
return fmt.Errorf("Expected no routes for server %q, got %d", s2.ID(), numRoutes)
}
return nil
})
//
// [A] leaves cluster AB and goes back to dynamic.
//
reloadUpdateConfig(t, s1, conf1, `
listen: "0.0.0.0:-1"
server_name: "A"
cluster: {
listen: "0.0.0.0:-1"
}
`)
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
got = s1.ClusterName()
if got == "AB" || got == "" {
return fmt.Errorf("Expected update cluster name to be new, got %q", got)
}
return nil
})
//
// [B] client request fails since not part of cluster.
//
_, err = nc2.Request("test", []byte("failed ping"), 2*time.Second)
if err == nil {
t.Fatalf("Expected error making a request to cluster.")
}
//
// [C] solicits from [A] and both form a dynamic cluster.
//
template = fmt.Sprintf(`
listen: "0.0.0.0:-1"
server_name: "C"
cluster: {
listen: "0.0.0.0:-1"
routes: [ nats://localhost:%d ] # Route to A
}
`, s1.ClusterAddr().Port)
s3, _, conf3 := runReloadServerWithContent(t, []byte(template))
defer os.Remove(conf3)
defer s3.Shutdown()
checkClusterFormed(t, s1, s3)
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
if numRoutes := s1.NumRoutes(); numRoutes < 1 {
return fmt.Errorf("Expected %d routes for server %q, got %d", 0, s1.ID(), numRoutes)
}
if s1.ClusterName() != s3.ClusterName() {
return fmt.Errorf("Expected cluster names to be the same: %s != %s", s1.ClusterName(), s3.ClusterName())
}
if s2.ClusterName() == s3.ClusterName() {
return fmt.Errorf("Expected cluster names to not be the same: %s == %s", s1.ClusterName(), s3.ClusterName())
}
return nil
})
//
// [C] client makes request to service from [A] which should respond OK.
//
nc3, err := nats.Connect(fmt.Sprintf("nats://%s:%d", "127.0.0.1", s3.Addr().(*net.TCPAddr).Port))
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer nc3.Close()
_, err = nc3.Request("test", []byte("ping"), 2*time.Second)
if err != nil {
t.Fatalf("Error making request to cluster, got: %s", err)
}
}
func TestConfigSolicitClusterMembershipReload(t *testing.T) {
//
// [A] starts in dynamic mode.
//
sA, _, confA := runReloadServerWithContent(t, []byte(`
listen: "0.0.0.0:-1"
server_name: "A"
cluster: {
listen: "0.0.0.0:-1"
}
`))
defer os.Remove(confA)
defer sA.Shutdown()
if sA.ClusterName() == "" {
t.Fatalf("Expected update cluster name to be set dynamically")
}
//
// [B] starts in dynamic mode.
//
sB, _, confB := runReloadServerWithContent(t, []byte(`
listen: "0.0.0.0:-1"
server_name: "B"
cluster: {
listen: "0.0.0.0:-1"
}
`))
defer os.Remove(confB)
defer sB.Shutdown()
if sB.ClusterName() == "" {
t.Fatalf("Expected update cluster name to be set dynamically")
}
//
// [C] starts with ABC cluster name and the 2 routes.
//
clusterConf := fmt.Sprintf(`
cluster {
name: "ABC"
listen: "0.0.0.0:-1"
routes: [
nats://localhost:%d
nats://localhost:%d
]
}
`, sA.ClusterAddr().Port, sB.ClusterAddr().Port)
sC, _, confC := runReloadServerWithContent(t, []byte(`
listen: "0.0.0.0:-1"
server_name: "C"
`+clusterConf))
defer os.Remove(confC)
defer sC.Shutdown()
if sC.ClusterName() != "ABC" {
t.Fatalf("Expected update clustername to be set dynamically")
}
checkClusterFormed(t, sA, sB, sC)
}
func TestConfigLeafnodeClusterMembershipReload(t *testing.T) {
//
// [GROUND] starts in dynamic mode.
//
s0, _, conf0 := runReloadServerWithContent(t, []byte(`
listen: "0.0.0.0:-1"
server_name: "GROUND"
leafnodes: {
listen: "0.0.0.0:-1"
}
`))
defer os.Remove(conf0)
defer s0.Shutdown()
leafConf := fmt.Sprintf(`
leafnodes: {
remotes [{ url: "nats://localhost:%d" }]
}
`, s0.LeafnodeAddr().Port)
//
// [A] connects to [GROUND] via leafnode port
//
sA, _, confA := runReloadServerWithContent(t, []byte(`
listen: "0.0.0.0:-1"
server_name: "A"
cluster: {
listen: "0.0.0.0:-1"
}
`+leafConf))
defer os.Remove(confA)
defer sA.Shutdown()
//
// [B] connects to [GROUND] via leafnode port
//
sB, _, confB := runReloadServerWithContent(t, []byte(`
listen: "0.0.0.0:-1"
server_name: "B"
cluster: {
listen: "0.0.0.0:-1"
}
`+leafConf))
defer os.Remove(confB)
defer sB.Shutdown()
//
// [A] client can make requests to service at [B]
// with the request traveling through [GROUND] node.
//
ncA, err := nats.Connect(fmt.Sprintf("nats://localhost:%d", sA.Addr().(*net.TCPAddr).Port))
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer ncA.Close()
ncB, err := nats.Connect(fmt.Sprintf("nats://localhost:%d", sB.Addr().(*net.TCPAddr).Port))
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer ncB.Close()
ncB.Subscribe("help", func(msg *nats.Msg) {
msg.Respond([]byte("OK"))
})
ncB.Flush()
time.Sleep(500 * time.Millisecond)
makeRequest := func(payload []byte) error {
_, err := ncA.Request("help", payload, 5*time.Second)
if err != nil {
return err
}
return nil
}
//
// Make roundtrip via leafnode connections.
//
// [A] <-- lid --> [GROUND] <-- lid --> [B]
//
//
err = makeRequest([]byte("VIA LEAFNODE"))
if err != nil {
t.Fatal(err)
}
//
// [GROUND] should have 2 leaf connections and
// a single message flowing in/out.
//
leafz, err := s0.Leafz(nil)
if err != nil {
t.Fatal(err)
}
if len(leafz.Leafs) != 2 {
t.Errorf("Expected 2 leafs but got %d", len(leafz.Leafs))
}
for _, leaf := range leafz.Leafs {
got := int(leaf.InMsgs)
expected := 1
if got != expected {
t.Errorf("Expected: %d, got: %d", expected, got)
}
got = int(leaf.OutMsgs)
expected = 1
if got != expected {
t.Errorf("Expected: %d, got: %d", expected, got)
}
got = int(leaf.NumSubs)
expected = 2
if got != expected {
t.Errorf("Expected: %d, got: %d", expected, got)
}
}
//
// [B] solicit from [A] which will make [A] use the explicit
// cluster name AB, since [A] is in dynamic cluster name mode,
// forming a cluster together.
//
clusterConf := fmt.Sprintf(`
cluster {
name: "AB"
listen: "0.0.0.0:-1"
routes: [ nats://localhost:%d ]
}
`, sA.ClusterAddr().Port)
reloadUpdateConfig(t, sB, confB, `
listen: "0.0.0.0:-1"
server_name: "B"
`+clusterConf+leafConf)
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
if sA.ClusterName() != "AB" || sB.ClusterName() != "AB" {
return fmt.Errorf("Expected clustername of \"AB\", got %q and %q", sA.ClusterName(), sB.ClusterName())
}
return nil
})
checkClusterFormed(t, sA, sB)
//
// The cluster renaming should have caused a reconnection
// so stats will be reset.
//
checkNoChangeInLeafStats := func(t *testing.T) {
t.Helper()
leafz, err = s0.Leafz(nil)
if err != nil {
t.Fatal(err)
}
for _, leaf := range leafz.Leafs {
got := int(leaf.InMsgs)
expected := 0
if got != expected {
t.Errorf("Expected: %d InMsgs, got: %d", expected, got)
}
got = int(leaf.OutMsgs)
expected = 0
if got != expected {
t.Errorf("Expected: %d OutMsgs, got: %d", expected, got)
}
got = int(leaf.NumSubs)
expected = 2
if got != expected {
t.Errorf("Expected: %d NumSubs, got: %d", expected, got)
}
}
}
checkNoChangeInLeafStats(t)
connzB, err := sB.Connz(nil)
if err != nil {
t.Fatal(err)
}
if len(connzB.Conns) != 1 || connzB.Conns[0].InMsgs != 1 || connzB.Conns[0].OutMsgs != 1 {
t.Fatal("Expected connection to node B to receive messages.")
}
//
// [A] to [B] roundtrip should be now via the cluster routes,
// not the leafnode connections through the [GROUND] node.
//
err = makeRequest([]byte("AFTER CLUSTER FORMED"))
if err != nil {
t.Fatal(err)
}
time.Sleep(500 * time.Millisecond)
connzB, err = sB.Connz(nil)
if err != nil {
t.Fatal(err)
}
if len(connzB.Conns) != 1 || connzB.Conns[0].InMsgs != 2 || connzB.Conns[0].OutMsgs != 2 {
t.Fatal("Expected connection to node B to receive messages.")
}
// [B] stops soliciting routes from [A] and goes back to dynamic.
// This will cause another leafnode reconnection.
reloadUpdateConfig(t, sB, confB, `
listen: "0.0.0.0:-1"
server_name: "B"
cluster: {
listen: "0.0.0.0:-1"
}
`+leafConf)
// Confirm that there are no routes to both servers and
// that the cluster name has changed.
checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
if numRoutes := sA.NumRoutes(); numRoutes != 0 {
return fmt.Errorf("Expected no routes for server, got %d", numRoutes)
}
if numRoutes := sB.NumRoutes(); numRoutes != 0 {
return fmt.Errorf("Expected no routes for server, got %d", numRoutes)
}
nameB := sB.ClusterName()
if nameB == "AB" || nameB == "" {
return fmt.Errorf("Expected clustername to change, got %q", nameB)
}
// Wait for leafnode connections to reconnect.
leafz, _ = s0.Leafz(nil)
if len(leafz.Leafs) < 2 {
return fmt.Errorf("Expected 2 leafnode connections, got: %d", len(leafz.Leafs))
}
//
// [A] cluster name should not be AB anymore after [B] went back to dynamic mode.
//
if sA.ClusterName() == "AB" {
return fmt.Errorf("Expected cluster name to not be AB!")
}
return nil
})
// New request should have been through the leafnode again,
// all using a dynamic cluster name on each side.
//
// [A] <-- lid --> [GROUND] <-- lid --> [B]
//
err = makeRequest([]byte("REQUEST VIA LEAFNODE AGAIN"))
if err != nil {
t.Fatalf("Expected response via leafnode, got: %s", err)
}
}
func TestConfigReloadMaxSubsUnsupported(t *testing.T) {
s, _, conf := runReloadServerWithContent(t, []byte(`max_subs: 1`))
defer os.Remove(conf)

View File

@@ -81,6 +81,11 @@ type route struct {
gatewayURL string
leafnodeURL string
hash string
// Remember the last cluster name and cluster dynamic state
// from a route in case it has been announced to us.
clusterName string
clusterDynamic bool
}
type connectInfo struct {
@@ -491,6 +496,7 @@ func (c *client) processRouteInfo(info *Info) {
supportsHeaders := c.srv.supportsHeaders()
clusterName := c.srv.ClusterName()
isSusceptible := c.srv.isClusterNameSusceptibleToChanges()
c.mu.Lock()
// Connection can be closed at any time (by auth timeout, etc).
@@ -514,13 +520,75 @@ func (c *client) processRouteInfo(info *Info) {
// Detect if we have a mismatch of cluster names.
if info.Cluster != "" && info.Cluster != clusterName {
c.mu.Unlock()
// If we are dynamic we may update our cluster name.
// Use other if remote is non dynamic or their name is "bigger"
if s.isClusterNameDynamic() && (!info.Dynamic || (strings.Compare(clusterName, info.Cluster) < 0)) {
s.setClusterName(info.Cluster)
isDynamic := s.isClusterNameDynamic()
remoteIsDynamic := info.Dynamic
if isDynamic && !remoteIsDynamic {
// The remote INFO sent to this node contains what should be the name of the cluster.
// Update the name marking that no longer susceptible to dynamic cluster updates since
// there is an explicit name in the cluster and cause other nodes to reconnect to
// this node using the explicit name.
s.setClusterName(info.Cluster, false)
s.removeAllRoutesExcept(c)
c.mu.Lock()
c.route.clusterName = info.Cluster
c.route.clusterDynamic = info.Dynamic
} else if !isDynamic && remoteIsDynamic {
c.srv.mu.Lock()
routeInfoJSON := s.routeInfoJSON
c.srv.mu.Unlock()
c.mu.Lock()
// In case there is a mismatch with what we know about this route,
// we will send it another INFO message with the explicit cluster name
// so that the remote dynamic node uses our cluster name.
if c.route.clusterName != info.Cluster {
c.enqueueProto(routeInfoJSON)
}
// Override the cluster name from this route with ours since it should win.
c.route.clusterName = s.info.Cluster
c.route.clusterDynamic = info.Dynamic
} else if isDynamic && remoteIsDynamic {
if isSusceptible {
// Ignore the message unless we are susceptible to dynamic cluster updates and their
// dynamic cluster name is a better fit than what we have. If we are not susceptible,
// it means that there is an explicit name in the cluster and eventually nodes with the
// explicit cluster name will make this node use the explicit cluster name via INFO messages.
if strings.Compare(clusterName, info.Cluster) < 0 {
s.setClusterName(info.Cluster, true)
s.removeAllRoutesExcept(c)
}
} else if !s.nonDynamicClusterNameRoutePresent(info.ID) {
// If the route moved from being static to dynamic (thus also changing its name),
// then only use their name if no other static, non dynamic nodes exist and it beats
// this node's default unique cluster name. This can happen for example when the last node
// from a cluster of nodes has its config reloaded to remove the name and go back to dynamic mode.
// Find any other route with an explicit name but the one that just went back to dynamic mode.
// NOTE: Sometimes nodes with dynamic names can also send their INFOs but due to races,
// by the time others have received it they have already changed its name to an explicit one
// due to a CONNECT message that was sent. This check also covers that scenario.
// Use the correct name and recluster with that name.
if strings.Compare(s.defaultClusterName, info.Cluster) < 0 {
// Use theirs, let other nodes reconnect using their cluster name.
s.setClusterName(info.Cluster, true)
s.removeAllRoutesExcept(c)
} else {
// Use ours, let other nodes reconnect using our cluster name.
s.setClusterName(s.defaultClusterName, true)
s.removeAllRoutesExcept(c)
}
}
c.mu.Lock()
c.route.clusterName = info.Cluster
c.route.clusterDynamic = info.Dynamic
} else {
// We have an explicit cluster name and reject explicit cluster name changes.
c.closeConnection(ClusterNameConflict)
return
}
@@ -1920,23 +1988,35 @@ func (c *client) processRouteConnect(srv *Server, arg []byte, lang string) error
}
clusterName := srv.ClusterName()
isSusceptible := srv.isClusterNameSusceptibleToChanges()
// If we have a cluster name set, make sure it matches ours.
if proto.Cluster != clusterName {
shouldReject := true
// If we have a dynamic name we will do additional checks.
if srv.isClusterNameDynamic() {
if !proto.Dynamic || strings.Compare(clusterName, proto.Cluster) < 0 {
// We will take on their name since theirs is configured or higher then ours.
srv.setClusterName(proto.Cluster)
if !proto.Dynamic {
srv.getOpts().Cluster.Name = proto.Cluster
}
isDynamic := srv.isClusterNameDynamic()
remoteIsDynamic := proto.Dynamic
if isDynamic && !remoteIsDynamic {
// c.Noticef("Updating the explicit name from the CONNECT message: ", proto.Cluster)
// Use the explicit name and mark current node as not susceptible
// to dynamic cluster name changes.
srv.setClusterName(proto.Cluster, false)
srv.removeAllRoutesExcept(c)
} else if !isDynamic && remoteIsDynamic {
// Do nothing since the remote may use this node's cluster name
// when it is sent an INFO message. Also override the cluster name
// from this route with ours since it should win.
c.mu.Lock()
c.route.clusterName = srv.info.Cluster
c.mu.Unlock()
} else if isDynamic && remoteIsDynamic {
// If we haven't learned an explicit name, we will take on their name if theirs is higher than ours.
if isSusceptible && strings.Compare(clusterName, proto.Cluster) < 0 {
srv.setClusterName(proto.Cluster, isSusceptible)
srv.removeAllRoutesExcept(c)
shouldReject = false
}
}
if shouldReject {
} else {
// We have an explicit cluster name and reject explicit cluster name changes.
errTxt := fmt.Sprintf("Rejecting connection, cluster name %q does not match %q", proto.Cluster, srv.info.Cluster)
c.Errorf(errTxt)
c.sendErr(errTxt)
@@ -1953,6 +2033,10 @@ func (c *client) processRouteConnect(srv *Server, arg []byte, lang string) error
c.route.lnoc = proto.LNOC
c.setRoutePermissions(perms)
c.headers = supportsHeaders && proto.Headers
// Remember their cluster name.
c.route.clusterName = proto.Cluster
c.route.clusterDynamic = proto.Dynamic
c.mu.Unlock()
return nil
}
@@ -1973,6 +2057,23 @@ func (s *Server) removeAllRoutesExcept(c *client) {
}
}
// Called when we update our cluster name during negotiations with remotes.
func (s *Server) nonDynamicClusterNameRoutePresent(remoteID string) bool {
s.mu.Lock()
defer s.mu.Unlock()
for _, r := range s.routes {
// Check all routes but this one.
if r.route.remoteID == remoteID {
continue
}
if r.route.clusterName != "" && !r.route.clusterDynamic {
return true
}
}
return false
}
func (s *Server) removeRoute(c *client) {
var rID string
var lnURL string
@@ -1989,6 +2090,25 @@ func (s *Server) removeRoute(c *client) {
}
c.mu.Unlock()
s.mu.Lock()
// Check whether all the remaining routes are using explicit names,
// since in that case this node should go back to using a dynamic cluster name
// that will be negotiated among the remaining members of the cluster.
var revertClusterName bool
if s.isClusterNameDynamic() && !s.susceptible {
var nonDynamicRouteFound bool
for _, r := range s.routes {
if r.route.clusterName != "" && !r.route.clusterDynamic {
nonDynamicRouteFound = true
}
}
// Check whether there are no more routes, if so go back to use the original dynamic cluster name.
if len(s.routes) <= 1 && nonDynamicRouteFound {
revertClusterName = true
}
}
delete(s.routes, cid)
if r != nil {
rc, ok := s.remotes[rID]
@@ -2010,4 +2130,14 @@ func (s *Server) removeRoute(c *client) {
}
s.removeFromTempClients(cid)
s.mu.Unlock()
// When the last route to this cluster is disconnected
// and we were in dynamic mode, then we will go back
// to using the original dynamic name that we had.
if revertClusterName {
s.mu.Lock()
name := s.defaultClusterName
s.mu.Unlock()
s.setClusterName(name, true)
}
}

View File

@@ -218,6 +218,14 @@ type Server struct {
// Websocket structure
websocket srvWebsocket
// susceptible marks whether this node is susceptible
// to dynamic cluster name changes via INFO/CONNECT protocol updates.
susceptible bool
// defaultClusterName is the default cluster name used
// in case dynamic clustering name is used.
defaultClusterName string
}
// Make sure all are 64bits for atomic use
@@ -289,17 +297,19 @@ func NewServer(opts *Options) (*Server, error) {
now := time.Now()
s := &Server{
kp: kp,
configFile: opts.ConfigFile,
info: info,
prand: rand.New(rand.NewSource(time.Now().UnixNano())),
opts: opts,
done: make(chan bool, 1),
start: now,
configTime: now,
gwLeafSubs: NewSublistWithCache(),
httpBasePath: httpBasePath,
eventIds: nuid.New(),
kp: kp,
configFile: opts.ConfigFile,
info: info,
prand: rand.New(rand.NewSource(time.Now().UnixNano())),
opts: opts,
done: make(chan bool, 1),
start: now,
configTime: now,
gwLeafSubs: NewSublistWithCache(),
httpBasePath: httpBasePath,
eventIds: nuid.New(),
susceptible: opts.Cluster.Name == "",
defaultClusterName: nuid.Next(),
}
// Trusted root operator keys.
@@ -325,9 +335,11 @@ func NewServer(opts *Options) (*Server, error) {
return nil, err
}
// If we have a cluster definition but do not have a cluster name, create one.
// If we have a cluster definition but do not have a cluster name,
// use the default dynamic cluster name for the node.
if opts.Cluster.Port != 0 && opts.Cluster.Name == "" {
s.info.Cluster = nuid.Next()
s.info.Cluster = s.defaultClusterName
s.susceptible = true
}
// This is normally done in the AcceptLoop, once the
@@ -449,7 +461,7 @@ func (s *Server) ClusterName() string {
}
// setClusterName will update the cluster name for this server.
func (s *Server) setClusterName(name string) {
func (s *Server) setClusterName(name string, susceptible bool) {
s.mu.Lock()
var resetCh chan struct{}
if s.sys != nil && s.info.Cluster != name {
@@ -458,8 +470,16 @@ func (s *Server) setClusterName(name string) {
}
s.info.Cluster = name
s.routeInfo.Cluster = name
// Marks whether this is a dynamic name change that is
// susceptible to changes due to INFO/CONNECT messages
// from other nodes in the cluster.
s.susceptible = susceptible
s.routeInfo.Dynamic = susceptible
// Regenerate the info byte array
s.generateRouteInfoJSON()
// Need to close solicited leaf nodes. The close has to be done outside of the server lock.
var leafs []*client
for _, c := range s.leafs {
@@ -477,7 +497,6 @@ func (s *Server) setClusterName(name string) {
resetCh <- struct{}{}
}
s.Noticef("Cluster name updated to %s", name)
}
// Return whether the cluster name is dynamic.
@@ -485,6 +504,13 @@ func (s *Server) isClusterNameDynamic() bool {
return s.getOpts().Cluster.Name == ""
}
// Return whether the cluster name is susceptible to cluster name changes.
func (s *Server) isClusterNameSusceptibleToChanges() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.susceptible
}
// ClientURL returns the URL used to connect clients. Helpful in testing
// when we designate a random client port (-1).
func (s *Server) ClientURL() string {
@@ -2545,6 +2571,16 @@ func (s *Server) ProfilerAddr() *net.TCPAddr {
return s.profiler.Addr().(*net.TCPAddr)
}
// LeafnodeAddr returns the net.Addr object for the leafnode listener.
func (s *Server) LeafnodeAddr() *net.TCPAddr {
s.mu.Lock()
defer s.mu.Unlock()
if s.leafNodeListener == nil {
return nil
}
return s.leafNodeListener.Addr().(*net.TCPAddr)
}
// ReadyForConnections returns `true` if the server is ready to accept clients
// and, if routing is enabled, route connections. If after the duration
// `dur` the server is still not ready, returns `false`.