Mirror of https://github.com/gogrlx/nats-server.git (synced 2026-04-02 03:38:42 -07:00).
Commit summary: reduce lock contention using several strategies: (1) checking a RaftNode for leadership now uses atomics; (2) checking whether this server is the JetStream meta leader now uses an atomic; (3) accessing the JetStream context no longer requires a server lock (uses atomic.Pointer); (4) filestore syncBlocks no longer holds msgBlock locks during sync.
Signed-off-by: Derek Collison <derek@nats.io>
File: 2436 lines, 74 KiB, Go.
// Copyright 2017-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"errors"
|
|
"fmt"
|
|
"net/url"
|
|
"reflect"
|
|
"sort"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/klauspost/compress/s2"
|
|
|
|
"github.com/nats-io/jwt/v2"
|
|
"github.com/nats-io/nuid"
|
|
)
|
|
|
|
// FlagSnapshot captures the server options as specified by CLI flags at
// startup. This should not be modified once the server has started.
// It is consulted during ReloadOptions (via MergeOptions/applyBoolFlags)
// so command-line flags keep precedence over config-file values on reload.
var FlagSnapshot *Options
// reloadContext carries state captured before a reload so post-reload
// processing can compare against the previous configuration.
type reloadContext struct {
	// oldClusterPerms holds the cluster route permissions in effect before
	// the reload — presumably compared against the new permissions when
	// handling cluster permission changes (confirm at the usage site).
	oldClusterPerms *RoutePermissions
}
// option is a hot-swappable configuration setting. Each changed setting is
// represented by one option value; the Is*Change predicates tell the reload
// machinery which subsystems must be refreshed after all options are applied.
type option interface {
	// Apply the server option.
	Apply(server *Server)

	// IsLoggingChange indicates if this option requires reloading the logger.
	IsLoggingChange() bool

	// IsTraceLevelChange indicates if this option requires reloading cached trace level.
	// Clients store trace level separately.
	IsTraceLevelChange() bool

	// IsAuthChange indicates if this option requires reloading authorization.
	IsAuthChange() bool

	// IsTLSChange indicates if this option requires reloading TLS.
	IsTLSChange() bool

	// IsClusterPermsChange indicates if this option requires reloading
	// cluster permissions.
	IsClusterPermsChange() bool

	// IsClusterPoolSizeOrAccountsChange indicates if this option requires
	// special handling for changes in cluster's pool size or accounts list.
	IsClusterPoolSizeOrAccountsChange() bool

	// IsJetStreamChange indicates a change in the servers config for JetStream.
	// Account changes will be handled separately in reloadAuthorization.
	IsJetStreamChange() bool

	// IsStatszChange indicates a change in the server that requires publishing
	// the server's statz.
	IsStatszChange() bool
}
// noopOption provides a default "no change required" answer for every
// reload category. Concrete options embed it and override only the
// predicates that apply to them.
type noopOption struct{}

func (noopOption) IsLoggingChange() bool    { return false }
func (noopOption) IsTraceLevelChange() bool { return false }
func (noopOption) IsAuthChange() bool       { return false }
func (noopOption) IsTLSChange() bool        { return false }

func (noopOption) IsClusterPermsChange() bool { return false }

func (noopOption) IsClusterPoolSizeOrAccountsChange() bool { return false }

func (noopOption) IsJetStreamChange() bool { return false }

func (noopOption) IsStatszChange() bool { return false }
// loggingOption is a base struct that provides default option behaviors for
|
|
// logging-related options.
|
|
type loggingOption struct {
|
|
noopOption
|
|
}
|
|
|
|
func (l loggingOption) IsLoggingChange() bool {
|
|
return true
|
|
}
|
|
|
|
// traceLevelOption is a base struct that provides default option behaviors for
|
|
// tracelevel-related options.
|
|
type traceLevelOption struct {
|
|
loggingOption
|
|
}
|
|
|
|
func (l traceLevelOption) IsTraceLevelChange() bool {
|
|
return true
|
|
}
|
|
|
|
// traceOption implements the option interface for the `trace` setting.
|
|
type traceOption struct {
|
|
traceLevelOption
|
|
newValue bool
|
|
}
|
|
|
|
// Apply is a no-op because logging will be reloaded after options are applied.
|
|
func (t *traceOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: trace = %v", t.newValue)
|
|
}
|
|
|
|
// traceVerboseOption implements the option interface for the `trace_verbose`
// setting. (Comment previously said "traceOption" — copy/paste fix.)
type traceVerboseOption struct {
	traceLevelOption
	newValue bool
}

// Apply is a no-op because logging will be reloaded after options are applied.
func (t *traceVerboseOption) Apply(server *Server) {
	server.Noticef("Reloaded: trace_verbose = %v", t.newValue)
}
// debugOption implements the option interface for the `debug` setting.
|
|
type debugOption struct {
|
|
loggingOption
|
|
newValue bool
|
|
}
|
|
|
|
// Apply is mostly a no-op because logging will be reloaded after options are applied.
|
|
// However we will kick the raft nodes if they exist to reload.
|
|
func (d *debugOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: debug = %v", d.newValue)
|
|
server.reloadDebugRaftNodes(d.newValue)
|
|
}
|
|
|
|
// logtimeOption implements the option interface for the `logtime` setting.
|
|
type logtimeOption struct {
|
|
loggingOption
|
|
newValue bool
|
|
}
|
|
|
|
// Apply is a no-op because logging will be reloaded after options are applied.
|
|
func (l *logtimeOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: logtime = %v", l.newValue)
|
|
}
|
|
|
|
// logtimeUTCOption implements the option interface for the `logtime_utc` setting.
|
|
type logtimeUTCOption struct {
|
|
loggingOption
|
|
newValue bool
|
|
}
|
|
|
|
// Apply is a no-op because logging will be reloaded after options are applied.
|
|
func (l *logtimeUTCOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: logtime_utc = %v", l.newValue)
|
|
}
|
|
|
|
// logfileOption implements the option interface for the `log_file` setting.
|
|
type logfileOption struct {
|
|
loggingOption
|
|
newValue string
|
|
}
|
|
|
|
// Apply is a no-op because logging will be reloaded after options are applied.
|
|
func (l *logfileOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: log_file = %v", l.newValue)
|
|
}
|
|
|
|
// syslogOption implements the option interface for the `syslog` setting.
|
|
type syslogOption struct {
|
|
loggingOption
|
|
newValue bool
|
|
}
|
|
|
|
// Apply is a no-op because logging will be reloaded after options are applied.
|
|
func (s *syslogOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: syslog = %v", s.newValue)
|
|
}
|
|
|
|
// remoteSyslogOption implements the option interface for the `remote_syslog`
|
|
// setting.
|
|
type remoteSyslogOption struct {
|
|
loggingOption
|
|
newValue string
|
|
}
|
|
|
|
// Apply is a no-op because logging will be reloaded after options are applied.
|
|
func (r *remoteSyslogOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: remote_syslog = %v", r.newValue)
|
|
}
|
|
|
|
// tlsOption implements the option interface for the `tls` setting.
|
|
type tlsOption struct {
|
|
noopOption
|
|
newValue *tls.Config
|
|
}
|
|
|
|
// Apply the tls change.
|
|
func (t *tlsOption) Apply(server *Server) {
|
|
server.mu.Lock()
|
|
tlsRequired := t.newValue != nil
|
|
server.info.TLSRequired = tlsRequired && !server.getOpts().AllowNonTLS
|
|
message := "disabled"
|
|
if tlsRequired {
|
|
server.info.TLSVerify = (t.newValue.ClientAuth == tls.RequireAndVerifyClientCert)
|
|
message = "enabled"
|
|
}
|
|
server.mu.Unlock()
|
|
server.Noticef("Reloaded: tls = %s", message)
|
|
}
|
|
|
|
func (t *tlsOption) IsTLSChange() bool {
|
|
return true
|
|
}
|
|
|
|
// tlsTimeoutOption implements the option interface for the tls `timeout`
|
|
// setting.
|
|
type tlsTimeoutOption struct {
|
|
noopOption
|
|
newValue float64
|
|
}
|
|
|
|
// Apply is a no-op because the timeout will be reloaded after options are
|
|
// applied.
|
|
func (t *tlsTimeoutOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: tls timeout = %v", t.newValue)
|
|
}
|
|
|
|
// tlsPinnedCertOption implements the option interface for the tls `pinned_certs` setting.
|
|
type tlsPinnedCertOption struct {
|
|
noopOption
|
|
newValue PinnedCertSet
|
|
}
|
|
|
|
// Apply is a no-op because the pinned certs will be reloaded after options are applied.
|
|
func (t *tlsPinnedCertOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: %d pinned_certs", len(t.newValue))
|
|
}
|
|
|
|
// authOption is a base struct that provides default option behaviors.
|
|
type authOption struct {
|
|
noopOption
|
|
}
|
|
|
|
func (o authOption) IsAuthChange() bool {
|
|
return true
|
|
}
|
|
|
|
// usernameOption implements the option interface for the `username` setting.
|
|
type usernameOption struct {
|
|
authOption
|
|
}
|
|
|
|
// Apply is a no-op because authorization will be reloaded after options are
|
|
// applied.
|
|
func (u *usernameOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: authorization username")
|
|
}
|
|
|
|
// passwordOption implements the option interface for the `password` setting.
|
|
type passwordOption struct {
|
|
authOption
|
|
}
|
|
|
|
// Apply is a no-op because authorization will be reloaded after options are
|
|
// applied.
|
|
func (p *passwordOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: authorization password")
|
|
}
|
|
|
|
// authorizationOption implements the option interface for the `token`
|
|
// authorization setting.
|
|
type authorizationOption struct {
|
|
authOption
|
|
}
|
|
|
|
// Apply is a no-op because authorization will be reloaded after options are
|
|
// applied.
|
|
func (a *authorizationOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: authorization token")
|
|
}
|
|
|
|
// authTimeoutOption implements the option interface for the authorization
|
|
// `timeout` setting.
|
|
type authTimeoutOption struct {
|
|
noopOption // Not authOption because this is a no-op; will be reloaded with options.
|
|
newValue float64
|
|
}
|
|
|
|
// Apply is a no-op because the timeout will be reloaded after options are
|
|
// applied.
|
|
func (a *authTimeoutOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: authorization timeout = %v", a.newValue)
|
|
}
|
|
|
|
// tagsOption implements the option interface for the `tags` setting.
|
|
type tagsOption struct {
|
|
noopOption // Not authOption because this is a no-op; will be reloaded with options.
|
|
}
|
|
|
|
func (u *tagsOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: tags")
|
|
}
|
|
|
|
func (u *tagsOption) IsStatszChange() bool {
|
|
return true
|
|
}
|
|
|
|
// usersOption implements the option interface for the authorization `users`
|
|
// setting.
|
|
type usersOption struct {
|
|
authOption
|
|
}
|
|
|
|
func (u *usersOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: authorization users")
|
|
}
|
|
|
|
// nkeysOption implements the option interface for the authorization `users`
|
|
// setting.
|
|
type nkeysOption struct {
|
|
authOption
|
|
}
|
|
|
|
func (u *nkeysOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: authorization nkey users")
|
|
}
|
|
|
|
// clusterOption implements the option interface for the `cluster` setting.
// It embeds authOption, so a cluster change also triggers an authorization
// reload.
type clusterOption struct {
	authOption
	// newValue is the cluster configuration after the reload.
	newValue ClusterOpts
	// permsChanged is set when route permissions differ from before.
	permsChanged bool
	// accsAdded / accsRemoved list pinned accounts added to or removed
	// from the cluster configuration (computed by diffPoolAndAccounts).
	accsAdded   []string
	accsRemoved []string
	// poolSizeChanged is set when the route pool size changed
	// (computed by diffPoolAndAccounts).
	poolSizeChanged bool
	// compressChanged is set when route compression settings changed.
	compressChanged bool
}
// Apply the cluster change: refresh the route INFO block, and if compression
// settings changed, update or close existing routes as needed. Routes that
// must renegotiate compression are closed only after the server lock is
// released.
func (c *clusterOption) Apply(s *Server) {
	// TODO: support enabling/disabling clustering.
	s.mu.Lock()
	tlsRequired := c.newValue.TLSConfig != nil
	s.routeInfo.TLSRequired = tlsRequired
	s.routeInfo.TLSVerify = tlsRequired
	s.routeInfo.AuthRequired = c.newValue.Username != ""
	if c.newValue.NoAdvertise {
		// Stop advertising connect URLs to other servers.
		s.routeInfo.ClientConnectURLs = nil
		s.routeInfo.WSConnectURLs = nil
	} else {
		s.routeInfo.ClientConnectURLs = s.clientConnectURLs
		s.routeInfo.WSConnectURLs = s.websocket.connectURLs
	}
	s.setRouteInfoHostPortAndIP()
	// Routes that must be closed to renegotiate compression; collected under
	// the server lock, closed after it is released.
	var routes []*client
	if c.compressChanged {
		co := &s.getOpts().Cluster.Compression
		newMode := co.Mode
		s.forEachRoute(func(r *client) {
			r.mu.Lock()
			// Skip routes that are "not supported" (because they will never do
			// compression) or the routes that have already the new compression
			// mode.
			if r.route.compression == CompressionNotSupported || r.route.compression == newMode {
				r.mu.Unlock()
				return
			}
			// We need to close the route if it had compression "off" or the new
			// mode is compression "off", or if the new mode is "accept", because
			// these require negotiation.
			if r.route.compression == CompressionOff || newMode == CompressionOff || newMode == CompressionAccept {
				routes = append(routes, r)
			} else if newMode == CompressionS2Auto {
				// If the mode is "s2_auto", we need to check if there is really
				// need to change, and at any rate, we want to save the actual
				// compression level here, not s2_auto.
				r.updateS2AutoCompressionLevel(co, &r.route.compression)
			} else {
				// Simply change the compression writer
				r.out.cw = s2.NewWriter(nil, s2WriterOptions(newMode)...)
				r.route.compression = newMode
			}
			r.mu.Unlock()
		})
	}
	s.mu.Unlock()
	// Cluster name change is applied outside the server lock.
	if c.newValue.Name != "" && c.newValue.Name != s.ClusterName() {
		s.setClusterName(c.newValue.Name)
	}
	for _, r := range routes {
		r.closeConnection(ClientClosed)
	}
	s.Noticef("Reloaded: cluster")
	if tlsRequired && c.newValue.TLSConfig.InsecureSkipVerify {
		s.Warnf(clusterTLSInsecureWarning)
	}
}
func (c *clusterOption) IsClusterPermsChange() bool {
|
|
return c.permsChanged
|
|
}
|
|
|
|
func (c *clusterOption) IsClusterPoolSizeOrAccountsChange() bool {
|
|
return c.poolSizeChanged || len(c.accsAdded) > 0 || len(c.accsRemoved) > 0
|
|
}
|
|
|
|
func (c *clusterOption) diffPoolAndAccounts(old *ClusterOpts) {
|
|
c.poolSizeChanged = c.newValue.PoolSize != old.PoolSize
|
|
addLoop:
|
|
for _, na := range c.newValue.PinnedAccounts {
|
|
for _, oa := range old.PinnedAccounts {
|
|
if na == oa {
|
|
continue addLoop
|
|
}
|
|
}
|
|
c.accsAdded = append(c.accsAdded, na)
|
|
}
|
|
removeLoop:
|
|
for _, oa := range old.PinnedAccounts {
|
|
for _, na := range c.newValue.PinnedAccounts {
|
|
if oa == na {
|
|
continue removeLoop
|
|
}
|
|
}
|
|
c.accsRemoved = append(c.accsRemoved, oa)
|
|
}
|
|
}
|
|
|
|
// routesOption implements the option interface for the cluster `routes`
// setting. add/remove hold the route URLs to solicit and to drop.
type routesOption struct {
	noopOption
	add    []*url.URL
	remove []*url.URL
}

