From 56ab619498a68e7f08252210bbd4bb8772b77d15 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Fri, 16 Jun 2017 15:53:07 -0500 Subject: [PATCH] First pass at implementing cluster reload --- server/client.go | 5 +++ server/opts.go | 7 +++ server/reload.go | 110 ++++++++++++++++++++++++++++++++++++++++++++++ server/route.go | 10 ++--- server/server.go | 9 ++++ util/tls.go | 32 ++------------ util/tls_pre17.go | 2 +- util/tls_pre18.go | 37 ++++++++++++++++ 8 files changed, 178 insertions(+), 34 deletions(-) create mode 100644 util/tls_pre18.go diff --git a/server/client.go b/server/client.go index 144d42a9..05a50970 100644 --- a/server/client.go +++ b/server/client.go @@ -1342,6 +1342,11 @@ func (c *client) closeConnection() { } } + // Don't reconnect routes that are being closed. + if c.route != nil && c.route.closed { + return + } + // Check for a solicited route. If it was, start up a reconnect unless // we are already connected to the other end. if c.isSolicitedRoute() || retryImplicit { diff --git a/server/opts.go b/server/opts.go index 719fa47b..59664a0f 100644 --- a/server/opts.go +++ b/server/opts.go @@ -15,6 +15,7 @@ import ( "time" "github.com/nats-io/gnatsd/conf" + "github.com/nats-io/gnatsd/util" ) // Options for clusters. @@ -94,6 +95,12 @@ func (o *Options) Clone() *Options { clone.Routes[i] = routeCopy } } + if o.TLSConfig != nil { + clone.TLSConfig = util.CloneTLSConfig(o.TLSConfig) + } + if o.Cluster.TLSConfig != nil { + clone.Cluster.TLSConfig = util.CloneTLSConfig(o.Cluster.TLSConfig) + } return clone } diff --git a/server/reload.go b/server/reload.go index 848fb997..4cf4ae44 100644 --- a/server/reload.go +++ b/server/reload.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "errors" "fmt" + "net/url" "reflect" "strings" ) @@ -176,6 +177,64 @@ func (u *usersOption) Apply(server *Server) { server.Noticef("Reloaded: authorization users") } +// clusterOption implements the option interface for the `cluster` setting. +type clusterOption struct { + noopOption + newValue ClusterOpts +} + +// Apply the cluster change. +func (c *clusterOption) Apply(server *Server) { + server.mu.Lock() + tlsRequired := c.newValue.TLSConfig != nil + server.routeInfo.TLSRequired = tlsRequired + server.routeInfo.SSLRequired = tlsRequired + server.routeInfo.TLSVerify = tlsRequired + server.routeInfo.AuthRequired = c.newValue.Username != "" + server.generateRouteInfoJSON() + server.mu.Unlock() + server.Noticef("Reloaded: cluster") +} + +// routesOption implements the option interface for the cluster `routes` +// setting. +type routesOption struct { + noopOption + add []*url.URL + remove []*url.URL +} + +// Apply the route changes by adding and removing the necessary routes. +func (r *routesOption) Apply(server *Server) { + server.mu.Lock() + routes := make([]*client, len(server.routes)) + i := 0 + for _, client := range server.routes { + routes[i] = client + i++ + } + server.mu.Unlock() + + // Remove routes. + for _, remove := range r.remove { + for _, client := range routes { + if client.route.url == remove { + client.mu.Lock() + // Do not attempt to reconnect when route is removed. + client.route.closed = true + client.mu.Unlock() + client.closeConnection() + server.Noticef("Removed route %v", remove) + } + } + } + + // Add routes. + server.solicitRoutes(r.add) + + server.Noticef("Reloaded: cluster routes") +} + // Reload reads the current configuration file and applies any supported // changes. This returns an error if the server was not started with a config // file or an option which doesn't support hot-swapping was changed. @@ -250,6 +309,15 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { diffOpts = append(diffOpts, &authTimeoutOption{newValue: newValue.(float64)}) case "users": diffOpts = append(diffOpts, &usersOption{newValue: newValue.([]*User)}) + case "cluster": + newClusterOpts := newValue.(ClusterOpts) + if err := validateClusterOpts(oldValue.(ClusterOpts), newClusterOpts); err != nil { + return nil, err + } + diffOpts = append(diffOpts, &clusterOption{newValue: newClusterOpts}) + case "routes": + add, remove := diffRoutes(oldValue.([]*url.URL), newValue.([]*url.URL)) + diffOpts = append(diffOpts, &routesOption{add: add, remove: remove}) case "port": // check to see if newValue == 0 and continue if so. if newValue == 0 { @@ -316,3 +384,45 @@ func (s *Server) reloadAuthorization() { s.removeUnauthorizedSubs(client) } } + +// validateClusterOpts ensures the new ClusterOpts does not change host or +// port, which do not support reload. +func validateClusterOpts(old, new ClusterOpts) error { + if old.Host != new.Host { + return fmt.Errorf("Config reload not supported for cluster host: old=%s, new=%s", + old.Host, new.Host) + } + if old.Port != new.Port { + return fmt.Errorf("Config reload not supported for cluster port: old=%d, new=%d", + old.Port, new.Port) + } + return nil +} + +// diffRoutes diffs the old routes and the new routes and returns the ones that +// should be added and removed from the server. +func diffRoutes(old, new []*url.URL) (add, remove []*url.URL) { + // Find routes to remove. +removeLoop: + for _, oldRoute := range old { + for _, newRoute := range new { + if oldRoute == newRoute { + continue removeLoop + } + } + remove = append(remove, oldRoute) + } + + // Find routes to add. +addLoop: + for _, newRoute := range new { + for _, oldRoute := range old { + if oldRoute == newRoute { + continue addLoop + } + } + add = append(add, newRoute) + } + + return add, remove +} diff --git a/server/route.go b/server/route.go index 43109482..a6b60183 100644 --- a/server/route.go +++ b/server/route.go @@ -37,6 +37,7 @@ type route struct { url *url.URL authRequired bool tlsRequired bool + closed bool } type connectInfo struct { @@ -643,8 +644,7 @@ func (s *Server) routeAcceptLoop(ch chan struct{}) { info.AuthRequired = true } s.routeInfo = info - b, _ := json.Marshal(info) - s.routeInfoJSON = []byte(fmt.Sprintf(InfoProto, b)) + s.generateRouteInfoJSON() // Setup state that can enable shutdown s.mu.Lock() @@ -697,7 +697,7 @@ func (s *Server) StartRouting(clientListenReady chan struct{}) { <-ch // Solicit Routes if needed. - s.solicitRoutes() + s.solicitRoutes(s.getOpts().Routes) } func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType) { @@ -748,8 +748,8 @@ func (c *client) isSolicitedRoute() bool { return c.typ == ROUTER && c.route != nil && c.route.didSolicit } -func (s *Server) solicitRoutes() { - for _, r := range s.getOpts().Routes { +func (s *Server) solicitRoutes(routes []*url.URL) { + for _, r := range routes { route := r s.startGoRoutine(func() { s.connectToRoute(route, true) }) } diff --git a/server/server.go b/server/server.go index 01570501..cec881e3 100644 --- a/server/server.go +++ b/server/server.go @@ -177,6 +177,15 @@ func (s *Server) generateServerInfoJSON() { s.infoJSON = []byte(fmt.Sprintf("INFO %s %s", b, CR_LF)) } +func (s *Server) generateRouteInfoJSON() { + b, err := json.Marshal(s.routeInfo) + if err != nil { + s.Fatalf("Error marshaling route INFO JSON: %+v\n", err) + return + } + s.routeInfoJSON = []byte(fmt.Sprintf(InfoProto, b)) +} + // PrintAndDie is exported for access in other packages. func PrintAndDie(msg string) { fmt.Fprintf(os.Stderr, "%s\n", msg) diff --git a/util/tls.go b/util/tls.go index 51da0b88..2ec86006 100644 --- a/util/tls.go +++ b/util/tls.go @@ -1,5 +1,5 @@ -// Copyright 2016 Apcera Inc. All rights reserved. -// +build go1.7 +// Copyright 2017 Apcera Inc. All rights reserved. +// +build go1.8 package util @@ -7,31 +7,7 @@ import ( "crypto/tls" ) -// CloneTLSConfig returns a copy of c. Only the exported fields are copied. -// This is temporary, until this is provided by the language. -// https://go-review.googlesource.com/#/c/28075/ +// CloneTLSConfig returns a copy of c. func CloneTLSConfig(c *tls.Config) *tls.Config { - return &tls.Config{ - Rand: c.Rand, - Time: c.Time, - Certificates: c.Certificates, - NameToCertificate: c.NameToCertificate, - GetCertificate: c.GetCertificate, - RootCAs: c.RootCAs, - NextProtos: c.NextProtos, - ServerName: c.ServerName, - ClientAuth: c.ClientAuth, - ClientCAs: c.ClientCAs, - InsecureSkipVerify: c.InsecureSkipVerify, - CipherSuites: c.CipherSuites, - PreferServerCipherSuites: c.PreferServerCipherSuites, - SessionTicketsDisabled: c.SessionTicketsDisabled, - SessionTicketKey: c.SessionTicketKey, - ClientSessionCache: c.ClientSessionCache, - MinVersion: c.MinVersion, - MaxVersion: c.MaxVersion, - CurvePreferences: c.CurvePreferences, - DynamicRecordSizingDisabled: c.DynamicRecordSizingDisabled, - Renegotiation: c.Renegotiation, - } + return c.Clone() } diff --git a/util/tls_pre17.go b/util/tls_pre17.go index db198ae3..86feee8a 100644 --- a/util/tls_pre17.go +++ b/util/tls_pre17.go @@ -1,4 +1,4 @@ -// Copyright 2016 Apcera Inc. All rights reserved. +// Copyright 2017 Apcera Inc. All rights reserved. // +build go1.5,!go1.7 package util diff --git a/util/tls_pre18.go b/util/tls_pre18.go new file mode 100644 index 00000000..50dc45fe --- /dev/null +++ b/util/tls_pre18.go @@ -0,0 +1,37 @@ +// Copyright 2017 Apcera Inc. All rights reserved. +// +build go1.7,!go1.8 + +package util + +import ( + "crypto/tls" +) + +// CloneTLSConfig returns a copy of c. Only the exported fields are copied. +// This is temporary, until this is provided by the language. +// https://go-review.googlesource.com/#/c/28075/ +func CloneTLSConfig(c *tls.Config) *tls.Config { + return &tls.Config{ + Rand: c.Rand, + Time: c.Time, + Certificates: c.Certificates, + NameToCertificate: c.NameToCertificate, + GetCertificate: c.GetCertificate, + RootCAs: c.RootCAs, + NextProtos: c.NextProtos, + ServerName: c.ServerName, + ClientAuth: c.ClientAuth, + ClientCAs: c.ClientCAs, + InsecureSkipVerify: c.InsecureSkipVerify, + CipherSuites: c.CipherSuites, + PreferServerCipherSuites: c.PreferServerCipherSuites, + SessionTicketsDisabled: c.SessionTicketsDisabled, + SessionTicketKey: c.SessionTicketKey, + ClientSessionCache: c.ClientSessionCache, + MinVersion: c.MinVersion, + MaxVersion: c.MaxVersion, + CurvePreferences: c.CurvePreferences, + DynamicRecordSizingDisabled: c.DynamicRecordSizingDisabled, + Renegotiation: c.Renegotiation, + } +}