// Apply the route changes by adding and removing the necessary routes.
// The current routes are snapshotted under the server lock, but removal
// (closing connections) happens after the lock is released.
func (r *routesOption) Apply(server *Server) {
	server.mu.Lock()
	routes := make([]*client, server.numRoutes())
	i := 0
	// Note: the closure parameter r shadows the receiver r inside this
	// callback; the receiver is not used within it.
	server.forEachRoute(func(r *client) {
		routes[i] = r
		i++
	})
	// If there was a change, notify monitoring code that it should
	// update the route URLs if /varz endpoint is inspected.
	if len(r.add)+len(r.remove) > 0 {
		server.varzUpdateRouteURLs = true
	}
	server.mu.Unlock()

	// Remove routes.
	for _, remove := range r.remove {
		for _, client := range routes {
			var url *url.URL
			// Read the route URL under the client lock.
			client.mu.Lock()
			if client.route != nil {
				url = client.route.url
			}
			client.mu.Unlock()
			if url != nil && urlsAreEqual(url, remove) {
				// Do not attempt to reconnect when route is removed.
				client.setNoReconnect()
				client.closeConnection(RouteRemoved)
				server.Noticef("Removed route %v", remove)
			}
		}
	}

	// Add routes.
	server.mu.Lock()
	server.solicitRoutes(r.add, server.getOpts().Cluster.PinnedAccounts)
	server.mu.Unlock()

	server.Noticef("Reloaded: cluster routes")
}
// maxConnOption implements the option interface for the `max_connections`
// setting.
type maxConnOption struct {
	noopOption
	newValue int
}

// Apply the max connections change by closing random connections til we are
// below the limit if necessary. The client list is snapshotted under the
// server lock; the actual closes happen after the lock is released.
func (m *maxConnOption) Apply(server *Server) {
	server.mu.Lock()
	var (
		clients = make([]*client, len(server.clients))
		i       = 0
	)
	// Map iteration is random, which allows us to close random connections.
	for _, client := range server.clients {
		clients[i] = client
		i++
	}
	server.mu.Unlock()

	// A newValue <= 0 means no limit is enforced here.
	if m.newValue > 0 && len(clients) > m.newValue {
		// Close connections til we are within the limit.
		var (
			numClose = len(clients) - m.newValue
			closed   = 0
		)
		for _, client := range clients {
			client.maxConnExceeded()
			closed++
			if closed >= numClose {
				break
			}
		}
		server.Noticef("Closed %d connections to fall within max_connections", closed)
	}
	server.Noticef("Reloaded: max_connections = %v", m.newValue)
}
// pidFileOption implements the option interface for the `pid_file` setting.
|
|
type pidFileOption struct {
|
|
noopOption
|
|
newValue string
|
|
}
|
|
|
|
// Apply the setting by logging the pid to the new file.
|
|
func (p *pidFileOption) Apply(server *Server) {
|
|
if p.newValue == "" {
|
|
return
|
|
}
|
|
if err := server.logPid(); err != nil {
|
|
server.Errorf("Failed to write pidfile: %v", err)
|
|
}
|
|
server.Noticef("Reloaded: pid_file = %v", p.newValue)
|
|
}
|
|
|
|
// portsFileDirOption implements the option interface for the `portFileDir` setting.
|
|
type portsFileDirOption struct {
|
|
noopOption
|
|
oldValue string
|
|
newValue string
|
|
}
|
|
|
|
func (p *portsFileDirOption) Apply(server *Server) {
|
|
server.deletePortsFile(p.oldValue)
|
|
server.logPorts()
|
|
server.Noticef("Reloaded: ports_file_dir = %v", p.newValue)
|
|
}
|
|
|
|
// maxControlLineOption implements the option interface for the
|
|
// `max_control_line` setting.
|
|
type maxControlLineOption struct {
|
|
noopOption
|
|
newValue int32
|
|
}
|
|
|
|
// Apply the setting by updating each client.
|
|
func (m *maxControlLineOption) Apply(server *Server) {
|
|
mcl := int32(m.newValue)
|
|
server.mu.Lock()
|
|
for _, client := range server.clients {
|
|
atomic.StoreInt32(&client.mcl, mcl)
|
|
}
|
|
server.mu.Unlock()
|
|
server.Noticef("Reloaded: max_control_line = %d", mcl)
|
|
}
|
|
|
|
// maxPayloadOption implements the option interface for the `max_payload`
|
|
// setting.
|
|
type maxPayloadOption struct {
|
|
noopOption
|
|
newValue int32
|
|
}
|
|
|
|
// Apply the setting by updating the server info and each client.
|
|
func (m *maxPayloadOption) Apply(server *Server) {
|
|
server.mu.Lock()
|
|
server.info.MaxPayload = m.newValue
|
|
for _, client := range server.clients {
|
|
atomic.StoreInt32(&client.mpay, int32(m.newValue))
|
|
}
|
|
server.mu.Unlock()
|
|
server.Noticef("Reloaded: max_payload = %d", m.newValue)
|
|
}
|
|
|
|
// pingIntervalOption implements the option interface for the `ping_interval`
|
|
// setting.
|
|
type pingIntervalOption struct {
|
|
noopOption
|
|
newValue time.Duration
|
|
}
|
|
|
|
// Apply is a no-op because the ping interval will be reloaded after options
|
|
// are applied.
|
|
func (p *pingIntervalOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: ping_interval = %s", p.newValue)
|
|
}
|
|
|
|
// maxPingsOutOption implements the option interface for the `ping_max`
// setting.
type maxPingsOutOption struct {
	noopOption
	newValue int
}

// Apply is a no-op because the ping max will be reloaded after options
// are applied. (Comment previously said "ping interval" — copy/paste fix.)
func (m *maxPingsOutOption) Apply(server *Server) {
	server.Noticef("Reloaded: ping_max = %d", m.newValue)
}
// writeDeadlineOption implements the option interface for the `write_deadline`
|
|
// setting.
|
|
type writeDeadlineOption struct {
|
|
noopOption
|
|
newValue time.Duration
|
|
}
|
|
|
|
// Apply is a no-op because the write deadline will be reloaded after options
|
|
// are applied.
|
|
func (w *writeDeadlineOption) Apply(server *Server) {
|
|
server.Noticef("Reloaded: write_deadline = %s", w.newValue)
|
|
}
|
|
|
|
// clientAdvertiseOption implements the option interface for the `client_advertise` setting.
|
|
type clientAdvertiseOption struct {
|
|
noopOption
|
|
newValue string
|
|
}
|
|
|
|
// Apply the setting by updating the server info and regenerate the infoJSON byte array.
|
|
func (c *clientAdvertiseOption) Apply(server *Server) {
|
|
server.mu.Lock()
|
|
server.setInfoHostPort()
|
|
server.mu.Unlock()
|
|
server.Noticef("Reload: client_advertise = %s", c.newValue)
|
|
}
|
|
|
|
// accountsOption implements the option interface.
|
|
// Ensure that authorization code is executed if any change in accounts
|
|
type accountsOption struct {
|
|
authOption
|
|
}
|
|
|
|
// Apply is a no-op. Changes will be applied in reloadAuthorization
|
|
func (a *accountsOption) Apply(s *Server) {
|
|
s.Noticef("Reloaded: accounts")
|
|
}
|
|
|
|
// For changes to a server's config.
|
|
type jetStreamOption struct {
|
|
noopOption
|
|
newValue bool
|
|
}
|
|
|
|
func (a *jetStreamOption) Apply(s *Server) {
|
|
s.Noticef("Reloaded: JetStream")
|
|
}
|
|
|
|
func (jso jetStreamOption) IsJetStreamChange() bool {
|
|
return true
|
|
}
|
|
|
|
func (jso jetStreamOption) IsStatszChange() bool {
|
|
return true
|
|
}
|
|
|
|
// ocspOption handles OCSP configuration changes. It embeds tlsOption so an
// OCSP change is also treated as a TLS change.
type ocspOption struct {
	tlsOption
	// newValue shadows the embedded tlsOption.newValue (*tls.Config); this
	// field holds the OCSP configuration instead.
	newValue *OCSPConfig
}

// Apply only logs the reload; the OCSP machinery presumably picks up the
// new config elsewhere during the reload — confirm at the call sites.
func (a *ocspOption) Apply(s *Server) {
	s.Noticef("Reloaded: OCSP")
}
type ocspResponseCacheOption struct {
|
|
tlsOption
|
|
newValue *OCSPResponseCacheConfig
|
|
}
|
|
|
|
func (a *ocspResponseCacheOption) Apply(s *Server) {
|
|
s.Noticef("Reloaded OCSP peer cache")
|
|
}
|
|
|
|
// connectErrorReports implements the option interface for the `connect_error_reports`
|
|
// setting.
|
|
type connectErrorReports struct {
|
|
noopOption
|
|
newValue int
|
|
}
|
|
|
|
// Apply is a no-op because the value will be reloaded after options are applied.
|
|
func (c *connectErrorReports) Apply(s *Server) {
|
|
s.Noticef("Reloaded: connect_error_reports = %v", c.newValue)
|
|
}
|
|
|
|
// reconnectErrorReports implements the option interface for the
// `reconnect_error_reports` setting. (Comment previously referenced
// `connect_error_reports` — copy/paste fix.)
type reconnectErrorReports struct {
	noopOption
	newValue int
}

// Apply is a no-op because the value will be reloaded after options are applied.
func (r *reconnectErrorReports) Apply(s *Server) {
	s.Noticef("Reloaded: reconnect_error_reports = %v", r.newValue)
}
// maxTracedMsgLenOption implements the option interface for the
// `max_traced_msg_len` setting.
type maxTracedMsgLenOption struct {
	noopOption
	newValue int
}

// Apply the setting by updating the maximum traced message length.
// Unlike most options, this writes directly into server.opts (under the
// server lock) instead of relying on the options swap.
func (m *maxTracedMsgLenOption) Apply(server *Server) {
	server.mu.Lock()
	defer server.mu.Unlock()
	server.opts.MaxTracedMsgLen = m.newValue
	server.Noticef("Reloaded: max_traced_msg_len = %d", m.newValue)
}
type mqttAckWaitReload struct {
|
|
noopOption
|
|
newValue time.Duration
|
|
}
|
|
|
|
func (o *mqttAckWaitReload) Apply(s *Server) {
|
|
s.Noticef("Reloaded: MQTT ack_wait = %v", o.newValue)
|
|
}
|
|
|
|
type mqttMaxAckPendingReload struct {
|
|
noopOption
|
|
newValue uint16
|
|
}
|
|
|
|
func (o *mqttMaxAckPendingReload) Apply(s *Server) {
|
|
s.mqttUpdateMaxAckPending(o.newValue)
|
|
s.Noticef("Reloaded: MQTT max_ack_pending = %v", o.newValue)
|
|
}
|
|
|
|
type mqttStreamReplicasReload struct {
|
|
noopOption
|
|
newValue int
|
|
}
|
|
|
|
func (o *mqttStreamReplicasReload) Apply(s *Server) {
|
|
s.Noticef("Reloaded: MQTT stream_replicas = %v", o.newValue)
|
|
}
|
|
|
|
type mqttConsumerReplicasReload struct {
|
|
noopOption
|
|
newValue int
|
|
}
|
|
|
|
func (o *mqttConsumerReplicasReload) Apply(s *Server) {
|
|
s.Noticef("Reloaded: MQTT consumer_replicas = %v", o.newValue)
|
|
}
|
|
|
|
type mqttConsumerMemoryStorageReload struct {
|
|
noopOption
|
|
newValue bool
|
|
}
|
|
|
|
func (o *mqttConsumerMemoryStorageReload) Apply(s *Server) {
|
|
s.Noticef("Reloaded: MQTT consumer_memory_storage = %v", o.newValue)
|
|
}
|
|
|
|
type mqttInactiveThresholdReload struct {
|
|
noopOption
|
|
newValue time.Duration
|
|
}
|
|
|
|
func (o *mqttInactiveThresholdReload) Apply(s *Server) {
|
|
s.Noticef("Reloaded: MQTT consumer_inactive_threshold = %v", o.newValue)
|
|
}
|
|
|
|
type profBlockRateReload struct {
|
|
noopOption
|
|
newValue int
|
|
}
|
|
|
|
func (o *profBlockRateReload) Apply(s *Server) {
|
|
s.setBlockProfileRate(o.newValue)
|
|
s.Noticef("Reloaded: block_prof_rate = %v", o.newValue)
|
|
}
|
|
|
|
// leafNodeOption handles reloads of the LeafNode TLS handshake-first and
// compression settings.
type leafNodeOption struct {
	noopOption
	// tlsFirstChanged is set when the TLS handshake-first flag changed.
	tlsFirstChanged bool
	// compressionChanged is set when leafnode compression options changed.
	compressionChanged bool
}

// Apply logs the new TLS handshake-first values and, when compression
// changed, updates the leaf remote configurations and live leaf
// connections. Connections that must renegotiate compression are closed
// after the server lock is released.
func (l *leafNodeOption) Apply(s *Server) {
	opts := s.getOpts()
	if l.tlsFirstChanged {
		s.Noticef("Reloaded: LeafNode TLS HandshakeFirst value is: %v", opts.LeafNode.TLSHandshakeFirst)
		for _, r := range opts.LeafNode.Remotes {
			s.Noticef("Reloaded: LeafNode Remote to %v TLS HandshakeFirst value is: %v", r.URLs, r.TLSHandshakeFirst)
		}
	}
	if l.compressionChanged {
		var leafs []*client
		acceptSideCompOpts := &opts.LeafNode.Compression

		s.mu.RLock()
		// First, update our internal leaf remote configurations with the new
		// compress options.
		// Since changing the remotes (as in adding/removing) is currently not
		// supported, we know that we should have the same number in Options
		// than in leafRemoteCfgs, but to be sure, use the max size.
		// NOTE(review): `max` shadows the Go 1.21 builtin of the same name.
		max := len(opts.LeafNode.Remotes)
		if l := len(s.leafRemoteCfgs); l < max {
			max = l
		}
		for i := 0; i < max; i++ {
			lr := s.leafRemoteCfgs[i]
			lr.Lock()
			lr.Compression = opts.LeafNode.Remotes[i].Compression
			lr.Unlock()
		}

		// NOTE(review): the loop variable `l` shadows the method receiver;
		// the receiver is not used in the remainder of this method.
		for _, l := range s.leafs {
			var co *CompressionOpts

			l.mu.Lock()
			// Solicited connections carry their own per-remote compression
			// options; accepted ones use the server-level settings.
			if r := l.leaf.remote; r != nil {
				co = &r.Compression
			} else {
				co = acceptSideCompOpts
			}
			newMode := co.Mode
			// Skip leaf connections that are "not supported" (because they
			// will never do compression) or the ones that have already the
			// new compression mode.
			if l.leaf.compression == CompressionNotSupported || l.leaf.compression == newMode {
				l.mu.Unlock()
				continue
			}
			// We need to close the connections if it had compression "off" or the new
			// mode is compression "off", or if the new mode is "accept", because
			// these require negotiation.
			if l.leaf.compression == CompressionOff || newMode == CompressionOff || newMode == CompressionAccept {
				leafs = append(leafs, l)
			} else if newMode == CompressionS2Auto {
				// If the mode is "s2_auto", we need to check if there is really
				// need to change, and at any rate, we want to save the actual
				// compression level here, not s2_auto.
				l.updateS2AutoCompressionLevel(co, &l.leaf.compression)
			} else {
				// Simply change the compression writer
				l.out.cw = s2.NewWriter(nil, s2WriterOptions(newMode)...)
				l.leaf.compression = newMode
			}
			l.mu.Unlock()
		}
		s.mu.RUnlock()
		// Close the connections for which negotiation is required.
		for _, l := range leafs {
			l.closeConnection(ClientClosed)
		}
		s.Noticef("Reloaded: LeafNode compression settings")
	}
}
// Compares options and disconnects clients that are no longer listed in pinned certs. Lock must not be held.
|
|
func (s *Server) recheckPinnedCerts(curOpts *Options, newOpts *Options) {
|
|
s.mu.Lock()
|
|
disconnectClients := []*client{}
|
|
protoToPinned := map[int]PinnedCertSet{}
|
|
if !reflect.DeepEqual(newOpts.TLSPinnedCerts, curOpts.TLSPinnedCerts) {
|
|
protoToPinned[NATS] = curOpts.TLSPinnedCerts
|
|
}
|
|
if !reflect.DeepEqual(newOpts.MQTT.TLSPinnedCerts, curOpts.MQTT.TLSPinnedCerts) {
|
|
protoToPinned[MQTT] = curOpts.MQTT.TLSPinnedCerts
|
|
}
|
|
if !reflect.DeepEqual(newOpts.Websocket.TLSPinnedCerts, curOpts.Websocket.TLSPinnedCerts) {
|
|
protoToPinned[WS] = curOpts.Websocket.TLSPinnedCerts
|
|
}
|
|
for _, c := range s.clients {
|
|
if c.kind != CLIENT {
|
|
continue
|
|
}
|
|
if pinned, ok := protoToPinned[c.clientType()]; ok {
|
|
if !c.matchesPinnedCert(pinned) {
|
|
disconnectClients = append(disconnectClients, c)
|
|
}
|
|
}
|
|
}
|
|
checkClients := func(kind int, clients map[uint64]*client, set PinnedCertSet) {
|
|
for _, c := range clients {
|
|
if c.kind == kind && !c.matchesPinnedCert(set) {
|
|
disconnectClients = append(disconnectClients, c)
|
|
}
|
|
}
|
|
}
|
|
if !reflect.DeepEqual(newOpts.LeafNode.TLSPinnedCerts, curOpts.LeafNode.TLSPinnedCerts) {
|
|
checkClients(LEAF, s.leafs, newOpts.LeafNode.TLSPinnedCerts)
|
|
}
|
|
if !reflect.DeepEqual(newOpts.Cluster.TLSPinnedCerts, curOpts.Cluster.TLSPinnedCerts) {
|
|
s.forEachRoute(func(c *client) {
|
|
if !c.matchesPinnedCert(newOpts.Cluster.TLSPinnedCerts) {
|
|
disconnectClients = append(disconnectClients, c)
|
|
}
|
|
})
|
|
}
|
|
if s.gateway.enabled && reflect.DeepEqual(newOpts.Gateway.TLSPinnedCerts, curOpts.Gateway.TLSPinnedCerts) {
|
|
gw := s.gateway
|
|
gw.RLock()
|
|
for _, c := range gw.out {
|
|
if !c.matchesPinnedCert(newOpts.Gateway.TLSPinnedCerts) {
|
|
disconnectClients = append(disconnectClients, c)
|
|
}
|
|
}
|
|
checkClients(GATEWAY, gw.in, newOpts.Gateway.TLSPinnedCerts)
|
|
gw.RUnlock()
|
|
}
|
|
s.mu.Unlock()
|
|
if len(disconnectClients) > 0 {
|
|
s.Noticef("Disconnect %d clients due to pinned certs reload", len(disconnectClients))
|
|
for _, c := range disconnectClients {
|
|
c.closeConnection(TLSHandshakeError)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Reload reads the current configuration file and calls out to ReloadOptions
|
|
// to apply the changes. This returns an error if the server was not started
|
|
// with a config file or an option which doesn't support hot-swapping was changed.
|
|
func (s *Server) Reload() error {
|
|
s.mu.Lock()
|
|
configFile := s.configFile
|
|
s.mu.Unlock()
|
|
if configFile == "" {
|
|
return errors.New("can only reload config when a file is provided using -c or --config")
|
|
}
|
|
|
|
newOpts, err := ProcessConfigFile(configFile)
|
|
if err != nil {
|
|
// TODO: Dump previous good config to a .bak file?
|
|
return err
|
|
}
|
|
return s.ReloadOptions(newOpts)
|
|
}
|
|
|
|
// ReloadOptions applies any supported options from the provided Option
// type. This returns an error if an option which doesn't support
// hot-swapping was changed.
func (s *Server) ReloadOptions(newOpts *Options) error {
	s.mu.Lock()

	curOpts := s.getOpts()

	// Wipe trusted keys if needed when we have an operator.
	if len(curOpts.TrustedOperators) > 0 && len(curOpts.TrustedKeys) > 0 {
		curOpts.TrustedKeys = nil
	}

	// Remember the ports currently in use so that "random port" settings in
	// the new config keep resolving to the ports the accept loops already own.
	clientOrgPort := curOpts.Port
	clusterOrgPort := curOpts.Cluster.Port
	gatewayOrgPort := curOpts.Gateway.Port
	leafnodesOrgPort := curOpts.LeafNode.Port
	websocketOrgPort := curOpts.Websocket.Port
	mqttOrgPort := curOpts.MQTT.Port

	s.mu.Unlock()

	// In case "-cluster ..." was provided through the command line, this will
	// properly set the Cluster.Host/Port etc...
	if l := curOpts.Cluster.ListenStr; l != _EMPTY_ {
		newOpts.Cluster.ListenStr = l
		overrideCluster(newOpts)
	}

	// Apply flags over config file settings.
	newOpts = MergeOptions(newOpts, FlagSnapshot)

	// Need more processing for boolean flags...
	if FlagSnapshot != nil {
		applyBoolFlags(newOpts, FlagSnapshot)
	}

	setBaselineOptions(newOpts)

	// setBaselineOptions sets Port to 0 if set to -1 (RANDOM port)
	// If that's the case, set it to the saved value when the accept loop was
	// created.
	if newOpts.Port == 0 {
		newOpts.Port = clientOrgPort
	}
	// We don't do that for cluster, so check against -1.
	if newOpts.Cluster.Port == -1 {
		newOpts.Cluster.Port = clusterOrgPort
	}
	if newOpts.Gateway.Port == -1 {
		newOpts.Gateway.Port = gatewayOrgPort
	}
	if newOpts.LeafNode.Port == -1 {
		newOpts.LeafNode.Port = leafnodesOrgPort
	}
	if newOpts.Websocket.Port == -1 {
		newOpts.Websocket.Port = websocketOrgPort
	}
	if newOpts.MQTT.Port == -1 {
		newOpts.MQTT.Port = mqttOrgPort
	}

	// Diff and apply the changes; errors out on non-hot-swappable changes.
	if err := s.reloadOptions(curOpts, newOpts); err != nil {
		return err
	}

	s.recheckPinnedCerts(curOpts, newOpts)

	// Record when the config was applied and refresh /varz reloadable fields.
	s.mu.Lock()
	s.configTime = time.Now().UTC()
	s.updateVarzConfigReloadableFields(s.varz)
	s.mu.Unlock()
	return nil
}
func applyBoolFlags(newOpts, flagOpts *Options) {
|
|
// Reset fields that may have been set to `true` in
|
|
// MergeOptions() when some of the flags default to `true`
|
|
// but have not been explicitly set and therefore value
|
|
// from config file should take precedence.
|
|
for name, val := range newOpts.inConfig {
|
|
f := reflect.ValueOf(newOpts).Elem()
|
|
names := strings.Split(name, ".")
|
|
for _, name := range names {
|
|
f = f.FieldByName(name)
|
|
}
|
|
f.SetBool(val)
|
|
}
|
|
// Now apply value (true or false) from flags that have
|
|
// been explicitly set in command line
|
|
for name, val := range flagOpts.inCmdLine {
|
|
f := reflect.ValueOf(newOpts).Elem()
|
|
names := strings.Split(name, ".")
|
|
for _, name := range names {
|
|
f = f.FieldByName(name)
|
|
}
|
|
f.SetBool(val)
|
|
}
|
|
}
|
|
|
|
// reloadOptions reloads the server config with the provided options. If an
|
|
// option that doesn't support hot-swapping is changed, this returns an error.
|
|
func (s *Server) reloadOptions(curOpts, newOpts *Options) error {
|
|
// Apply to the new options some of the options that may have been set
|
|
// that can't be configured in the config file (this can happen in
|
|
// applications starting NATS Server programmatically).
|
|
newOpts.CustomClientAuthentication = curOpts.CustomClientAuthentication
|
|
newOpts.CustomRouterAuthentication = curOpts.CustomRouterAuthentication
|
|
|
|
changed, err := s.diffOptions(newOpts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(changed) != 0 {
|
|
if err := validateOptions(newOpts); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Create a context that is used to pass special info that we may need
|
|
// while applying the new options.
|
|
ctx := reloadContext{oldClusterPerms: curOpts.Cluster.Permissions}
|
|
s.setOpts(newOpts)
|
|
s.applyOptions(&ctx, changed)
|
|
return nil
|
|
}
|
|
|
|
// For the purpose of comparing, impose a order on slice data types where order does not matter
|
|
func imposeOrder(value interface{}) error {
|
|
switch value := value.(type) {
|
|
case []*Account:
|
|
sort.Slice(value, func(i, j int) bool {
|
|
return value[i].Name < value[j].Name
|
|
})
|
|
for _, a := range value {
|
|
sort.Slice(a.imports.streams, func(i, j int) bool {
|
|
return a.imports.streams[i].acc.Name < a.imports.streams[j].acc.Name
|
|
})
|
|
}
|
|
case []*User:
|
|
sort.Slice(value, func(i, j int) bool {
|
|
return value[i].Username < value[j].Username
|
|
})
|
|
case []*NkeyUser:
|
|
sort.Slice(value, func(i, j int) bool {
|
|
return value[i].Nkey < value[j].Nkey
|
|
})
|
|
case []*url.URL:
|
|
sort.Slice(value, func(i, j int) bool {
|
|
return value[i].String() < value[j].String()
|
|
})
|
|
case []string:
|
|
sort.Strings(value)
|
|
case []*jwt.OperatorClaims:
|
|
sort.Slice(value, func(i, j int) bool {
|
|
return value[i].Issuer < value[j].Issuer
|
|
})
|
|
case GatewayOpts:
|
|
sort.Slice(value.Gateways, func(i, j int) bool {
|
|
return value.Gateways[i].Name < value.Gateways[j].Name
|
|
})
|
|
case WebsocketOpts:
|
|
sort.Strings(value.AllowedOrigins)
|
|
case string, bool, uint8, int, int32, int64, time.Duration, float64, nil, LeafNodeOpts, ClusterOpts, *tls.Config, PinnedCertSet,
|
|
*URLAccResolver, *MemAccResolver, *DirAccResolver, *CacheDirAccResolver, Authentication, MQTTOpts, jwt.TagList,
|
|
*OCSPConfig, map[string]string, JSLimitOpts, StoreCipher, *OCSPResponseCacheConfig:
|
|
// explicitly skipped types
|
|
case *AuthCallout:
|
|
default:
|
|
// this will fail during unit tests
|
|
return fmt.Errorf("OnReload, sort or explicitly skip type: %s",
|
|
reflect.TypeOf(value))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// diffOptions returns a slice containing options which have been changed. If
// an option that doesn't support hot-swapping is changed, this returns an
// error.
//
// The comparison walks every exported field of Options via reflection,
// imposing a stable order on slice-typed values first so DeepEqual does not
// report ordering-only differences. Each changed field is mapped to its
// option implementation; fields without a case below are unsupported and
// abort the reload.
func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
	var (
		oldConfig = reflect.ValueOf(s.getOpts()).Elem()
		newConfig = reflect.ValueOf(newOpts).Elem()
		diffOpts  = []option{}

		// Need to keep track of whether JS is being disabled
		// to prevent changing limits at runtime.
		jsEnabled           = s.JetStreamEnabled()
		disableJS           bool
		jsMemLimitsChanged  bool
		jsFileLimitsChanged bool
		jsStoreDirChanged   bool
	)
	for i := 0; i < oldConfig.NumField(); i++ {
		field := oldConfig.Type().Field(i)
		// field.PkgPath is empty for exported fields, and is not for unexported ones.
		// We skip the unexported fields.
		if field.PkgPath != _EMPTY_ {
			continue
		}
		var (
			oldValue = oldConfig.Field(i).Interface()
			newValue = newConfig.Field(i).Interface()
		)
		// Sort slice-typed values in place so DeepEqual is order-insensitive.
		if err := imposeOrder(oldValue); err != nil {
			return nil, err
		}
		if err := imposeOrder(newValue); err != nil {
			return nil, err
		}

		optName := strings.ToLower(field.Name)
		// accounts and users (referencing accounts) will always differ as accounts
		// contain internal state, say locks etc..., so we don't bother here.
		// This also avoids races with atomic stats counters
		if optName != "accounts" && optName != "users" {
			if changed := !reflect.DeepEqual(oldValue, newValue); !changed {
				// Check to make sure we are running JetStream if we think we should be.
				if optName == "jetstream" && newValue.(bool) {
					if !jsEnabled {
						diffOpts = append(diffOpts, &jetStreamOption{newValue: true})
					}
				}
				continue
			}
		}
		switch optName {
		case "traceverbose":
			diffOpts = append(diffOpts, &traceVerboseOption{newValue: newValue.(bool)})
		case "trace":
			diffOpts = append(diffOpts, &traceOption{newValue: newValue.(bool)})
		case "debug":
			diffOpts = append(diffOpts, &debugOption{newValue: newValue.(bool)})
		case "logtime":
			diffOpts = append(diffOpts, &logtimeOption{newValue: newValue.(bool)})
		case "logtimeutc":
			diffOpts = append(diffOpts, &logtimeUTCOption{newValue: newValue.(bool)})
		case "logfile":
			diffOpts = append(diffOpts, &logfileOption{newValue: newValue.(string)})
		case "syslog":
			diffOpts = append(diffOpts, &syslogOption{newValue: newValue.(bool)})
		case "remotesyslog":
			diffOpts = append(diffOpts, &remoteSyslogOption{newValue: newValue.(string)})
		case "tlsconfig":
			diffOpts = append(diffOpts, &tlsOption{newValue: newValue.(*tls.Config)})
		case "tlstimeout":
			diffOpts = append(diffOpts, &tlsTimeoutOption{newValue: newValue.(float64)})
		case "tlspinnedcerts":
			diffOpts = append(diffOpts, &tlsPinnedCertOption{newValue: newValue.(PinnedCertSet)})
		case "username":
			diffOpts = append(diffOpts, &usernameOption{})
		case "password":
			diffOpts = append(diffOpts, &passwordOption{})
		case "tags":
			diffOpts = append(diffOpts, &tagsOption{})
		case "authorization":
			diffOpts = append(diffOpts, &authorizationOption{})
		case "authtimeout":
			diffOpts = append(diffOpts, &authTimeoutOption{newValue: newValue.(float64)})
		case "users":
			diffOpts = append(diffOpts, &usersOption{})
		case "nkeys":
			diffOpts = append(diffOpts, &nkeysOption{})
		case "cluster":
			newClusterOpts := newValue.(ClusterOpts)
			oldClusterOpts := oldValue.(ClusterOpts)
			if err := validateClusterOpts(oldClusterOpts, newClusterOpts); err != nil {
				return nil, err
			}
			co := &clusterOption{
				newValue:        newClusterOpts,
				permsChanged:    !reflect.DeepEqual(newClusterOpts.Permissions, oldClusterOpts.Permissions),
				compressChanged: !reflect.DeepEqual(oldClusterOpts.Compression, newClusterOpts.Compression),
			}
			co.diffPoolAndAccounts(&oldClusterOpts)
			// If there are added accounts, first make sure that we can look them up.
			// If we can't let's fail the reload.
			for _, acc := range co.accsAdded {
				if _, err := s.LookupAccount(acc); err != nil {
					return nil, fmt.Errorf("unable to add account %q to the list of dedicated routes: %v", acc, err)
				}
			}
			// If pool_size has been set to negative (but was not before), then let's
			// add the system account to the list of removed accounts (we don't have
			// to check if already there, duplicates are ok in that case).
			if newClusterOpts.PoolSize < 0 && oldClusterOpts.PoolSize >= 0 {
				if sys := s.SystemAccount(); sys != nil {
					co.accsRemoved = append(co.accsRemoved, sys.GetName())
				}
			}
			diffOpts = append(diffOpts, co)
		case "routes":
			add, remove := diffRoutes(oldValue.([]*url.URL), newValue.([]*url.URL))
			diffOpts = append(diffOpts, &routesOption{add: add, remove: remove})
		case "maxconn":
			diffOpts = append(diffOpts, &maxConnOption{newValue: newValue.(int)})
		case "pidfile":
			diffOpts = append(diffOpts, &pidFileOption{newValue: newValue.(string)})
		case "portsfiledir":
			diffOpts = append(diffOpts, &portsFileDirOption{newValue: newValue.(string), oldValue: oldValue.(string)})
		case "maxcontrolline":
			diffOpts = append(diffOpts, &maxControlLineOption{newValue: newValue.(int32)})
		case "maxpayload":
			diffOpts = append(diffOpts, &maxPayloadOption{newValue: newValue.(int32)})
		case "pinginterval":
			diffOpts = append(diffOpts, &pingIntervalOption{newValue: newValue.(time.Duration)})
		case "maxpingsout":
			diffOpts = append(diffOpts, &maxPingsOutOption{newValue: newValue.(int)})
		case "writedeadline":
			diffOpts = append(diffOpts, &writeDeadlineOption{newValue: newValue.(time.Duration)})
		case "clientadvertise":
			cliAdv := newValue.(string)
			if cliAdv != "" {
				// Validate ClientAdvertise syntax
				if _, _, err := parseHostPort(cliAdv, 0); err != nil {
					return nil, fmt.Errorf("invalid ClientAdvertise value of %s, err=%v", cliAdv, err)
				}
			}
			diffOpts = append(diffOpts, &clientAdvertiseOption{newValue: cliAdv})
		case "accounts":
			diffOpts = append(diffOpts, &accountsOption{})
		case "resolver", "accountresolver", "accountsresolver":
			// We can't move from no resolver to one. So check for that.
			if (oldValue == nil && newValue != nil) ||
				(oldValue != nil && newValue == nil) {
				return nil, fmt.Errorf("config reload does not support moving to or from an account resolver")
			}
			diffOpts = append(diffOpts, &accountsOption{})
		case "accountresolvertlsconfig":
			diffOpts = append(diffOpts, &accountsOption{})
		case "gateway":
			// Not supported for now, but report warning if configuration of gateway
			// is actually changed so that user knows that it won't take effect.

			// Any deep-equal is likely to fail for when there is a TLSConfig. so
			// remove for the test.
			tmpOld := oldValue.(GatewayOpts)
			tmpNew := newValue.(GatewayOpts)
			tmpOld.TLSConfig = nil
			tmpNew.TLSConfig = nil
			tmpOld.tlsConfigOpts = nil
			tmpNew.tlsConfigOpts = nil

			// Need to do the same for remote gateways' TLS configs.
			// But we can't just set remotes' TLSConfig to nil otherwise this
			// would lose the real TLS configuration.
			tmpOld.Gateways = copyRemoteGWConfigsWithoutTLSConfig(tmpOld.Gateways)
			tmpNew.Gateways = copyRemoteGWConfigsWithoutTLSConfig(tmpNew.Gateways)

			// If there is really a change prevents reload.
			if !reflect.DeepEqual(tmpOld, tmpNew) {
				// See TODO(ik) note below about printing old/new values.
				return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
					field.Name, oldValue, newValue)
			}
		case "leafnode":
			// Similar to gateways
			tmpOld := oldValue.(LeafNodeOpts)
			tmpNew := newValue.(LeafNodeOpts)
			tmpOld.TLSConfig = nil
			tmpNew.TLSConfig = nil
			tmpOld.tlsConfigOpts = nil
			tmpNew.tlsConfigOpts = nil
			// We will allow TLSHandshakeFirst to me config reloaded. First,
			// we just want to detect if there was a change in the leafnodes{}
			// block, and if not, we will check the remotes.
			handshakeFirstChanged := tmpOld.TLSHandshakeFirst != tmpNew.TLSHandshakeFirst
			// If changed, set them (in the temporary variables) to false so that the
			// rest of the comparison does not fail.
			if handshakeFirstChanged {
				tmpOld.TLSHandshakeFirst, tmpNew.TLSHandshakeFirst = false, false
			} else if len(tmpOld.Remotes) == len(tmpNew.Remotes) {
				// Since we don't support changes in the remotes, we will do a
				// simple pass to see if there was a change of this field.
				for i := 0; i < len(tmpOld.Remotes); i++ {
					if tmpOld.Remotes[i].TLSHandshakeFirst != tmpNew.Remotes[i].TLSHandshakeFirst {
						handshakeFirstChanged = true
						break
					}
				}
			}
			// We also support config reload for compression. Check if it changed before
			// blanking them out for the deep-equal check at the end.
			compressionChanged := !reflect.DeepEqual(tmpOld.Compression, tmpNew.Compression)
			if compressionChanged {
				tmpOld.Compression, tmpNew.Compression = CompressionOpts{}, CompressionOpts{}
			} else if len(tmpOld.Remotes) == len(tmpNew.Remotes) {
				// Same that for tls first check, do the remotes now.
				for i := 0; i < len(tmpOld.Remotes); i++ {
					if !reflect.DeepEqual(tmpOld.Remotes[i].Compression, tmpNew.Remotes[i].Compression) {
						compressionChanged = true
						break
					}
				}
			}

			// Need to do the same for remote leafnodes' TLS configs.
			// But we can't just set remotes' TLSConfig to nil otherwise this
			// would lose the real TLS configuration.
			tmpOld.Remotes = copyRemoteLNConfigForReloadCompare(tmpOld.Remotes)
			tmpNew.Remotes = copyRemoteLNConfigForReloadCompare(tmpNew.Remotes)

			// Special check for leafnode remotes changes which are not supported right now.
			leafRemotesChanged := func(a, b LeafNodeOpts) bool {
				if len(a.Remotes) != len(b.Remotes) {
					return true
				}

				// Check whether all remotes URLs are still the same.
				for _, oldRemote := range a.Remotes {
					var found bool

					if oldRemote.LocalAccount == _EMPTY_ {
						oldRemote.LocalAccount = globalAccountName
					}

					for _, newRemote := range b.Remotes {
						// Bind to global account in case not defined.
						if newRemote.LocalAccount == _EMPTY_ {
							newRemote.LocalAccount = globalAccountName
						}

						if reflect.DeepEqual(oldRemote, newRemote) {
							found = true
							break
						}
					}
					if !found {
						return true
					}
				}

				return false
			}

			// First check whether remotes changed at all. If they did not,
			// skip them in the complete equal check.
			if !leafRemotesChanged(tmpOld, tmpNew) {
				tmpOld.Remotes = nil
				tmpNew.Remotes = nil
			}

			// Special check for auth users to detect changes.
			// If anything is off will fall through and fail below.
			// If we detect they are semantically the same we nil them out
			// to pass the check below.
			if tmpOld.Users != nil || tmpNew.Users != nil {
				if len(tmpOld.Users) == len(tmpNew.Users) {
					oua := make(map[string]*User, len(tmpOld.Users))
					nua := make(map[string]*User, len(tmpOld.Users))
					for _, u := range tmpOld.Users {
						oua[u.Username] = u
					}
					for _, u := range tmpNew.Users {
						nua[u.Username] = u
					}
					same := true
					for uname, u := range oua {
						// If we can not find new one with same name, drop through to fail.
						nu, ok := nua[uname]
						if !ok {
							same = false
							break
						}
						// If username or password or account different break.
						if u.Username != nu.Username || u.Password != nu.Password || u.Account.GetName() != nu.Account.GetName() {
							same = false
							break
						}
					}
					// We can nil out here.
					if same {
						tmpOld.Users, tmpNew.Users = nil, nil
					}
				}
			}

			// If there is really a change prevents reload.
			if !reflect.DeepEqual(tmpOld, tmpNew) {
				// See TODO(ik) note below about printing old/new values.
				return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
					field.Name, oldValue, newValue)
			}

			diffOpts = append(diffOpts, &leafNodeOption{
				tlsFirstChanged:    handshakeFirstChanged,
				compressionChanged: compressionChanged,
			})
		case "jetstream":
			new := newValue.(bool)
			old := oldValue.(bool)
			if new != old {
				diffOpts = append(diffOpts, &jetStreamOption{newValue: new})
			}

			// Mark whether JS will be disabled.
			disableJS = !new
		case "storedir":
			new := newValue.(string)
			old := oldValue.(string)
			modified := new != old

			// Check whether JS is being disabled and/or storage dir attempted to change.
			if jsEnabled && modified {
				if new == _EMPTY_ {
					// This means that either JS is being disabled or it is using an temp dir.
					// Allow the change but error in case JS was not disabled.
					jsStoreDirChanged = true
				} else {
					return nil, fmt.Errorf("config reload not supported for jetstream storage directory")
				}
			}
		case "jetstreammaxmemory", "jetstreammaxstore":
			old := oldValue.(int64)
			new := newValue.(int64)

			// Check whether JS is being disabled and/or limits are being changed.
			// -1 means the limit is dynamically determined.
			var (
				modified  = new != old
				fromUnset = old == -1
				fromSet   = !fromUnset
				toUnset   = new == -1
				toSet     = !toUnset
			)
			if jsEnabled && modified {
				// Cannot change limits from dynamic storage at runtime.
				switch {
				case fromSet && toUnset:
					// Limits changed but it may mean that JS is being disabled,
					// keep track of the change and error in case it is not.
					switch optName {
					case "jetstreammaxmemory":
						jsMemLimitsChanged = true
					case "jetstreammaxstore":
						jsFileLimitsChanged = true
					default:
						return nil, fmt.Errorf("config reload not supported for jetstream max memory and store")
					}
				case fromUnset && toSet:
					// Prevent changing from dynamic max memory / file at runtime.
					return nil, fmt.Errorf("config reload not supported for jetstream dynamic max memory and store")
				default:
					return nil, fmt.Errorf("config reload not supported for jetstream max memory and store")
				}
			}
		case "websocket":
			// Similar to gateways
			tmpOld := oldValue.(WebsocketOpts)
			tmpNew := newValue.(WebsocketOpts)
			tmpOld.TLSConfig, tmpOld.tlsConfigOpts = nil, nil
			tmpNew.TLSConfig, tmpNew.tlsConfigOpts = nil, nil
			// If there is really a change prevents reload.
			if !reflect.DeepEqual(tmpOld, tmpNew) {
				// See TODO(ik) note below about printing old/new values.
				return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
					field.Name, oldValue, newValue)
			}
		case "mqtt":
			diffOpts = append(diffOpts, &mqttAckWaitReload{newValue: newValue.(MQTTOpts).AckWait})
			diffOpts = append(diffOpts, &mqttMaxAckPendingReload{newValue: newValue.(MQTTOpts).MaxAckPending})
			diffOpts = append(diffOpts, &mqttStreamReplicasReload{newValue: newValue.(MQTTOpts).StreamReplicas})
			diffOpts = append(diffOpts, &mqttConsumerReplicasReload{newValue: newValue.(MQTTOpts).ConsumerReplicas})
			diffOpts = append(diffOpts, &mqttConsumerMemoryStorageReload{newValue: newValue.(MQTTOpts).ConsumerMemoryStorage})
			diffOpts = append(diffOpts, &mqttInactiveThresholdReload{newValue: newValue.(MQTTOpts).ConsumerInactiveThreshold})

			// Nil out/set to 0 the options that we allow to be reloaded so that
			// we only fail reload if some that we don't support are changed.
			tmpOld := oldValue.(MQTTOpts)
			tmpNew := newValue.(MQTTOpts)
			tmpOld.TLSConfig, tmpOld.tlsConfigOpts, tmpOld.AckWait, tmpOld.MaxAckPending, tmpOld.StreamReplicas, tmpOld.ConsumerReplicas, tmpOld.ConsumerMemoryStorage = nil, nil, 0, 0, 0, 0, false
			tmpOld.ConsumerInactiveThreshold = 0
			tmpNew.TLSConfig, tmpNew.tlsConfigOpts, tmpNew.AckWait, tmpNew.MaxAckPending, tmpNew.StreamReplicas, tmpNew.ConsumerReplicas, tmpNew.ConsumerMemoryStorage = nil, nil, 0, 0, 0, 0, false
			tmpNew.ConsumerInactiveThreshold = 0

			if !reflect.DeepEqual(tmpOld, tmpNew) {
				// See TODO(ik) note below about printing old/new values.
				return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
					field.Name, oldValue, newValue)
			}
			// Restore the reloadable fields on the new copy after the
			// unsupported-change check above passed.
			tmpNew.AckWait = newValue.(MQTTOpts).AckWait
			tmpNew.MaxAckPending = newValue.(MQTTOpts).MaxAckPending
			tmpNew.StreamReplicas = newValue.(MQTTOpts).StreamReplicas
			tmpNew.ConsumerReplicas = newValue.(MQTTOpts).ConsumerReplicas
			tmpNew.ConsumerMemoryStorage = newValue.(MQTTOpts).ConsumerMemoryStorage
			tmpNew.ConsumerInactiveThreshold = newValue.(MQTTOpts).ConsumerInactiveThreshold
		case "connecterrorreports":
			diffOpts = append(diffOpts, &connectErrorReports{newValue: newValue.(int)})
		case "reconnecterrorreports":
			diffOpts = append(diffOpts, &reconnectErrorReports{newValue: newValue.(int)})
		case "nolog", "nosigs":
			// Ignore NoLog and NoSigs options since they are not parsed and only used in
			// testing.
			continue
		case "disableshortfirstping":
			newOpts.DisableShortFirstPing = oldValue.(bool)
			continue
		case "maxtracedmsglen":
			diffOpts = append(diffOpts, &maxTracedMsgLenOption{newValue: newValue.(int)})
		case "port":
			// check to see if newValue == 0 and continue if so.
			// (newValue boxes an int, so comparing against untyped 0 is valid.)
			if newValue == 0 {
				// ignore RANDOM_PORT
				continue
			}
			fallthrough
		case "noauthuser":
			// Only allow clearing NoAuthUser when the referenced user is gone.
			if oldValue != _EMPTY_ && newValue == _EMPTY_ {
				for _, user := range newOpts.Users {
					if user.Username == oldValue {
						return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
							field.Name, oldValue, newValue)
					}
				}
			} else {
				return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
					field.Name, oldValue, newValue)
			}
		case "systemaccount":
			// Only allow reverting to the default system account.
			if oldValue != DEFAULT_SYSTEM_ACCOUNT || newValue != _EMPTY_ {
				return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
					field.Name, oldValue, newValue)
			}
		case "ocspconfig":
			diffOpts = append(diffOpts, &ocspOption{newValue: newValue.(*OCSPConfig)})
		case "ocspcacheconfig":
			diffOpts = append(diffOpts, &ocspResponseCacheOption{newValue: newValue.(*OCSPResponseCacheConfig)})
		case "profblockrate":
			new := newValue.(int)
			old := oldValue.(int)
			if new != old {
				diffOpts = append(diffOpts, &profBlockRateReload{newValue: new})
			}
		default:
			// TODO(ik): Implement String() on those options to have a nice print.
			// %v is difficult to figure what's what, %+v print private fields and
			// would print passwords. Tried json.Marshal but it is too verbose for
			// the URL array.

			// Bail out if attempting to reload any unsupported options.
			return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
				field.Name, oldValue, newValue)
		}
	}

	// If not disabling JS but limits have changed then it is an error.
	if !disableJS {
		if jsMemLimitsChanged || jsFileLimitsChanged {
			return nil, fmt.Errorf("config reload not supported for jetstream max memory and max store")
		}
		if jsStoreDirChanged {
			return nil, fmt.Errorf("config reload not supported for jetstream storage dir")
		}
	}

	return diffOpts, nil
}
|
|
|
|
func copyRemoteGWConfigsWithoutTLSConfig(current []*RemoteGatewayOpts) []*RemoteGatewayOpts {
|
|
l := len(current)
|
|
if l == 0 {
|
|
return nil
|
|
}
|
|
rgws := make([]*RemoteGatewayOpts, 0, l)
|
|
for _, rcfg := range current {
|
|
cp := *rcfg
|
|
cp.TLSConfig = nil
|
|
cp.tlsConfigOpts = nil
|
|
rgws = append(rgws, &cp)
|
|
}
|
|
return rgws
|
|
}
|
|
|
|
func copyRemoteLNConfigForReloadCompare(current []*RemoteLeafOpts) []*RemoteLeafOpts {
|
|
l := len(current)
|
|
if l == 0 {
|
|
return nil
|
|
}
|
|
rlns := make([]*RemoteLeafOpts, 0, l)
|
|
for _, rcfg := range current {
|
|
cp := *rcfg
|
|
cp.TLSConfig = nil
|
|
cp.tlsConfigOpts = nil
|
|
cp.TLSHandshakeFirst = false
|
|
// This is set only when processing a CONNECT, so reset here so that we
|
|
// don't fail the DeepEqual comparison.
|
|
cp.TLS = false
|
|
// For now, remove DenyImports/Exports since those get modified at runtime
|
|
// to add JS APIs.
|
|
cp.DenyImports, cp.DenyExports = nil, nil
|
|
// Remove compression mode
|
|
cp.Compression = CompressionOpts{}
|
|
rlns = append(rlns, &cp)
|
|
}
|
|
return rlns
|
|
}
|
|
|
|
// applyOptions applies each computed option diff and then runs the follow-up
// work the different kinds of changes require (logger reconfiguration, auth
// reload, cluster permission updates, JetStream enable/disable, remote TLS
// config refresh, OCSP restart).
func (s *Server) applyOptions(ctx *reloadContext, opts []option) {
	var (
		reloadLogging      = false
		reloadAuth         = false
		reloadClusterPerms = false
		reloadClientTrcLvl = false
		reloadJetstream    = false
		jsEnabled          = false
		reloadTLS          = false
		isStatszChange     = false
		co                 *clusterOption
	)
	// First pass: apply every option and record which follow-up actions
	// are needed.
	for _, opt := range opts {
		opt.Apply(s)
		if opt.IsLoggingChange() {
			reloadLogging = true
		}
		if opt.IsTraceLevelChange() {
			reloadClientTrcLvl = true
		}
		if opt.IsAuthChange() {
			reloadAuth = true
		}
		if opt.IsTLSChange() {
			reloadTLS = true
		}
		if opt.IsClusterPoolSizeOrAccountsChange() {
			co = opt.(*clusterOption)
		}
		if opt.IsClusterPermsChange() {
			reloadClusterPerms = true
		}
		if opt.IsJetStreamChange() {
			reloadJetstream = true
			jsEnabled = opt.(*jetStreamOption).newValue
		}
		if opt.IsStatszChange() {
			isStatszChange = true
		}
	}

	if reloadLogging {
		s.ConfigureLogger()
	}
	if reloadClientTrcLvl {
		s.reloadClientTraceLevel()
	}
	if reloadAuth {
		s.reloadAuthorization()
	}
	if reloadClusterPerms {
		s.reloadClusterPermissions(ctx.oldClusterPerms)
	}
	newOpts := s.getOpts()
	// If we need to reload cluster pool/per-account, then co will be not nil
	if co != nil {
		s.reloadClusterPoolAndAccounts(co, newOpts)
	}
	if reloadJetstream {
		if !jsEnabled {
			s.DisableJetStream()
		} else if !s.JetStreamEnabled() {
			if err := s.restartJetStream(); err != nil {
				s.Warnf("Can't start JetStream: %v", err)
			}
		}
		// Make sure to reset the internal loop's version of JS.
		s.resetInternalLoopInfo()
	}
	if isStatszChange {
		s.sendStatszUpdate()
	}

	// For remote gateways and leafnodes, make sure that their TLS configuration
	// is updated (since the config is "captured" early and changes would otherwise
	// not be visible).
	if s.gateway.enabled {
		s.gateway.updateRemotesTLSConfig(newOpts)
	}
	if len(newOpts.LeafNode.Remotes) > 0 {
		s.updateRemoteLeafNodesTLSConfig(newOpts)
	}

	// This will fire if TLS enabled at root (NATS listener) -or- if ocsp or ocsp_cache
	// appear in the config.
	if reloadTLS {
		// Restart OCSP monitoring.
		if err := s.reloadOCSP(); err != nil {
			s.Warnf("Can't restart OCSP features: %v", err)
		}
	}

	s.Noticef("Reloaded server configuration")
}
|
|
|
|
// This will send a reset to the internal send loop.
|
|
func (s *Server) resetInternalLoopInfo() {
|
|
var resetCh chan struct{}
|
|
s.mu.Lock()
|
|
if s.sys != nil {
|
|
// can't hold the lock as go routine reading it may be waiting for lock as well
|
|
resetCh = s.sys.resetCh
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
if resetCh != nil {
|
|
resetCh <- struct{}{}
|
|
}
|
|
}
|
|
|
|
// Update all cached debug and trace settings for every client
// (regular clients, system client, routes, leafnodes and gateway
// connections). No-op when logging is disabled.
func (s *Server) reloadClientTraceLevel() {
	opts := s.getOpts()

	if opts.NoLog {
		return
	}

	// Create a list of all clients.
	// Update their trace level when not holding server or gateway lock

	// First compute an approximate capacity; counts may change between
	// releasing and re-acquiring the locks, which is fine since this is
	// only used to pre-size the slice.
	s.mu.Lock()
	clientCnt := 1 + len(s.clients) + len(s.grTmpClients) + s.numRoutes() + len(s.leafs)
	s.mu.Unlock()

	s.gateway.RLock()
	clientCnt += len(s.gateway.in) + len(s.gateway.outo)
	s.gateway.RUnlock()

	clients := make([]*client, 0, clientCnt)

	s.mu.Lock()
	// Include the system client when events are enabled.
	if s.eventsEnabled() {
		clients = append(clients, s.sys.client)
	}

	// Gather regular clients, temporary (in-handshake) clients and leafnodes.
	cMaps := []map[uint64]*client{s.clients, s.grTmpClients, s.leafs}
	for _, m := range cMaps {
		for _, c := range m {
			clients = append(clients, c)
		}
	}
	s.forEachRoute(func(c *client) {
		clients = append(clients, c)
	})
	s.mu.Unlock()

	// Gather inbound and outbound gateway connections under the gateway lock.
	s.gateway.RLock()
	for _, c := range s.gateway.in {
		clients = append(clients, c)
	}
	clients = append(clients, s.gateway.outo...)
	s.gateway.RUnlock()

	// Now update each connection without holding server/gateway locks.
	for _, c := range clients {
		// client.trace is commonly read while holding the lock
		c.mu.Lock()
		c.setTraceLevel()
		c.mu.Unlock()
	}
}
|
|
|
|
// reloadAuthorization reconfigures the server authorization settings,
|
|
// disconnects any clients who are no longer authorized, and removes any
|
|
// unauthorized subscriptions.
|
|
func (s *Server) reloadAuthorization() {
|
|
// This map will contain the names of accounts that have their streams
|
|
// import configuration changed.
|
|
var awcsti map[string]struct{}
|
|
checkJetStream := false
|
|
opts := s.getOpts()
|
|
s.mu.Lock()
|
|
|
|
deletedAccounts := make(map[string]*Account)
|
|
|
|
// This can not be changed for now so ok to check server's trustedKeys unlocked.
|
|
// If plain configured accounts, process here.
|
|
if s.trustedKeys == nil {
|
|
// Make a map of the configured account names so we figure out the accounts
|
|
// that should be removed later on.
|
|
configAccs := make(map[string]struct{}, len(opts.Accounts))
|
|
for _, acc := range opts.Accounts {
|
|
configAccs[acc.GetName()] = struct{}{}
|
|
}
|
|
// Now range over existing accounts and keep track of the ones deleted
|
|
// so some cleanup can be made after releasing the server lock.
|
|
s.accounts.Range(func(k, v interface{}) bool {
|
|
an, acc := k.(string), v.(*Account)
|
|
// Exclude default and system account from this test since those
|
|
// may not actually be in opts.Accounts.
|
|
if an == DEFAULT_GLOBAL_ACCOUNT || an == DEFAULT_SYSTEM_ACCOUNT {
|
|
return true
|
|
}
|
|
// Check check if existing account is still in opts.Accounts.
|
|
if _, ok := configAccs[an]; !ok {
|
|
deletedAccounts[an] = acc
|
|
s.accounts.Delete(k)
|
|
}
|
|
return true
|
|
})
|
|
// This will update existing and add new ones.
|
|
awcsti, _ = s.configureAccounts(true)
|
|
s.configureAuthorization()
|
|
// Double check any JetStream configs.
|
|
checkJetStream = s.getJetStream() != nil
|
|
} else if opts.AccountResolver != nil {
|
|
s.configureResolver()
|
|
if _, ok := s.accResolver.(*MemAccResolver); ok {
|
|
// Check preloads so we can issue warnings etc if needed.
|
|
s.checkResolvePreloads()
|
|
// With a memory resolver we want to do something similar to configured accounts.
|
|
// We will walk the accounts and delete them if they are no longer present via fetch.
|
|
// If they are present we will force a claim update to process changes.
|
|
s.accounts.Range(func(k, v interface{}) bool {
|
|
acc := v.(*Account)
|
|
// Skip global account.
|
|
if acc == s.gacc {
|
|
return true
|
|
}
|
|
accName := acc.GetName()
|
|
// Release server lock for following actions
|
|
s.mu.Unlock()
|
|
accClaims, claimJWT, _ := s.fetchAccountClaims(accName)
|
|
if accClaims != nil {
|
|
if err := s.updateAccountWithClaimJWT(acc, claimJWT); err != nil {
|
|
s.Noticef("Reloaded: deleting account [bad claims]: %q", accName)
|
|
s.accounts.Delete(k)
|
|
}
|
|
} else {
|
|
s.Noticef("Reloaded: deleting account [removed]: %q", accName)
|
|
s.accounts.Delete(k)
|
|
}
|
|
// Regrab server lock.
|
|
s.mu.Lock()
|
|
return true
|
|
})
|
|
}
|
|
}
|
|
|
|
var (
|
|
cclientsa [64]*client
|
|
cclients = cclientsa[:0]
|
|
clientsa [64]*client
|
|
clients = clientsa[:0]
|
|
routesa [64]*client
|
|
routes = routesa[:0]
|
|
)
|
|
|
|
// Gather clients that changed accounts. We will close them and they
|
|
// will reconnect, doing the right thing.
|
|
for _, client := range s.clients {
|
|
if s.clientHasMovedToDifferentAccount(client) {
|
|
cclients = append(cclients, client)
|
|
} else {
|
|
clients = append(clients, client)
|
|
}
|
|
}
|
|
s.forEachRoute(func(route *client) {
|
|
routes = append(routes, route)
|
|
})
|
|
// Check here for any system/internal clients which will not be in the servers map of normal clients.
|
|
if s.sys != nil && s.sys.account != nil && !opts.NoSystemAccount {
|
|
s.accounts.Store(s.sys.account.Name, s.sys.account)
|
|
}
|
|
|
|
s.accounts.Range(func(k, v interface{}) bool {
|
|
acc := v.(*Account)
|
|
acc.mu.RLock()
|
|
// Check for sysclients accounting, ignore the system account.
|
|
if acc.sysclients > 0 && (s.sys == nil || s.sys.account != acc) {
|
|
for c := range acc.clients {
|
|
if c.kind != CLIENT && c.kind != LEAF {
|
|
clients = append(clients, c)
|
|
}
|
|
}
|
|
}
|
|
acc.mu.RUnlock()
|
|
return true
|
|
})
|
|
|
|
var resetCh chan struct{}
|
|
if s.sys != nil {
|
|
// can't hold the lock as go routine reading it may be waiting for lock as well
|
|
resetCh = s.sys.resetCh
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
// Clear some timers and remove service import subs for deleted accounts.
|
|
for _, acc := range deletedAccounts {
|
|
acc.mu.Lock()
|
|
clearTimer(&acc.etmr)
|
|
clearTimer(&acc.ctmr)
|
|
for _, se := range acc.exports.services {
|
|
se.clearResponseThresholdTimer()
|
|
}
|
|
acc.mu.Unlock()
|
|
acc.removeAllServiceImportSubs()
|
|
}
|
|
|
|
if resetCh != nil {
|
|
resetCh <- struct{}{}
|
|
}
|
|
|
|
// Check that publish retained messages sources are still allowed to publish.
|
|
s.mqttCheckPubRetainedPerms()
|
|
|
|
// Close clients that have moved accounts
|
|
for _, client := range cclients {
|
|
client.closeConnection(ClientClosed)
|
|
}
|
|
|
|
for _, c := range clients {
|
|
// Disconnect any unauthorized clients.
|
|
// Ignore internal clients.
|
|
if (c.kind == CLIENT || c.kind == LEAF) && !s.isClientAuthorized(c) {
|
|
c.authViolation()
|
|
continue
|
|
}
|
|
// Check to make sure account is correct.
|
|
c.swapAccountAfterReload()
|
|
// Remove any unauthorized subscriptions and check for account imports.
|
|
c.processSubsOnConfigReload(awcsti)
|
|
}
|
|
|
|
for _, route := range routes {
|
|
// Disconnect any unauthorized routes.
|
|
// Do this only for routes that were accepted, not initiated
|
|
// because in the later case, we don't have the user name/password
|
|
// of the remote server.
|
|
if !route.isSolicitedRoute() && !s.isRouterAuthorized(route) {
|
|
route.setNoReconnect()
|
|
route.authViolation()
|
|
}
|
|
}
|
|
|
|
if res := s.AccountResolver(); res != nil {
|
|
res.Reload()
|
|
}
|
|
|
|
// We will double check all JetStream configs on a reload.
|
|
if checkJetStream {
|
|
if err := s.enableJetStreamAccounts(); err != nil {
|
|
s.Errorf(err.Error())
|
|
}
|
|
}
|
|
}
|
|
|
|
// Returns true if given client current account has changed (or user
|
|
// no longer exist) in the new config, false if the user did not
|
|
// change accounts.
|
|
// Server lock is held on entry.
|
|
func (s *Server) clientHasMovedToDifferentAccount(c *client) bool {
|
|
var (
|
|
nu *NkeyUser
|
|
u *User
|
|
)
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if c.opts.Nkey != _EMPTY_ {
|
|
if s.nkeys != nil {
|
|
nu = s.nkeys[c.opts.Nkey]
|
|
}
|
|
} else if c.opts.Username != _EMPTY_ {
|
|
if s.users != nil {
|
|
u = s.users[c.opts.Username]
|
|
}
|
|
} else {
|
|
return false
|
|
}
|
|
// Get the current account name
|
|
var curAccName string
|
|
if c.acc != nil {
|
|
curAccName = c.acc.Name
|
|
}
|
|
if nu != nil && nu.Account != nil {
|
|
return curAccName != nu.Account.Name
|
|
} else if u != nil && u.Account != nil {
|
|
return curAccName != u.Account.Name
|
|
}
|
|
// user/nkey no longer exists.
|
|
return true
|
|
}
|
|
|
|
// reloadClusterPermissions reconfigures the cluster's permissions
// and set the permissions to all existing routes, sending an
// update INFO protocol so that remote can resend their local
// subs if needed, and sending local subs matching cluster's
// import subjects.
//
// oldPerms is the permissions set that was in effect before the reload;
// the new permissions are read from the current server options.
func (s *Server) reloadClusterPermissions(oldPerms *RoutePermissions) {
	s.mu.Lock()
	newPerms := s.getOpts().Cluster.Permissions
	routes := make(map[uint64]*client, s.numRoutes())
	// Get all connected routes, keyed by client id.
	s.forEachRoute(func(route *client) {
		route.mu.Lock()
		routes[route.cid] = route
		route.mu.Unlock()
	})
	// If new permissions is nil, then clear routeInfo import/export,
	// otherwise publish the new import/export in our route INFO.
	if newPerms == nil {
		s.routeInfo.Import = nil
		s.routeInfo.Export = nil
	} else {
		s.routeInfo.Import = newPerms.Import
		s.routeInfo.Export = newPerms.Export
	}
	// Generate the INFO protocol bytes while still under the server lock
	// so it reflects a consistent routeInfo snapshot.
	infoJSON := generateInfoJSON(&s.routeInfo)
	s.mu.Unlock()

	// Close connections for routes that don't understand async INFO,
	// since they cannot be updated in place.
	for _, route := range routes {
		route.mu.Lock()
		// NOTE: `close` here is a local bool (it shadows the builtin).
		close := route.opts.Protocol < RouteProtoInfo
		cid := route.cid
		route.mu.Unlock()
		if close {
			route.closeConnection(RouteRemoved)
			delete(routes, cid)
		}
	}

	// If there are no route left, we are done
	if len(routes) == 0 {
		return
	}

	// Fake clients to test cluster permissions: one configured with the
	// old permissions, one with the new, so we can diff import decisions.
	oldPermsTester := &client{}
	oldPermsTester.setRoutePermissions(oldPerms)
	newPermsTester := &client{}
	newPermsTester.setRoutePermissions(newPerms)

	var (
		// Stack-based scratch array to collect local subs per account.
		_localSubs       [4096]*subscription
		subsNeedSUB      = map[*client][]*subscription{}
		subsNeedUNSUB    = map[*client][]*subscription{}
		deleteRoutedSubs []*subscription
	)

	// getRouteForAccount returns a route that handles the given account:
	// either the route at the account's pool index, a dedicated per-account
	// route, or a route to a server that does not support pooling.
	getRouteForAccount := func(accName string, poolIdx int) *client {
		for _, r := range routes {
			r.mu.Lock()
			ok := (poolIdx >= 0 && poolIdx == r.route.poolIdx) || (string(r.route.accName) == accName) || r.route.noPool
			r.mu.Unlock()
			if ok {
				return r
			}
		}
		return nil
	}

	// First set the new permissions on all routes.
	for _, route := range routes {
		route.mu.Lock()
		route.setRoutePermissions(newPerms)
		route.mu.Unlock()
	}

	// Then, go over all accounts and gather local subscriptions that need to be
	// sent over as SUB or removed as UNSUB, and routed subscriptions that need
	// to be dropped due to export permissions.
	s.accounts.Range(func(_, v interface{}) bool {
		acc := v.(*Account)
		acc.mu.RLock()
		accName, sl, poolIdx := acc.Name, acc.sl, acc.routePoolIdx
		acc.mu.RUnlock()
		// Get the route handling this account. If no route or sublist, bail out.
		route := getRouteForAccount(accName, poolIdx)
		if route == nil || sl == nil {
			return true
		}
		localSubs := _localSubs[:0]
		sl.localSubs(&localSubs, false)

		// Go through all local subscriptions and compare import decisions
		// under the old vs. new permissions.
		for _, sub := range localSubs {
			// Get all subs that can now be imported
			subj := string(sub.subject)
			couldImportThen := oldPermsTester.canImport(subj)
			canImportNow := newPermsTester.canImport(subj)
			if canImportNow {
				// If we could not before, then will need to send a SUB protocol.
				if !couldImportThen {
					subsNeedSUB[route] = append(subsNeedSUB[route], sub)
				}
			} else if couldImportThen {
				// We were previously able to import this sub, but now
				// we can't so we need to send an UNSUB protocol
				subsNeedUNSUB[route] = append(subsNeedUNSUB[route], sub)
			}
		}
		deleteRoutedSubs = deleteRoutedSubs[:0]
		route.mu.Lock()
		for key, sub := range route.subs {
			// Route sub keys are "<account> <sid>"; skip other accounts.
			if an := strings.Fields(key)[0]; an != accName {
				continue
			}
			// If we can't export, we need to drop the subscriptions that
			// we have on behalf of this route.
			subj := string(sub.subject)
			if !route.canExport(subj) {
				delete(route.subs, string(sub.sid))
				deleteRoutedSubs = append(deleteRoutedSubs, sub)
			}
		}
		route.mu.Unlock()
		// Remove as a batch all the subs that we have removed from each route.
		sl.RemoveBatch(deleteRoutedSubs)
		return true
	})

	// Send an update INFO, which will allow remote server to show
	// our current route config in monitoring and resend subscriptions
	// that we now possibly allow with a change of Export permissions.
	for _, route := range routes {
		route.mu.Lock()
		route.enqueueProto(infoJSON)
		// Now send SUB and UNSUB protocols as needed.
		if subs, ok := subsNeedSUB[route]; ok && len(subs) > 0 {
			route.sendRouteSubProtos(subs, false, nil)
		}
		if unsubs, ok := subsNeedUNSUB[route]; ok && len(unsubs) > 0 {
			route.sendRouteUnSubProtos(unsubs, false, nil)
		}
		route.mu.Unlock()
	}
}
|
|
|
|
// reloadClusterPoolAndAccounts applies, on config reload, changes to the
// cluster route pool size and to the list of accounts with dedicated
// (pinned) routes. It may release and re-acquire the server lock while
// waiting for remote confirmations and closing routes; during the whole
// operation s.routesReject is set so no new route is accepted mid-change.
func (s *Server) reloadClusterPoolAndAccounts(co *clusterOption, opts *Options) {
	s.mu.Lock()
	// Prevent adding new routes until we are ready to do so.
	s.routesReject = true
	// ch, if set, will receive one signal per remote that confirms the
	// per-account route switch (buffered with protosSent capacity).
	var ch chan struct{}
	// For accounts that have been added to the list of dedicated routes,
	// send a protocol to their current assigned routes to allow the
	// other side to prepare for the changes.
	if len(co.accsAdded) > 0 {
		protosSent := 0
		// Unique request ID so responses can be matched to this reload.
		s.accAddedReqID = nuid.Next()
		for _, an := range co.accsAdded {
			if s.accRoutes == nil {
				s.accRoutes = make(map[string]map[string]*client)
			}
			// In case a config reload was first done on another server,
			// we may have already switched this account to a dedicated route.
			// But we still want to send the protocol over the routes that
			// would have otherwise handled it.
			if _, ok := s.accRoutes[an]; !ok {
				s.accRoutes[an] = make(map[string]*client)
			}
			if a, ok := s.accounts.Load(an); ok {
				acc := a.(*Account)
				acc.mu.Lock()
				sl := acc.sl
				// Get the current route pool index before calling setRouteInfo.
				rpi := acc.routePoolIdx
				// Switch to per-account route if not already done.
				if rpi >= 0 {
					s.setRouteInfo(acc)
				} else {
					// If it was transitioning, make sure we set it to the state
					// that indicates that it has a dedicated route
					if rpi == accTransitioningToDedicatedRoute {
						acc.routePoolIdx = accDedicatedRoute
					}
					// Otherwise get the route pool index it would have been before
					// the move so we can send the protocol to those routes.
					rpi = s.computeRoutePoolIdx(acc)
				}
				acc.mu.Unlock()
				// Generate the INFO protocol to send indicating that this account
				// is being moved to a dedicated route.
				ri := Info{
					RoutePoolSize: s.routesPoolSize,
					RouteAccount:  an,
					RouteAccReqID: s.accAddedReqID,
				}
				proto := generateInfoJSON(&ri)
				// Go over each remote's route at pool index `rpi` and remove
				// remote subs for this account and send the protocol.
				s.forEachRouteIdx(rpi, func(r *client) bool {
					r.mu.Lock()
					// Exclude routes to servers that don't support pooling.
					if !r.route.noPool {
						if subs := r.removeRemoteSubsForAcc(an); len(subs) > 0 {
							sl.RemoveBatch(subs)
						}
						r.enqueueProto(proto)
						protosSent++
					}
					r.mu.Unlock()
					return true
				})
			}
		}
		if protosSent > 0 {
			// Size the channel so every expected confirmation can be
			// buffered without blocking the senders.
			s.accAddedCh = make(chan struct{}, protosSent)
			ch = s.accAddedCh
		}
	}
	// Collect routes that need to be closed.
	routes := make(map[*client]struct{})
	// Collect the per-account routes that need to be closed.
	if len(co.accsRemoved) > 0 {
		for _, an := range co.accsRemoved {
			if remotes, ok := s.accRoutes[an]; ok && remotes != nil {
				for _, r := range remotes {
					if r != nil {
						r.setNoReconnect()
						routes[r] = struct{}{}
					}
				}
			}
		}
	}
	// If the pool size has changed, we need to close all pooled routes.
	if co.poolSizeChanged {
		s.forEachNonPerAccountRoute(func(r *client) {
			routes[r] = struct{}{}
		})
	}
	// If there are routes to close, we need to release the server lock.
	// Same if we need to wait on responses from the remotes when
	// processing new per-account routes.
	if len(routes) > 0 || len(ch) > 0 {
		s.mu.Unlock()

		// Drain the confirmations (one per proto sent); give up after
		// a 2s timeout per wait so a stuck remote cannot block reload.
		for done := false; !done && len(ch) > 0; {
			select {
			case <-ch:
			case <-time.After(2 * time.Second):
				s.Warnf("Timed out waiting for confirmation from all routes regarding per-account routes changes")
				done = true
			}
		}

		for r := range routes {
			r.closeConnection(RouteRemoved)
		}

		s.mu.Lock()
	}
	// Clear the accAddedCh/ReqID fields in case they were set.
	s.accAddedReqID, s.accAddedCh = _EMPTY_, nil
	// Now that per-account routes that needed to be closed are closed,
	// remove them from s.accRoutes. Doing so before would prevent
	// removeRoute() to do proper cleanup because the route would not
	// be found in s.accRoutes.
	for _, an := range co.accsRemoved {
		delete(s.accRoutes, an)
		// Do not lookup and call setRouteInfo() on the accounts here.
		// We need first to set the new s.routesPoolSize value and
		// anyway, there is no need to do here if the pool size has
		// changed (since it will be called for all accounts).
	}
	// We have already added the accounts to s.accRoutes that needed to
	// be added.

	// We should always have at least the system account with a dedicated route,
	// but in case we have a configuration that disables pooling and without
	// a system account, possibly set the accRoutes to nil.
	if len(opts.Cluster.PinnedAccounts) == 0 {
		s.accRoutes = nil
	}
	// Now deal with pool size updates.
	if ps := opts.Cluster.PoolSize; ps > 0 {
		s.routesPoolSize = ps
		s.routeInfo.RoutePoolSize = ps
	} else {
		// Non-positive configured pool size: run with a single pooled
		// route but advertise 0 in the route INFO.
		s.routesPoolSize = 1
		s.routeInfo.RoutePoolSize = 0
	}
	// If the pool size has changed, we need to recompute all accounts' route
	// pool index. Note that the added/removed accounts will be reset there
	// too, but that's ok (we could use a map to exclude them, but not worth it).
	if co.poolSizeChanged {
		s.accounts.Range(func(_, v interface{}) bool {
			acc := v.(*Account)
			acc.mu.Lock()
			s.setRouteInfo(acc)
			acc.mu.Unlock()
			return true
		})
	} else if len(co.accsRemoved) > 0 {
		// For accounts that no longer have a dedicated route, we need to send
		// the subscriptions on the existing pooled routes for those accounts.
		for _, an := range co.accsRemoved {
			if a, ok := s.accounts.Load(an); ok {
				acc := a.(*Account)
				acc.mu.Lock()
				// First call this which will assign a new route pool index.
				s.setRouteInfo(acc)
				// Get the value so we can send the subscriptions interest
				// on all routes with this pool index.
				rpi := acc.routePoolIdx
				acc.mu.Unlock()
				s.forEachRouteIdx(rpi, func(r *client) bool {
					// We have the guarantee that if the route exists, it
					// is not a new one that would have been created when
					// we released the server lock if some routes needed
					// to be closed, because we have set s.routesReject
					// to `true` at the top of this function.
					s.sendSubsToRoute(r, rpi, an)
					return true
				})
			}
		}
	}
	// Allow routes to be accepted now.
	s.routesReject = false
	// If there is a pool size change or added accounts, solicit routes now.
	if co.poolSizeChanged || len(co.accsAdded) > 0 {
		s.solicitRoutes(opts.Routes, co.accsAdded)
	}
	s.mu.Unlock()
}
|
|
|
|
// validateClusterOpts ensures the new ClusterOpts does not change some of the
|
|
// fields that do not support reload.
|
|
func validateClusterOpts(old, new ClusterOpts) error {
|
|
if old.Host != new.Host {
|
|
return fmt.Errorf("config reload not supported for cluster host: old=%s, new=%s",
|
|
old.Host, new.Host)
|
|
}
|
|
if old.Port != new.Port {
|
|
return fmt.Errorf("config reload not supported for cluster port: old=%d, new=%d",
|
|
old.Port, new.Port)
|
|
}
|
|
// Validate Cluster.Advertise syntax
|
|
if new.Advertise != "" {
|
|
if _, _, err := parseHostPort(new.Advertise, 0); err != nil {
|
|
return fmt.Errorf("invalid Cluster.Advertise value of %s, err=%v", new.Advertise, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// diffRoutes diffs the old routes and the new routes and returns the ones that
|
|
// should be added and removed from the server.
|
|
func diffRoutes(old, new []*url.URL) (add, remove []*url.URL) {
|
|
// Find routes to remove.
|
|
removeLoop:
|
|
for _, oldRoute := range old {
|
|
for _, newRoute := range new {
|
|
if urlsAreEqual(oldRoute, newRoute) {
|
|
continue removeLoop
|
|
}
|
|
}
|
|
remove = append(remove, oldRoute)
|
|
}
|
|
|
|
// Find routes to add.
|
|
addLoop:
|
|
for _, newRoute := range new {
|
|
for _, oldRoute := range old {
|
|
if urlsAreEqual(oldRoute, newRoute) {
|
|
continue addLoop
|
|
}
|
|
}
|
|
add = append(add, newRoute)
|
|
}
|
|
|
|
return add, remove
|
|
}
|