Files
nats-server/server/server.go
Derek Collison 8bd290c77a Fix for #1864.
When trying to make sure we properly created all subs for service imports we would check the internal client to see if we should process.
With JS enabled on the server we would place system imports that would break that check and orphan other service imports.

Signed-off-by: Derek Collison <derek@nats.io>
2021-01-29 17:51:14 -08:00

3315 lines
88 KiB
Go

// Copyright 2012-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net"
"net/http"
// Allow dynamic profiling.
_ "net/http/pprof"
"os"
"path"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/nats-io/jwt/v2"
"github.com/nats-io/nkeys"
"github.com/nats-io/nuid"
"github.com/nats-io/nats-server/v2/logger"
)
const (
// Interval for the first PING for non client connections.
firstPingInterval = time.Second
// This is for the first ping for client connections.
firstClientPingInterval = 2 * time.Second
)
// Info is the information sent to clients, routes, gateways, and leaf nodes,
// to help them understand information about this server.
type Info struct {
ID string `json:"server_id"`
Name string `json:"server_name"`
Version string `json:"version"`
Proto int `json:"proto"`
GitCommit string `json:"git_commit,omitempty"`
GoVersion string `json:"go"`
Host string `json:"host"`
Port int `json:"port"`
Headers bool `json:"headers"`
AuthRequired bool `json:"auth_required,omitempty"`
TLSRequired bool `json:"tls_required,omitempty"`
TLSVerify bool `json:"tls_verify,omitempty"`
TLSAvailable bool `json:"tls_available,omitempty"`
MaxPayload int32 `json:"max_payload"`
JetStream bool `json:"jetstream,omitempty"`
IP string `json:"ip,omitempty"`
CID uint64 `json:"client_id,omitempty"`
ClientIP string `json:"client_ip,omitempty"`
Nonce string `json:"nonce,omitempty"`
Cluster string `json:"cluster,omitempty"`
Dynamic bool `json:"cluster_dynamic,omitempty"`
ClientConnectURLs []string `json:"connect_urls,omitempty"` // Contains URLs a client can connect to.
WSConnectURLs []string `json:"ws_connect_urls,omitempty"` // Contains URLs a ws client can connect to.
LameDuckMode bool `json:"ldm,omitempty"`
// Route Specific
Import *SubjectPermission `json:"import,omitempty"`
Export *SubjectPermission `json:"export,omitempty"`
LNOC bool `json:"lnoc,omitempty"`
// Gateways Specific
Gateway string `json:"gateway,omitempty"` // Name of the origin Gateway (sent by gateway's INFO)
GatewayURLs []string `json:"gateway_urls,omitempty"` // Gateway URLs in the originating cluster (sent by gateway's INFO)
GatewayURL string `json:"gateway_url,omitempty"` // Gateway URL on that server (sent by route's INFO)
GatewayCmd byte `json:"gateway_cmd,omitempty"` // Command code for the receiving server to know what to do
GatewayCmdPayload []byte `json:"gateway_cmd_payload,omitempty"` // Command payload when needed
GatewayNRP bool `json:"gateway_nrp,omitempty"` // Uses new $GNR. prefix for mapped replies
// LeafNode Specific
LeafNodeURLs []string `json:"leafnode_urls,omitempty"` // LeafNode URLs that the server can reconnect to.
}
// Server is our main struct.
type Server struct {
gcid uint64
stats
mu sync.Mutex
kp nkeys.KeyPair
prand *rand.Rand
info Info
configFile string
optsMu sync.RWMutex
opts *Options
running bool
shutdown bool
reloading bool
listener net.Listener
gacc *Account
sys *internal
js *jetStream
accounts sync.Map
tmpAccounts sync.Map // Temporarily stores accounts that are being built
activeAccounts int32
accResolver AccountResolver
clients map[uint64]*client
routes map[uint64]*client
routesByHash sync.Map
remotes map[string]*client
leafs map[uint64]*client
users map[string]*User
nkeys map[string]*NkeyUser
totalClients uint64
closed *closedRingBuffer
done chan bool
start time.Time
http net.Listener
httpHandler http.Handler
httpBasePath string
profiler net.Listener
httpReqStats map[string]uint64
routeListener net.Listener
routeInfo Info
routeInfoJSON []byte
routeResolver netResolver
routesToSelf map[string]struct{}
leafNodeListener net.Listener
leafNodeInfo Info
leafNodeInfoJSON []byte
leafURLsMap refCountedUrlSet
leafNodeOpts struct {
resolver netResolver
dialTimeout time.Duration
}
leafRemoteCfgs []*leafNodeCfg
quitCh chan struct{}
shutdownComplete chan struct{}
// Tracking Go routines
grMu sync.Mutex
grTmpClients map[uint64]*client
grRunning bool
grWG sync.WaitGroup // to wait on various go routines
cproto int64 // number of clients supporting async INFO
configTime time.Time // last time config was loaded
logging struct {
sync.RWMutex
logger Logger
trace int32
debug int32
traceSysAcc int32
}
clientConnectURLs []string
// Used internally for quick look-ups.
clientConnectURLsMap refCountedUrlSet
lastCURLsUpdate int64
// For Gateways
gatewayListener net.Listener // Accept listener
gateway *srvGateway
// Used by tests to check that http.Servers do
// not set any timeout.
monitoringServer *http.Server
profilingServer *http.Server
// LameDuck mode
ldm bool
ldmCh chan bool
// Trusted public operator keys.
trustedKeys []string
// map of trusted keys to operator setting StrictSigningKeyUsage
strictSigningKeyUsage map[string]struct{}
// We use this to minimize mem copies for requests to monitoring
// endpoint /varz (when it comes from http).
varzMu sync.Mutex
varz *Varz
// This is set during a config reload if we detect that we have
// added/removed routes. The monitoring code then check that
// to know if it should update the cluster's URLs array.
varzUpdateRouteURLs bool
// Keeps a sublist of of subscriptions attached to leafnode connections
// for the $GNR.*.*.*.> subject so that a server can send back a mapped
// gateway reply.
gwLeafSubs *Sublist
// Used for expiration of mapped GW replies
gwrm struct {
w int32
ch chan time.Duration
m sync.Map
}
// For eventIDs
eventIds *nuid.NUID
// Websocket structure
websocket srvWebsocket
// MQTT structure
mqtt srvMQTT
// exporting account name the importer experienced issues with
incompleteAccExporterMap sync.Map
// Holds cluster name under different lock for mapping
cnMu sync.RWMutex
cn string
// For registering raft nodes with the server.
rnMu sync.RWMutex
raftNodes map[string]RaftNode
// For mapping from a node name back to a server name.
// Normal server lock here.
nodeToName map[string]string
}
// Make sure all are 64bits for atomic use
type stats struct {
inMsgs int64
outMsgs int64
inBytes int64
outBytes int64
slowConsumers int64
}
// New will setup a new server struct after parsing the options.
// DEPRECATED: Use NewServer(opts)
func New(opts *Options) *Server {
s, _ := NewServer(opts)
return s
}
// NewServer will setup a new server struct after parsing the options.
// Could return an error if options can not be validated.
func NewServer(opts *Options) (*Server, error) {
setBaselineOptions(opts)
// Process TLS options, including whether we require client certificates.
tlsReq := opts.TLSConfig != nil
verify := (tlsReq && opts.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert)
// Created server's nkey identity.
kp, _ := nkeys.CreateServer()
pub, _ := kp.PublicKey()
serverName := pub
if opts.ServerName != _EMPTY_ {
serverName = opts.ServerName
}
httpBasePath := normalizeBasePath(opts.HTTPBasePath)
// Validate some options. This is here because we cannot assume that
// server will always be started with configuration parsing (that could
// report issues). Its options can be (incorrectly) set by hand when
// server is embedded. If there is an error, return nil.
if err := validateOptions(opts); err != nil {
return nil, err
}
info := Info{
ID: pub,
Version: VERSION,
Proto: PROTO,
GitCommit: gitCommit,
GoVersion: runtime.Version(),
Name: serverName,
Host: opts.Host,
Port: opts.Port,
AuthRequired: false,
TLSRequired: tlsReq && !opts.AllowNonTLS,
TLSVerify: verify,
MaxPayload: opts.MaxPayload,
JetStream: opts.JetStream,
Headers: !opts.NoHeaderSupport,
Cluster: opts.Cluster.Name,
}
if tlsReq && !info.TLSRequired {
info.TLSAvailable = true
}
now := time.Now()
s := &Server{
kp: kp,
configFile: opts.ConfigFile,
info: info,
prand: rand.New(rand.NewSource(time.Now().UnixNano())),
opts: opts,
done: make(chan bool, 1),
start: now,
configTime: now,
gwLeafSubs: NewSublistWithCache(),
httpBasePath: httpBasePath,
eventIds: nuid.New(),
routesToSelf: make(map[string]struct{}),
nodeToName: make(map[string]string),
}
// Trusted root operator keys.
if !s.processTrustedKeys() {
return nil, fmt.Errorf("Error processing trusted operator keys")
}
s.mu.Lock()
defer s.mu.Unlock()
// Place ourselves.
s.nodeToName[string(getHash(serverName))] = serverName
s.routeResolver = opts.Cluster.resolver
if s.routeResolver == nil {
s.routeResolver = net.DefaultResolver
}
// Used internally for quick look-ups.
s.clientConnectURLsMap = make(refCountedUrlSet)
s.websocket.connectURLsMap = make(refCountedUrlSet)
s.leafURLsMap = make(refCountedUrlSet)
// Ensure that non-exported options (used in tests) are properly set.
s.setLeafNodeNonExportedOptions()
// Call this even if there is no gateway defined. It will
// initialize the structure so we don't have to check for
// it to be nil or not in various places in the code.
if err := s.newGateway(opts); err != nil {
return nil, err
}
// If we have a cluster definition but do not have a cluster name, create one.
if opts.Cluster.Port != 0 && opts.Cluster.Name == "" {
s.info.Cluster = nuid.Next()
}
// This is normally done in the AcceptLoop, once the
// listener has been created (possibly with random port),
// but since some tests may expect the INFO to be properly
// set after New(), let's do it now.
s.setInfoHostPort()
// For tracking clients
s.clients = make(map[uint64]*client)
// For tracking closed clients.
s.closed = newClosedRingBuffer(opts.MaxClosedClients)
// For tracking connections that are not yet registered
// in s.routes, but for which readLoop has started.
s.grTmpClients = make(map[uint64]*client)
// For tracking routes and their remote ids
s.routes = make(map[uint64]*client)
s.remotes = make(map[string]*client)
// For tracking leaf nodes.
s.leafs = make(map[uint64]*client)
// Used to kick out all go routines possibly waiting on server
// to shutdown.
s.quitCh = make(chan struct{})
// Closed when Shutdown() is complete. Allows WaitForShutdown() to block
// waiting for complete shutdown.
s.shutdownComplete = make(chan struct{})
// Check for configured account resolvers.
if err := s.configureResolver(); err != nil {
return nil, err
}
// If there is an URL account resolver, do basic test to see if anyone is home.
if ar := opts.AccountResolver; ar != nil {
if ur, ok := ar.(*URLAccResolver); ok {
if _, err := ur.Fetch(""); err != nil {
return nil, err
}
}
}
// For other resolver:
// In operator mode, when the account resolver depends on an external system and
// the system account can't fetched, inject a temporary one.
if ar := s.accResolver; len(opts.TrustedOperators) == 1 && ar != nil &&
opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT {
if _, ok := ar.(*MemAccResolver); !ok {
s.mu.Unlock()
var a *Account
// perform direct lookup to avoid warning trace
if _, err := fetchAccount(ar, s.opts.SystemAccount); err == nil {
a, _ = s.fetchAccount(s.opts.SystemAccount)
}
s.mu.Lock()
if a == nil {
sac := NewAccount(s.opts.SystemAccount)
sac.Issuer = opts.TrustedOperators[0].Issuer
sac.signingKeys = map[string]jwt.Scope{}
sac.signingKeys[s.opts.SystemAccount] = nil
s.registerAccountNoLock(sac)
}
}
}
// For tracking accounts
if err := s.configureAccounts(); err != nil {
return nil, err
}
// Used to setup Authorization.
s.configureAuthorization()
// Start signal handler
s.handleSignals()
return s, nil
}
// clusterName returns our cluster name which could be dynamic.
func (s *Server) ClusterName() string {
s.mu.Lock()
cn := s.info.Cluster
s.mu.Unlock()
return cn
}
// Grabs cluster name with cluster name specific lock.
func (s *Server) cachedClusterName() string {
s.cnMu.RLock()
cn := s.cn
s.cnMu.RUnlock()
return cn
}
// setClusterName will update the cluster name for this server.
func (s *Server) setClusterName(name string) {
s.mu.Lock()
var resetCh chan struct{}
if s.sys != nil && s.info.Cluster != name {
// can't hold the lock as go routine reading it may be waiting for lock as well
resetCh = s.sys.resetCh
}
s.info.Cluster = name
s.routeInfo.Cluster = name
// Regenerate the info byte array
s.generateRouteInfoJSON()
// Need to close solicited leaf nodes. The close has to be done outside of the server lock.
var leafs []*client
for _, c := range s.leafs {
c.mu.Lock()
if c.leaf != nil && c.leaf.remote != nil {
leafs = append(leafs, c)
}
c.mu.Unlock()
}
s.mu.Unlock()
// Also place into mapping cn with cnMu lock.
s.cnMu.Lock()
s.cn = name
s.cnMu.Unlock()
for _, l := range leafs {
l.closeConnection(ClusterNameConflict)
}
if resetCh != nil {
resetCh <- struct{}{}
}
s.Noticef("Cluster name updated to %s", name)
}
// Return whether the cluster name is dynamic.
func (s *Server) isClusterNameDynamic() bool {
return s.getOpts().Cluster.Name == ""
}
// ClientURL returns the URL used to connect clients. Helpful in testing
// when we designate a random client port (-1).
func (s *Server) ClientURL() string {
// FIXME(dlc) - should we add in user and pass if defined single?
opts := s.getOpts()
scheme := "nats://"
if opts.TLSConfig != nil {
scheme = "tls://"
}
return fmt.Sprintf("%s%s:%d", scheme, opts.Host, opts.Port)
}
func validateClusterName(o *Options) error {
// Check that cluster name if defined matches any gateway name.
if o.Gateway.Name != "" && o.Gateway.Name != o.Cluster.Name {
if o.Cluster.Name != "" {
return ErrClusterNameConfigConflict
}
// Set this here so we do not consider it dynamic.
o.Cluster.Name = o.Gateway.Name
}
return nil
}
func validateOptions(o *Options) error {
if o.LameDuckDuration > 0 && o.LameDuckGracePeriod >= o.LameDuckDuration {
return fmt.Errorf("lame duck grace period (%v) should be strictly lower than lame duck duration (%v)",
o.LameDuckGracePeriod, o.LameDuckDuration)
}
// Check that the trust configuration is correct.
if err := validateTrustedOperators(o); err != nil {
return err
}
// Check on leaf nodes which will require a system
// account when gateways are also configured.
if err := validateLeafNode(o); err != nil {
return err
}
// Check that authentication is properly configured.
if err := validateAuth(o); err != nil {
return err
}
// Check that gateway is properly configured. Returns no error
// if there is no gateway defined.
if err := validateGatewayOptions(o); err != nil {
return err
}
// Check that cluster name if defined matches any gateway name.
if err := validateClusterName(o); err != nil {
return err
}
if err := validateMQTTOptions(o); err != nil {
return err
}
if err := validateJetStreamOptions(o); err != nil {
return err
}
// Finally check websocket options.
return validateWebsocketOptions(o)
}
func (s *Server) getOpts() *Options {
s.optsMu.RLock()
opts := s.opts
s.optsMu.RUnlock()
return opts
}
func (s *Server) setOpts(opts *Options) {
s.optsMu.Lock()
s.opts = opts
s.optsMu.Unlock()
}
func (s *Server) globalAccount() *Account {
s.mu.Lock()
gacc := s.gacc
s.mu.Unlock()
return gacc
}
// Used to setup Accounts.
// Lock is held upon entry.
func (s *Server) configureAccounts() error {
// Create the global account.
if s.gacc == nil {
s.gacc = NewAccount(globalAccountName)
s.registerAccountNoLock(s.gacc)
}
opts := s.opts
// Check opts and walk through them. We need to copy them here
// so that we do not keep a real one sitting in the options.
for _, acc := range s.opts.Accounts {
var a *Account
if acc.Name == globalAccountName {
a = s.gacc
} else {
a = acc.shallowCopy()
}
if acc.hasMappings() {
// For now just move and wipe from opts.Accounts version.
a.mappings = acc.mappings
acc.mappings = nil
// We use this for selecting between multiple weighted destinations.
a.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
acc.sl = nil
acc.clients = nil
s.registerAccountNoLock(a)
}
// Now that we have this we need to remap any referenced accounts in
// import or export maps to the new ones.
swapApproved := func(ea *exportAuth) {
for sub, a := range ea.approved {
var acc *Account
if v, ok := s.accounts.Load(a.Name); ok {
acc = v.(*Account)
}
ea.approved[sub] = acc
}
}
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
// Exports
for _, se := range acc.exports.streams {
if se != nil {
swapApproved(&se.exportAuth)
}
}
for _, se := range acc.exports.services {
if se != nil {
// Swap over the bound account for service exports.
if se.acc != nil {
if v, ok := s.accounts.Load(se.acc.Name); ok {
se.acc = v.(*Account)
}
}
swapApproved(&se.exportAuth)
}
}
// Imports
for _, si := range acc.imports.streams {
if v, ok := s.accounts.Load(si.acc.Name); ok {
si.acc = v.(*Account)
}
}
for _, si := range acc.imports.services {
if v, ok := s.accounts.Load(si.acc.Name); ok {
si.acc = v.(*Account)
si.se = si.acc.getServiceExport(si.to)
}
}
// Make sure the subs are running, but only if not reloading.
if len(acc.imports.services) > 0 && acc.ic == nil && !s.reloading {
acc.ic = s.createInternalAccountClient()
acc.ic.acc = acc
acc.addAllServiceImportSubs()
}
acc.updated = time.Now()
return true
})
// Set the system account if it was configured.
// Otherwise create a default one.
if opts.SystemAccount != _EMPTY_ {
// Lock may be acquired in lookupAccount, so release to call lookupAccount.
s.mu.Unlock()
acc, err := s.lookupAccount(opts.SystemAccount)
s.mu.Lock()
if err == nil && s.sys != nil && acc != s.sys.account {
// sys.account.clients (including internal client)/respmap/etc... are transferred separately
s.sys.account = acc
s.mu.Unlock()
// acquires server lock separately
s.addSystemAccountExports(acc)
s.mu.Lock()
}
if err != nil {
return fmt.Errorf("error resolving system account: %v", err)
}
}
return nil
}
// Setup the account resolver. For memory resolver, make sure the JWTs are
// properly formed but do not enforce expiration etc.
func (s *Server) configureResolver() error {
opts := s.getOpts()
s.accResolver = opts.AccountResolver
if opts.AccountResolver != nil {
// For URL resolver, set the TLSConfig if specified.
if opts.AccountResolverTLSConfig != nil {
if ar, ok := opts.AccountResolver.(*URLAccResolver); ok {
if t, ok := ar.c.Transport.(*http.Transport); ok {
t.CloseIdleConnections()
t.TLSClientConfig = opts.AccountResolverTLSConfig.Clone()
}
}
}
if len(opts.resolverPreloads) > 0 {
if s.accResolver.IsReadOnly() {
return fmt.Errorf("resolver preloads only available for writeable resolver types MEM/DIR/CACHE_DIR")
}
for k, v := range opts.resolverPreloads {
_, err := jwt.DecodeAccountClaims(v)
if err != nil {
return fmt.Errorf("preload account error for %q: %v", k, err)
}
s.accResolver.Store(k, v)
}
}
}
return nil
}
// This will check preloads for validation issues.
func (s *Server) checkResolvePreloads() {
opts := s.getOpts()
// We can just check the read-only opts versions here, that way we do not need
// to grab server lock or access s.accResolver.
for k, v := range opts.resolverPreloads {
claims, err := jwt.DecodeAccountClaims(v)
if err != nil {
s.Errorf("Preloaded account [%s] not valid", k)
}
// Check if it is expired.
vr := jwt.CreateValidationResults()
claims.Validate(vr)
if vr.IsBlocking(true) {
s.Warnf("Account [%s] has validation issues:", k)
for _, v := range vr.Issues {
s.Warnf(" - %s", v.Description)
}
}
}
}
func (s *Server) generateRouteInfoJSON() {
b, _ := json.Marshal(s.routeInfo)
pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)}
s.routeInfoJSON = bytes.Join(pcs, []byte(" "))
}
// Determines if we are in pre NATS 2.0 setup with no accounts.
func (s *Server) globalAccountOnly() bool {
var hasOthers bool
if s.trustedKeys != nil {
return false
}
s.mu.Lock()
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
// Ignore global and system
if acc == s.gacc || (s.sys != nil && acc == s.sys.account) {
return true
}
hasOthers = true
return false
})
s.mu.Unlock()
return !hasOthers
}
// Determines if this server is in standalone mode, meaning no routes or gateways or leafnodes.
func (s *Server) standAloneMode() bool {
opts := s.getOpts()
return opts.Cluster.Port == 0 && opts.LeafNode.Port == 0 && opts.Gateway.Port == 0
}
func (s *Server) configuredRoutes() int {
return len(s.getOpts().Routes)
}
// activePeers is used in bootstrapping raft groups like the JetStream meta controller.
func (s *Server) activePeers() (peers []string) {
s.mu.Lock()
defer s.mu.Unlock()
if s.sys == nil {
return nil
}
// FIXME(dlc) - When this spans supercluster need to adjust below.
for _, r := range s.routes {
peers = append(peers, r.route.hash)
}
return peers
}
// isTrustedIssuer will check that the issuer is a trusted public key.
// This is used to make sure an account was signed by a trusted operator.
func (s *Server) isTrustedIssuer(issuer string) bool {
s.mu.Lock()
defer s.mu.Unlock()
// If we are not running in trusted mode and there is no issuer, that is ok.
if s.trustedKeys == nil && issuer == "" {
return true
}
for _, tk := range s.trustedKeys {
if tk == issuer {
return true
}
}
return false
}
// processTrustedKeys will process binary stamped and
// options-based trusted nkeys. Returns success.
func (s *Server) processTrustedKeys() bool {
s.strictSigningKeyUsage = map[string]struct{}{}
if trustedKeys != "" && !s.initStampedTrustedKeys() {
return false
} else if s.opts.TrustedKeys != nil {
for _, key := range s.opts.TrustedKeys {
if !nkeys.IsValidPublicOperatorKey(key) {
return false
}
}
s.trustedKeys = append([]string(nil), s.opts.TrustedKeys...)
for _, claim := range s.opts.TrustedOperators {
if !claim.StrictSigningKeyUsage {
continue
}
for _, key := range claim.SigningKeys {
s.strictSigningKeyUsage[key] = struct{}{}
}
}
}
return true
}
// checkTrustedKeyString will check that the string is a valid array
// of public operator nkeys.
func checkTrustedKeyString(keys string) []string {
tks := strings.Fields(keys)
if len(tks) == 0 {
return nil
}
// Walk all the keys and make sure they are valid.
for _, key := range tks {
if !nkeys.IsValidPublicOperatorKey(key) {
return nil
}
}
return tks
}
// initStampedTrustedKeys will check the stamped trusted keys
// and will set the server field 'trustedKeys'. Returns whether
// it succeeded or not.
func (s *Server) initStampedTrustedKeys() bool {
// Check to see if we have an override in options, which will cause us to fail.
if len(s.opts.TrustedKeys) > 0 {
return false
}
tks := checkTrustedKeyString(trustedKeys)
if len(tks) == 0 {
return false
}
s.trustedKeys = tks
return true
}
// PrintAndDie is exported for access in other packages.
func PrintAndDie(msg string) {
fmt.Fprintln(os.Stderr, msg)
os.Exit(1)
}
// PrintServerAndExit will print our version and exit.
func PrintServerAndExit() {
fmt.Printf("nats-server: v%s\n", VERSION)
os.Exit(0)
}
// ProcessCommandLineArgs takes the command line arguments
// validating and setting flags for handling in case any
// sub command was present.
func ProcessCommandLineArgs(cmd *flag.FlagSet) (showVersion bool, showHelp bool, err error) {
if len(cmd.Args()) > 0 {
arg := cmd.Args()[0]
switch strings.ToLower(arg) {
case "version":
return true, false, nil
case "help":
return false, true, nil
default:
return false, false, fmt.Errorf("unrecognized command: %q", arg)
}
}
return false, false, nil
}
// Public version.
func (s *Server) Running() bool {
return s.isRunning()
}
// Protected check on running state
func (s *Server) isRunning() bool {
s.mu.Lock()
running := s.running
s.mu.Unlock()
return running
}
func (s *Server) logPid() error {
pidStr := strconv.Itoa(os.Getpid())
return ioutil.WriteFile(s.getOpts().PidFile, []byte(pidStr), 0660)
}
// NewAccountsAllowed returns whether or not new accounts can be created on the fly.
func (s *Server) NewAccountsAllowed() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.opts.AllowNewAccounts
}
// numReservedAccounts will return the number of reserved accounts configured in the server.
// Currently this is 1, one for the global default account.
func (s *Server) numReservedAccounts() int {
return 1
}
// NumActiveAccounts reports number of active accounts on this server.
func (s *Server) NumActiveAccounts() int32 {
return atomic.LoadInt32(&s.activeAccounts)
}
// incActiveAccounts() just adds one under lock.
func (s *Server) incActiveAccounts() {
atomic.AddInt32(&s.activeAccounts, 1)
}
// decActiveAccounts() just subtracts one under lock.
func (s *Server) decActiveAccounts() {
atomic.AddInt32(&s.activeAccounts, -1)
}
// This should be used for testing only. Will be slow since we have to
// range over all accounts in the sync.Map to count.
func (s *Server) numAccounts() int {
count := 0
s.mu.Lock()
s.accounts.Range(func(k, v interface{}) bool {
count++
return true
})
s.mu.Unlock()
return count
}
// NumLoadedAccounts returns the number of loaded accounts.
func (s *Server) NumLoadedAccounts() int {
return s.numAccounts()
}
// LookupOrRegisterAccount will return the given account if known or create a new entry.
func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew bool) {
s.mu.Lock()
defer s.mu.Unlock()
if v, ok := s.accounts.Load(name); ok {
return v.(*Account), false
}
acc := NewAccount(name)
s.registerAccountNoLock(acc)
return acc, true
}
// RegisterAccount will register an account. The account must be new
// or this call will fail.
func (s *Server) RegisterAccount(name string) (*Account, error) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.accounts.Load(name); ok {
return nil, ErrAccountExists
}
acc := NewAccount(name)
s.registerAccountNoLock(acc)
return acc, nil
}
// SetSystemAccount will set the internal system account.
// If root operators are present it will also check validity.
func (s *Server) SetSystemAccount(accName string) error {
// Lookup from sync.Map first.
if v, ok := s.accounts.Load(accName); ok {
return s.setSystemAccount(v.(*Account))
}
// If we are here we do not have local knowledge of this account.
// Do this one by hand to return more useful error.
ac, jwt, err := s.fetchAccountClaims(accName)
if err != nil {
return err
}
acc := s.buildInternalAccount(ac)
acc.claimJWT = jwt
// Due to race, we need to make sure that we are not
// registering twice.
if racc := s.registerAccount(acc); racc != nil {
return nil
}
return s.setSystemAccount(acc)
}
// SystemAccount returns the system account if set.
func (s *Server) SystemAccount() *Account {
var sacc *Account
s.mu.Lock()
if s.sys != nil {
sacc = s.sys.account
}
s.mu.Unlock()
return sacc
}
// GlobalAccount returns the global account.
// Default clients will use the global account.
func (s *Server) GlobalAccount() *Account {
s.mu.Lock()
defer s.mu.Unlock()
return s.gacc
}
// SetDefaultSystemAccount will create a default system account if one is not present.
func (s *Server) SetDefaultSystemAccount() error {
if _, isNew := s.LookupOrRegisterAccount(DEFAULT_SYSTEM_ACCOUNT); !isNew {
return nil
}
s.Debugf("Created system account: %q", DEFAULT_SYSTEM_ACCOUNT)
return s.SetSystemAccount(DEFAULT_SYSTEM_ACCOUNT)
}
// For internal sends.
const internalSendQLen = 8192
// Assign a system account. Should only be called once.
// This sets up a server to send and receive messages from
// inside the server itself.
func (s *Server) setSystemAccount(acc *Account) error {
if acc == nil {
return ErrMissingAccount
}
// Don't try to fix this here.
if acc.IsExpired() {
return ErrAccountExpired
}
// If we are running with trusted keys for an operator
// make sure we check the account is legit.
if !s.isTrustedIssuer(acc.Issuer) {
return ErrAccountValidation
}
s.mu.Lock()
if s.sys != nil {
s.mu.Unlock()
return ErrAccountExists
}
// This is here in an attempt to quiet the race detector and not have to place
// locks on fast path for inbound messages and checking service imports.
acc.mu.Lock()
if acc.imports.services == nil {
acc.imports.services = make(map[string]*serviceImport)
}
acc.mu.Unlock()
s.sys = &internal{
account: acc,
client: s.createInternalSystemClient(),
seq: 1,
sid: 1,
servers: make(map[string]*serverUpdate),
replies: make(map[string]msgHandler),
sendq: make(chan *pubMsg, internalSendQLen),
resetCh: make(chan struct{}),
statsz: eventsHBInterval,
orphMax: 5 * eventsHBInterval,
chkOrph: 3 * eventsHBInterval,
}
s.sys.wg.Add(1)
s.mu.Unlock()
// Register with the account.
s.sys.client.registerWithAccount(acc)
s.addSystemAccountExports(acc)
// Start our internal loop to serialize outbound messages.
// We do our own wg here since we will stop first during shutdown.
go s.internalSendLoop(&s.sys.wg)
// Start up our general subscriptions
s.initEventTracking()
// Track for dead remote servers.
s.wrapChk(s.startRemoteServerSweepTimer)()
// Send out statsz updates periodically.
s.wrapChk(s.startStatszTimer)()
// If we have existing accounts make sure we enable account tracking.
s.mu.Lock()
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
s.enableAccountTracking(acc)
return true
})
s.mu.Unlock()
return nil
}
// Creates an internal system client.
func (s *Server) createInternalSystemClient() *client {
return s.createInternalClient(SYSTEM)
}
// Creates an internal jetstream client.
func (s *Server) createInternalJetStreamClient() *client {
return s.createInternalClient(JETSTREAM)
}
// Creates an internal client for Account.
func (s *Server) createInternalAccountClient() *client {
return s.createInternalClient(ACCOUNT)
}
// Internal clients. kind should be SYSTEM or JETSTREAM
func (s *Server) createInternalClient(kind int) *client {
if kind != SYSTEM && kind != JETSTREAM && kind != ACCOUNT {
return nil
}
now := time.Now()
c := &client{srv: s, kind: kind, opts: internalOpts, msubs: -1, mpay: -1, start: now, last: now}
c.initClient()
c.echo = false
c.headers = true
c.flags.set(noReconnect)
return c
}
// Determine if accounts should track subscriptions for
// efficient propagation.
// Lock should be held on entry.
func (s *Server) shouldTrackSubscriptions() bool {
return (s.opts.Cluster.Port != 0 || s.opts.Gateway.Port != 0)
}
// Invokes registerAccountNoLock under the protection of the server lock.
// That is, server lock is acquired/released in this function.
// See registerAccountNoLock for comment on returned value.
func (s *Server) registerAccount(acc *Account) *Account {
s.mu.Lock()
racc := s.registerAccountNoLock(acc)
s.mu.Unlock()
return racc
}
// Helper to set the sublist based on preferences.
func (s *Server) setAccountSublist(acc *Account) {
if acc != nil && acc.sl == nil {
opts := s.getOpts()
if opts != nil && opts.NoSublistCache {
acc.sl = NewSublistNoCache()
} else {
acc.sl = NewSublistWithCache()
}
}
}
// Registers an account in the server.
// Due to some locking considerations, we may end-up trying
// to register the same account twice. This function will
// then return the already registered account.
// Lock should be held on entry.
func (s *Server) registerAccountNoLock(acc *Account) *Account {
// We are under the server lock. Lookup from map, if present
// return existing account.
if a, _ := s.accounts.Load(acc.Name); a != nil {
s.tmpAccounts.Delete(acc.Name)
return a.(*Account)
}
// Finish account setup and store.
s.setAccountSublist(acc)
if acc.clients == nil {
acc.clients = make(map[*client]struct{})
}
// If we are capable of routing we will track subscription
// information for efficient interest propagation.
// During config reload, it is possible that account was
// already created (global account), so use locking and
// make sure we create only if needed.
acc.mu.Lock()
// TODO(dlc)- Double check that we need this for GWs.
if acc.rm == nil && s.opts != nil && s.shouldTrackSubscriptions() {
acc.rm = make(map[string]int32)
acc.lqws = make(map[string]int32)
}
acc.srv = s
acc.updated = time.Now()
acc.mu.Unlock()
s.accounts.Store(acc.Name, acc)
s.tmpAccounts.Delete(acc.Name)
s.enableAccountTracking(acc)
return nil
}
// lookupAccount is a function to return the account structure
// associated with an account name.
// Lock MUST NOT be held upon entry.
func (s *Server) lookupAccount(name string) (*Account, error) {
var acc *Account
if v, ok := s.accounts.Load(name); ok {
acc = v.(*Account)
}
if acc != nil {
// If we are expired and we have a resolver, then
// return the latest information from the resolver.
if acc.IsExpired() {
s.Debugf("Requested account [%s] has expired", name)
if s.AccountResolver() != nil {
if err := s.updateAccount(acc); err != nil {
// This error could mask expired, so just return expired here.
return nil, ErrAccountExpired
}
} else {
return nil, ErrAccountExpired
}
}
return acc, nil
}
// If we have a resolver see if it can fetch the account.
if s.AccountResolver() == nil {
return nil, ErrNoAccountResolver
}
return s.fetchAccount(name)
}
// LookupAccount is a public function to return the account structure
// associated with name.
func (s *Server) LookupAccount(name string) (*Account, error) {
return s.lookupAccount(name)
}
// This will fetch new claims and if found update the account with new claims.
// Lock MUST NOT be held upon entry.
func (s *Server) updateAccount(acc *Account) error {
// TODO(dlc) - Make configurable
if !acc.incomplete && time.Since(acc.updated) < time.Second {
s.Debugf("Requested account update for [%s] ignored, too soon", acc.Name)
return ErrAccountResolverUpdateTooSoon
}
claimJWT, err := s.fetchRawAccountClaims(acc.Name)
if err != nil {
return err
}
return s.updateAccountWithClaimJWT(acc, claimJWT)
}
// updateAccountWithClaimJWT will check and apply the claim update.
// Lock MUST NOT be held upon entry.
func (s *Server) updateAccountWithClaimJWT(acc *Account, claimJWT string) error {
if acc == nil {
return ErrMissingAccount
}
if acc.claimJWT != "" && acc.claimJWT == claimJWT && !acc.incomplete {
s.Debugf("Requested account update for [%s], same claims detected", acc.Name)
return ErrAccountResolverSameClaims
}
accClaims, _, err := s.verifyAccountClaims(claimJWT)
if err == nil && accClaims != nil {
acc.mu.Lock()
if acc.Issuer == "" {
acc.Issuer = accClaims.Issuer
}
if acc.Name != accClaims.Subject {
acc.mu.Unlock()
return ErrAccountValidation
}
acc.claimJWT = claimJWT
acc.mu.Unlock()
s.UpdateAccountClaims(acc, accClaims)
return nil
}
return err
}
// fetchRawAccountClaims will grab raw account claims iff we have a resolver.
// Lock is NOT held upon entry.
func (s *Server) fetchRawAccountClaims(name string) (string, error) {
accResolver := s.AccountResolver()
if accResolver == nil {
return "", ErrNoAccountResolver
}
// Need to do actual Fetch
start := time.Now()
claimJWT, err := fetchAccount(accResolver, name)
fetchTime := time.Since(start)
if fetchTime > time.Second {
s.Warnf("Account [%s] fetch took %v", name, fetchTime)
} else {
s.Debugf("Account [%s] fetch took %v", name, fetchTime)
}
if err != nil {
s.Warnf("Account fetch failed: %v", err)
return "", err
}
return claimJWT, nil
}
// fetchAccountClaims will attempt to fetch new claims if a resolver is present.
// Lock is NOT held upon entry.
func (s *Server) fetchAccountClaims(name string) (*jwt.AccountClaims, string, error) {
claimJWT, err := s.fetchRawAccountClaims(name)
if err != nil {
return nil, _EMPTY_, err
}
var claim *jwt.AccountClaims
claim, claimJWT, err = s.verifyAccountClaims(claimJWT)
if claim != nil && claim.Subject != name {
return nil, _EMPTY_, ErrAccountValidation
}
return claim, claimJWT, err
}
// verifyAccountClaims will decode and validate any account claims.
func (s *Server) verifyAccountClaims(claimJWT string) (*jwt.AccountClaims, string, error) {
accClaims, err := jwt.DecodeAccountClaims(claimJWT)
if err != nil {
return nil, _EMPTY_, err
}
if !s.isTrustedIssuer(accClaims.Issuer) {
return nil, _EMPTY_, ErrAccountValidation
}
vr := jwt.CreateValidationResults()
accClaims.Validate(vr)
if vr.IsBlocking(true) {
return nil, _EMPTY_, ErrAccountValidation
}
return accClaims, claimJWT, nil
}
// This will fetch an account from a resolver if defined.
// Lock is NOT held upon entry.
func (s *Server) fetchAccount(name string) (*Account, error) {
accClaims, claimJWT, err := s.fetchAccountClaims(name)
if accClaims == nil {
return nil, err
}
acc := s.buildInternalAccount(accClaims)
acc.claimJWT = claimJWT
// Due to possible race, if registerAccount() returns a non
// nil account, it means the same account was already
// registered and we should use this one.
if racc := s.registerAccount(acc); racc != nil {
// Update with the new claims in case they are new.
// Following call will ignore ErrAccountResolverSameClaims
// if claims are the same.
err = s.updateAccountWithClaimJWT(racc, claimJWT)
if err != nil && err != ErrAccountResolverSameClaims {
return nil, err
}
return racc, nil
}
// The sub imports may have been setup but will not have had their
// subscriptions properly setup. Do that here.
if len(acc.imports.services) > 0 {
if acc.ic == nil {
acc.ic = s.createInternalAccountClient()
acc.ic.acc = acc
}
acc.addAllServiceImportSubs()
}
return acc, nil
}
// Start up the server, this will block.
// Start via a Go routine if needed.
func (s *Server) Start() {
s.Noticef("Starting nats-server")
gc := gitCommit
if gc == "" {
gc = "not set"
}
s.Noticef(" Version: %s", VERSION)
s.Noticef(" Git: [%s]", gc)
s.Debugf(" Go build: %s", s.info.GoVersion)
s.Noticef(" Name: %s", s.info.Name)
if s.sys != nil {
s.Noticef(" Node: %s", s.sys.shash)
}
s.Noticef(" ID: %s", s.info.ID)
defer s.Noticef("Server is ready")
// Check for insecure configurations.
s.checkAuthforWarnings()
// Avoid RACE between Start() and Shutdown()
s.mu.Lock()
s.running = true
s.mu.Unlock()
s.grMu.Lock()
s.grRunning = true
s.grMu.Unlock()
// Snapshot server options.
opts := s.getOpts()
if opts.ConfigFile != _EMPTY_ {
s.Noticef("Using configuration file: %s", opts.ConfigFile)
}
hasOperators := len(opts.TrustedOperators) > 0
if hasOperators {
s.Noticef("Trusted Operators")
}
for _, opc := range opts.TrustedOperators {
s.Noticef(" System : %q", opc.Audience)
s.Noticef(" Operator: %q", opc.Name)
s.Noticef(" Issued : %v", time.Unix(opc.IssuedAt, 0))
s.Noticef(" Expires : %v", time.Unix(opc.Expires, 0))
}
if hasOperators && opts.SystemAccount == _EMPTY_ {
s.Warnf("Trusted Operators should utilize a System Account")
}
// If we have a memory resolver, check the accounts here for validation exceptions.
// This allows them to be logged right away vs when they are accessed via a client.
if hasOperators && len(opts.resolverPreloads) > 0 {
s.checkResolvePreloads()
}
// Log the pid to a file
if opts.PidFile != _EMPTY_ {
if err := s.logPid(); err != nil {
s.Fatalf("Could not write pidfile: %v", err)
return
}
}
// Setup system account which will start the eventing stack.
if sa := opts.SystemAccount; sa != _EMPTY_ {
if err := s.SetSystemAccount(sa); err != nil {
s.Fatalf("Can't set system account: %v", err)
return
}
} else if !opts.NoSystemAccount {
// We will create a default system account here.
s.SetDefaultSystemAccount()
}
// start up resolver machinery
if ar := s.AccountResolver(); ar != nil {
if err := ar.Start(s); err != nil {
s.Fatalf("Could not start resolver: %v", err)
return
}
// In operator mode, when the account resolver depends on an external system and
// the system account is the bootstrapping account, start fetching it
if len(opts.TrustedOperators) == 1 && opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT {
_, isMemResolver := ar.(*MemAccResolver)
if v, ok := s.accounts.Load(s.opts.SystemAccount); !isMemResolver && ok && v.(*Account).claimJWT == "" {
s.Noticef("Using bootstrapping system account")
s.startGoRoutine(func() {
defer s.grWG.Done()
t := time.NewTicker(time.Second)
defer t.Stop()
for {
select {
case <-s.quitCh:
return
case <-t.C:
if _, err := fetchAccount(ar, s.opts.SystemAccount); err != nil {
continue
}
if _, err := s.fetchAccount(s.opts.SystemAccount); err != nil {
continue
}
s.Noticef("System account fetched and updated")
return
}
}
})
}
}
}
// Start expiration of mapped GW replies, regardless if
// this server is configured with gateway or not.
s.startGWReplyMapExpiration()
// Check if JetStream has been enabled. This needs to be after
// the system account setup above. JetStream will create its
// own system account if one is not present.
if opts.JetStream {
// Make sure someone is not trying to enable on the system account.
if sa := s.SystemAccount(); sa != nil && sa.jsLimits != nil {
s.Fatalf("Not allowed to enable JetStream on the system account")
}
cfg := &JetStreamConfig{
StoreDir: opts.StoreDir,
MaxMemory: opts.JetStreamMaxMemory,
MaxStore: opts.JetStreamMaxStore,
}
if err := s.EnableJetStream(cfg); err != nil {
s.Fatalf("Can't start JetStream: %v", err)
return
}
} else {
// Check to see if any configured accounts have JetStream enabled
// and warn if they do.
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
acc.mu.RLock()
hasJs := acc.jsLimits != nil
name := acc.Name
acc.mu.RUnlock()
if hasJs {
s.Warnf("Account [%q] has JetStream configuration but JetStream not enabled", name)
}
return true
})
}
// Start monitoring if needed
if err := s.StartMonitoring(); err != nil {
s.Fatalf("Can't start monitoring: %v", err)
return
}
// Start up gateway if needed. Do this before starting the routes, because
// we want to resolve the gateway host:port so that this information can
// be sent to other routes.
if opts.Gateway.Port != 0 {
s.startGateways()
}
// Start websocket server if needed. Do this before starting the routes, and
// leaf node because we want to resolve the gateway host:port so that this
// information can be sent to other routes.
if opts.Websocket.Port != 0 {
s.startWebsocketServer()
}
// Start up listen if we want to accept leaf node connections.
if opts.LeafNode.Port != 0 {
// Will resolve or assign the advertise address for the leafnode listener.
// We need that in StartRouting().
s.startLeafNodeAcceptLoop()
}
// Solicit remote servers for leaf node connections.
if len(opts.LeafNode.Remotes) > 0 {
s.solicitLeafNodeRemotes(opts.LeafNode.Remotes)
}
// TODO (ik): I wanted to refactor this by starting the client
// accept loop first, that is, it would resolve listen spec
// in place, but start the accept-for-loop in a different go
// routine. This would get rid of the synchronization between
// this function and StartRouting, which I also would have wanted
// to refactor, but both AcceptLoop() and StartRouting() have
// been exported and not sure if that would break users using them.
// We could mark them as deprecated and remove in a release or two...
// The Routing routine needs to wait for the client listen
// port to be opened and potential ephemeral port selected.
clientListenReady := make(chan struct{})
// MQTT
if opts.MQTT.Port != 0 {
s.startMQTT()
}
// Start up routing as well if needed.
if opts.Cluster.Port != 0 {
s.startGoRoutine(func() {
s.StartRouting(clientListenReady)
})
}
// Pprof http endpoint for the profiler.
if opts.ProfPort != 0 {
s.StartProfiler()
}
if opts.PortsFileDir != _EMPTY_ {
s.logPorts()
}
// Wait for clients.
s.AcceptLoop(clientListenReady)
}
// Shutdown will shutdown the server instance by kicking out the AcceptLoop
// and closing all associated clients.
func (s *Server) Shutdown() {
// Shutdown our raftnodes. If we are the leader we will attempt to transfer.
s.shutdownRaftNodes()
// Shutdown the eventing system as needed.
// This is done first to send out any messages for
// account status. We will also clean up any
// eventing items associated with accounts.
s.shutdownEventing()
// Now check jetstream.
s.shutdownJetStream()
s.mu.Lock()
// Prevent issues with multiple calls.
if s.shutdown {
s.mu.Unlock()
return
}
s.Noticef("Initiating Shutdown...")
if s.accResolver != nil {
s.accResolver.Close()
}
opts := s.getOpts()
s.shutdown = true
s.running = false
s.grMu.Lock()
s.grRunning = false
s.grMu.Unlock()
conns := make(map[uint64]*client)
// Copy off the clients
for i, c := range s.clients {
conns[i] = c
}
// Copy off the connections that are not yet registered
// in s.routes, but for which the readLoop has started
s.grMu.Lock()
for i, c := range s.grTmpClients {
conns[i] = c
}
s.grMu.Unlock()
// Copy off the routes
for i, r := range s.routes {
conns[i] = r
}
// Copy off the gateways
s.getAllGatewayConnections(conns)
// Copy off the leaf nodes
for i, c := range s.leafs {
conns[i] = c
}
// Number of done channel responses we expect.
doneExpected := 0
// Kick client AcceptLoop()
if s.listener != nil {
doneExpected++
s.listener.Close()
s.listener = nil
}
// Kick websocket server
if s.websocket.server != nil {
doneExpected++
s.websocket.server.Close()
s.websocket.server = nil
s.websocket.listener = nil
}
// Kick MQTT accept loop
if s.mqtt.listener != nil {
doneExpected++
s.mqtt.listener.Close()
s.mqtt.listener = nil
}
// Kick leafnodes AcceptLoop()
if s.leafNodeListener != nil {
doneExpected++
s.leafNodeListener.Close()
s.leafNodeListener = nil
}
// Kick route AcceptLoop()
if s.routeListener != nil {
doneExpected++
s.routeListener.Close()
s.routeListener = nil
}
// Kick Gateway AcceptLoop()
if s.gatewayListener != nil {
doneExpected++
s.gatewayListener.Close()
s.gatewayListener = nil
}
// Kick HTTP monitoring if its running
if s.http != nil {
doneExpected++
s.http.Close()
s.http = nil
}
// Kick Profiling if its running
if s.profiler != nil {
doneExpected++
s.profiler.Close()
}
s.mu.Unlock()
// Release go routines that wait on that channel
close(s.quitCh)
// Close client and route connections
for _, c := range conns {
c.setNoReconnect()
c.closeConnection(ServerShutdown)
}
// Block until the accept loops exit
for doneExpected > 0 {
<-s.done
doneExpected--
}
// Wait for go routines to be done.
s.grWG.Wait()
if opts.PortsFileDir != _EMPTY_ {
s.deletePortsFile(opts.PortsFileDir)
}
s.Noticef("Server Exiting..")
// Close logger if applicable. It allows tests on Windows
// to be able to do proper cleanup (delete log file).
s.logging.RLock()
log := s.logging.logger
s.logging.RUnlock()
if log != nil {
if l, ok := log.(*logger.Logger); ok {
l.Close()
}
}
// Notify that the shutdown is complete
close(s.shutdownComplete)
}
// WaitForShutdown will block until the server has been fully shutdown.
func (s *Server) WaitForShutdown() {
<-s.shutdownComplete
}
// AcceptLoop is exported for easier testing.
func (s *Server) AcceptLoop(clr chan struct{}) {
// If we were to exit before the listener is setup properly,
// make sure we close the channel.
defer func() {
if clr != nil {
close(clr)
}
}()
// Snapshot server options.
opts := s.getOpts()
// Setup state that can enable shutdown
s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
return
}
hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
l, e := natsListen("tcp", hp)
if e != nil {
s.mu.Unlock()
s.Fatalf("Error listening on port: %s, %q", hp, e)
return
}
s.Noticef("Listening for client connections on %s",
net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
// Alert of TLS enabled.
if opts.TLSConfig != nil {
s.Noticef("TLS required for client connections")
}
// If server was started with RANDOM_PORT (-1), opts.Port would be equal
// to 0 at the beginning this function. So we need to get the actual port
if opts.Port == 0 {
// Write resolved port back to options.
opts.Port = l.Addr().(*net.TCPAddr).Port
}
// Now that port has been set (if it was set to RANDOM), set the
// server's info Host/Port with either values from Options or
// ClientAdvertise.
if err := s.setInfoHostPort(); err != nil {
s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", s.opts.ClientAdvertise, err)
l.Close()
s.mu.Unlock()
return
}
// Keep track of client connect URLs. We may need them later.
s.clientConnectURLs = s.getClientConnectURLs()
s.listener = l
go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
func(_ error) bool {
if s.isLameDuckMode() {
// Signal that we are not accepting new clients
s.ldmCh <- true
// Now wait for the Shutdown...
<-s.quitCh
return true
}
return false
})
s.mu.Unlock()
// Let the caller know that we are ready
close(clr)
clr = nil
}
func (s *Server) acceptConnections(l net.Listener, acceptName string, createFunc func(conn net.Conn), errFunc func(err error) bool) {
tmpDelay := ACCEPT_MIN_SLEEP
for {
conn, err := l.Accept()
if err != nil {
if errFunc != nil && errFunc(err) {
return
}
if tmpDelay = s.acceptError(acceptName, err, tmpDelay); tmpDelay < 0 {
break
}
continue
}
tmpDelay = ACCEPT_MIN_SLEEP
if !s.startGoRoutine(func() {
createFunc(conn)
s.grWG.Done()
}) {
conn.Close()
}
}
s.Debugf(acceptName + " accept loop exiting..")
s.done <- true
}
// This function sets the server's info Host/Port based on server Options.
// Note that this function may be called during config reload, this is why
// Host/Port may be reset to original Options if the ClientAdvertise option
// is not set (since it may have previously been).
func (s *Server) setInfoHostPort() error {
// When this function is called, opts.Port is set to the actual listen
// port (if option was originally set to RANDOM), even during a config
// reload. So use of s.opts.Port is safe.
if s.opts.ClientAdvertise != "" {
h, p, err := parseHostPort(s.opts.ClientAdvertise, s.opts.Port)
if err != nil {
return err
}
s.info.Host = h
s.info.Port = p
} else {
s.info.Host = s.opts.Host
s.info.Port = s.opts.Port
}
return nil
}
// StartProfiler is called to enable dynamic profiling.
func (s *Server) StartProfiler() {
// Snapshot server options.
opts := s.getOpts()
port := opts.ProfPort
// Check for Random Port
if port == -1 {
port = 0
}
s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
return
}
hp := net.JoinHostPort(opts.Host, strconv.Itoa(port))
l, err := net.Listen("tcp", hp)
s.Noticef("profiling port: %d", l.Addr().(*net.TCPAddr).Port)
if err != nil {
s.mu.Unlock()
s.Fatalf("error starting profiler: %s", err)
return
}
srv := &http.Server{
Addr: hp,
Handler: http.DefaultServeMux,
MaxHeaderBytes: 1 << 20,
}
s.profiler = l
s.profilingServer = srv
// Enable blocking profile
runtime.SetBlockProfileRate(1)
go func() {
// if this errors out, it's probably because the server is being shutdown
err := srv.Serve(l)
if err != nil {
s.mu.Lock()
shutdown := s.shutdown
s.mu.Unlock()
if !shutdown {
s.Fatalf("error starting profiler: %s", err)
}
}
srv.Close()
s.done <- true
}()
s.mu.Unlock()
}
// StartHTTPMonitoring will enable the HTTP monitoring port.
// DEPRECATED: Should use StartMonitoring.
func (s *Server) StartHTTPMonitoring() {
s.startMonitoring(false)
}
// StartHTTPSMonitoring will enable the HTTPS monitoring port.
// DEPRECATED: Should use StartMonitoring.
func (s *Server) StartHTTPSMonitoring() {
s.startMonitoring(true)
}
// StartMonitoring starts the HTTP or HTTPs server if needed.
func (s *Server) StartMonitoring() error {
// Snapshot server options.
opts := s.getOpts()
// Specifying both HTTP and HTTPS ports is a misconfiguration
if opts.HTTPPort != 0 && opts.HTTPSPort != 0 {
return fmt.Errorf("can't specify both HTTP (%v) and HTTPs (%v) ports", opts.HTTPPort, opts.HTTPSPort)
}
var err error
if opts.HTTPPort != 0 {
err = s.startMonitoring(false)
} else if opts.HTTPSPort != 0 {
if opts.TLSConfig == nil {
return fmt.Errorf("TLS cert and key required for HTTPS")
}
err = s.startMonitoring(true)
}
return err
}
// HTTP endpoints
const (
RootPath = "/"
VarzPath = "/varz"
ConnzPath = "/connz"
RoutezPath = "/routez"
GatewayzPath = "/gatewayz"
LeafzPath = "/leafz"
SubszPath = "/subsz"
StackszPath = "/stacksz"
AccountzPath = "/accountz"
)
func (s *Server) basePath(p string) string {
return path.Join(s.httpBasePath, p)
}
// Start the monitoring server
func (s *Server) startMonitoring(secure bool) error {
// Snapshot server options.
opts := s.getOpts()
// Used to track HTTP requests
s.httpReqStats = map[string]uint64{
RootPath: 0,
VarzPath: 0,
ConnzPath: 0,
RoutezPath: 0,
GatewayzPath: 0,
SubszPath: 0,
}
var (
hp string
err error
httpListener net.Listener
port int
)
monitorProtocol := "http"
if secure {
monitorProtocol += "s"
port = opts.HTTPSPort
if port == -1 {
port = 0
}
hp = net.JoinHostPort(opts.HTTPHost, strconv.Itoa(port))
config := opts.TLSConfig.Clone()
config.ClientAuth = tls.NoClientCert
httpListener, err = tls.Listen("tcp", hp, config)
} else {
port = opts.HTTPPort
if port == -1 {
port = 0
}
hp = net.JoinHostPort(opts.HTTPHost, strconv.Itoa(port))
httpListener, err = net.Listen("tcp", hp)
}
if err != nil {
return fmt.Errorf("can't listen to the monitor port: %v", err)
}
s.Noticef("Starting %s monitor on %s", monitorProtocol,
net.JoinHostPort(opts.HTTPHost, strconv.Itoa(httpListener.Addr().(*net.TCPAddr).Port)))
mux := http.NewServeMux()
// Root
mux.HandleFunc(s.basePath(RootPath), s.HandleRoot)
// Varz
mux.HandleFunc(s.basePath(VarzPath), s.HandleVarz)
// Connz
mux.HandleFunc(s.basePath(ConnzPath), s.HandleConnz)
// Routez
mux.HandleFunc(s.basePath(RoutezPath), s.HandleRoutez)
// Gatewayz
mux.HandleFunc(s.basePath(GatewayzPath), s.HandleGatewayz)
// Leafz
mux.HandleFunc(s.basePath(LeafzPath), s.HandleLeafz)
// Subz
mux.HandleFunc(s.basePath(SubszPath), s.HandleSubsz)
// Subz alias for backwards compatibility
mux.HandleFunc(s.basePath("/subscriptionsz"), s.HandleSubsz)
// Stacksz
mux.HandleFunc(s.basePath(StackszPath), s.HandleStacksz)
// Accountz
mux.HandleFunc(s.basePath(AccountzPath), s.HandleAccountz)
// Do not set a WriteTimeout because it could cause cURL/browser
// to return empty response or unable to display page if the
// server needs more time to build the response.
srv := &http.Server{
Addr: hp,
Handler: mux,
MaxHeaderBytes: 1 << 20,
}
s.mu.Lock()
if s.shutdown {
httpListener.Close()
s.mu.Unlock()
return nil
}
s.http = httpListener
s.httpHandler = mux
s.monitoringServer = srv
s.mu.Unlock()
go func() {
if err := srv.Serve(httpListener); err != nil {
s.mu.Lock()
shutdown := s.shutdown
s.mu.Unlock()
if !shutdown {
s.Fatalf("Error starting monitor on %q: %v", hp, err)
}
}
srv.Close()
srv.Handler = nil
s.mu.Lock()
s.httpHandler = nil
s.mu.Unlock()
s.done <- true
}()
return nil
}
// HTTPHandler returns the http.Handler object used to handle monitoring
// endpoints. It will return nil if the server is not configured for
// monitoring, or if the server has not been started yet (Server.Start()).
func (s *Server) HTTPHandler() http.Handler {
s.mu.Lock()
defer s.mu.Unlock()
return s.httpHandler
}
// Perform a conditional deep copy due to reference nature of [Client|WS]ConnectURLs.
// If updates are made to Info, this function should be consulted and updated.
// Assume lock is held.
func (s *Server) copyInfo() Info {
info := s.info
if len(info.ClientConnectURLs) > 0 {
info.ClientConnectURLs = append([]string(nil), s.info.ClientConnectURLs...)
}
if len(info.WSConnectURLs) > 0 {
info.WSConnectURLs = append([]string(nil), s.info.WSConnectURLs...)
}
return info
}
// tlsMixConn is used when we can receive both TLS and non-TLS connections on same port.
type tlsMixConn struct {
net.Conn
pre *bytes.Buffer
}
// Read for our mixed multi-reader.
func (c *tlsMixConn) Read(b []byte) (int, error) {
if c.pre != nil {
n, err := c.pre.Read(b)
if c.pre.Len() == 0 {
c.pre = nil
}
return n, err
}
return c.Conn.Read(b)
}
func (s *Server) createClient(conn net.Conn) *client {
// Snapshot server options.
opts := s.getOpts()
maxPay := int32(opts.MaxPayload)
maxSubs := int32(opts.MaxSubs)
// For system, maxSubs of 0 means unlimited, so re-adjust here.
if maxSubs == 0 {
maxSubs = -1
}
now := time.Now()
c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
c.registerWithAccount(s.globalAccount())
var info Info
var authRequired bool
s.mu.Lock()
// Grab JSON info string
info = s.copyInfo()
if s.nonceRequired() {
// Nonce handling
var raw [nonceLen]byte
nonce := raw[:]
s.generateNonce(nonce)
info.Nonce = string(nonce)
}
c.nonce = []byte(info.Nonce)
authRequired = info.AuthRequired
s.totalClients++
s.mu.Unlock()
// Grab lock
c.mu.Lock()
if authRequired {
c.flags.set(expectConnect)
}
// Initialize
c.initClient()
c.Debugf("Client connection created")
// Send our information.
// Need to be sent in place since writeLoop cannot be started until
// TLS handshake is done (if applicable).
c.sendProtoNow(c.generateClientInfoJSON(info))
// Unlock to register
c.mu.Unlock()
// Register with the server.
s.mu.Lock()
// If server is not running, Shutdown() may have already gathered the
// list of connections to close. It won't contain this one, so we need
// to bail out now otherwise the readLoop started down there would not
// be interrupted. Skip also if in lame duck mode.
if !s.running || s.ldm {
// There are some tests that create a server but don't start it,
// and use "async" clients and perform the parsing manually. Such
// clients would branch here (since server is not running). However,
// when a server was really running and has been shutdown, we must
// close this connection.
if s.shutdown {
conn.Close()
}
s.mu.Unlock()
return c
}
// If there is a max connections specified, check that adding
// this new client would not push us over the max
if opts.MaxConn > 0 && len(s.clients) >= opts.MaxConn {
s.mu.Unlock()
c.maxConnExceeded()
return nil
}
s.clients[c.cid] = c
tlsRequired := info.TLSRequired
s.mu.Unlock()
// Re-Grab lock
c.mu.Lock()
// Connection could have been closed while sending the INFO proto.
isClosed := c.isClosed()
var pre []byte
// If we have both TLS and non-TLS allowed we need to see which
// one the client wants.
if !isClosed && opts.TLSConfig != nil && opts.AllowNonTLS {
pre = make([]byte, 4)
c.nc.SetReadDeadline(time.Now().Add(secondsToDuration(opts.TLSTimeout)))
n, _ := io.ReadFull(c.nc, pre[:])
c.nc.SetReadDeadline(time.Time{})
pre = pre[:n]
if n > 0 && pre[0] == 0x16 {
tlsRequired = true
} else {
tlsRequired = false
}
}
// Check for TLS
if !isClosed && tlsRequired {
// If we have a prebuffer create a multi-reader.
if len(pre) > 0 {
c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
// Clear pre so it is not parsed.
pre = nil
}
// Performs server-side TLS handshake.
if err := c.doTLSServerHandshake(_EMPTY_, opts.TLSConfig, opts.TLSTimeout); err != nil {
c.mu.Unlock()
return nil
}
}
// If connection is marked as closed, bail out.
if isClosed {
c.mu.Unlock()
// Connection could have been closed due to TLS timeout or while trying
// to send the INFO protocol. We need to call closeConnection() to make
// sure that proper cleanup is done.
c.closeConnection(WriteError)
return nil
}
// Check for Auth. We schedule this timer after the TLS handshake to avoid
// the race where the timer fires during the handshake and causes the
// server to write bad data to the socket. See issue #432.
if authRequired {
c.setAuthTimer(secondsToDuration(opts.AuthTimeout))
}
// Do final client initialization
// Set the Ping timer. Will be reset once connect was received.
c.setPingTimer()
// Spin up the read loop.
s.startGoRoutine(func() { c.readLoop(pre) })
// Spin up the write loop.
s.startGoRoutine(func() { c.writeLoop() })
if tlsRequired {
c.Debugf("TLS handshake complete")
cs := c.nc.(*tls.Conn).ConnectionState()
c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tlsCipher(cs.CipherSuite))
}
c.mu.Unlock()
return c
}
// This will save off a closed client in a ring buffer such that
// /connz can inspect. Useful for debugging, etc.
func (s *Server) saveClosedClient(c *client, nc net.Conn, reason ClosedState) {
now := time.Now()
s.accountDisconnectEvent(c, now, reason.String())
c.mu.Lock()
cc := &closedClient{}
cc.fill(c, nc, now)
cc.Stop = &now
cc.Reason = reason.String()
// Do subs, do not place by default in main ConnInfo
if len(c.subs) > 0 {
cc.subs = make([]SubDetail, 0, len(c.subs))
for _, sub := range c.subs {
cc.subs = append(cc.subs, newSubDetail(sub))
}
}
// Hold user as well.
cc.user = c.opts.Username
// Hold account name if not the global account.
if c.acc != nil && c.acc.Name != globalAccountName {
cc.acc = c.acc.Name
}
cc.JWT = c.opts.JWT
cc.IssuerKey = issuerForClient(c)
cc.Tags = c.tags
cc.NameTag = c.nameTag
c.mu.Unlock()
// Place in the ring buffer
s.mu.Lock()
if s.closed != nil {
s.closed.append(cc)
}
s.mu.Unlock()
}
// Adds to the list of client and websocket clients connect URLs.
// If there was a change, an INFO protocol is sent to registered clients
// that support async INFO protocols.
func (s *Server) addConnectURLsAndSendINFOToClients(curls, wsurls []string) {
s.updateServerINFOAndSendINFOToClients(curls, wsurls, true)
}
// Removes from the list of client and websocket clients connect URLs.
// If there was a change, an INFO protocol is sent to registered clients
// that support async INFO protocols.
func (s *Server) removeConnectURLsAndSendINFOToClients(curls, wsurls []string) {
s.updateServerINFOAndSendINFOToClients(curls, wsurls, false)
}
// Updates the list of client and websocket clients connect URLs and if any change
// sends an async INFO update to clients that support it.
func (s *Server) updateServerINFOAndSendINFOToClients(curls, wsurls []string, add bool) {
s.mu.Lock()
defer s.mu.Unlock()
remove := !add
// Will return true if we need alter the server's Info object.
updateMap := func(urls []string, m refCountedUrlSet) bool {
wasUpdated := false
for _, url := range urls {
if add && m.addUrl(url) {
wasUpdated = true
} else if remove && m.removeUrl(url) {
wasUpdated = true
}
}
return wasUpdated
}
cliUpdated := updateMap(curls, s.clientConnectURLsMap)
wsUpdated := updateMap(wsurls, s.websocket.connectURLsMap)
updateInfo := func(infoURLs *[]string, urls []string, m refCountedUrlSet) {
// Recreate the info's slice from the map
*infoURLs = (*infoURLs)[:0]
// Add this server client connect ULRs first...
*infoURLs = append(*infoURLs, urls...)
// Then the ones from the map
for url := range m {
*infoURLs = append(*infoURLs, url)
}
}
if cliUpdated {
updateInfo(&s.info.ClientConnectURLs, s.clientConnectURLs, s.clientConnectURLsMap)
}
if wsUpdated {
updateInfo(&s.info.WSConnectURLs, s.websocket.connectURLs, s.websocket.connectURLsMap)
}
if cliUpdated || wsUpdated {
// Update the time of this update
s.lastCURLsUpdate = time.Now().UnixNano()
// Send to all registered clients that support async INFO protocols.
s.sendAsyncInfoToClients(cliUpdated, wsUpdated)
}
}
// Handle closing down a connection when the handshake has timedout.
func tlsTimeout(c *client, conn *tls.Conn) {
c.mu.Lock()
closed := c.isClosed()
c.mu.Unlock()
// Check if already closed
if closed {
return
}
cs := conn.ConnectionState()
if !cs.HandshakeComplete {
c.Errorf("TLS handshake timeout")
c.sendErr("Secure Connection - TLS Required")
c.closeConnection(TLSHandshakeError)
}
}
// Seems silly we have to write these
func tlsVersion(ver uint16) string {
switch ver {
case tls.VersionTLS10:
return "1.0"
case tls.VersionTLS11:
return "1.1"
case tls.VersionTLS12:
return "1.2"
case tls.VersionTLS13:
return "1.3"
}
return fmt.Sprintf("Unknown [0x%x]", ver)
}
// We use hex here so we don't need multiple versions
func tlsCipher(cs uint16) string {
name, present := cipherMapByID[cs]
if present {
return name
}
return fmt.Sprintf("Unknown [0x%x]", cs)
}
// Remove a client or route from our internal accounting.
func (s *Server) removeClient(c *client) {
// kind is immutable, so can check without lock
switch c.kind {
case CLIENT:
c.mu.Lock()
cid := c.cid
updateProtoInfoCount := false
if c.kind == CLIENT && c.opts.Protocol >= ClientProtoInfo {
updateProtoInfoCount = true
}
c.mu.Unlock()
s.mu.Lock()
delete(s.clients, cid)
if updateProtoInfoCount {
s.cproto--
}
mqtt := c.isMqtt()
s.mu.Unlock()
if mqtt {
s.mqttHandleClosedClient(c)
}
case ROUTER:
s.removeRoute(c)
case GATEWAY:
s.removeRemoteGatewayConnection(c)
case LEAF:
s.removeLeafNodeConnection(c)
}
}
func (s *Server) removeFromTempClients(cid uint64) {
s.grMu.Lock()
delete(s.grTmpClients, cid)
s.grMu.Unlock()
}
func (s *Server) addToTempClients(cid uint64, c *client) bool {
added := false
s.grMu.Lock()
if s.grRunning {
s.grTmpClients[cid] = c
added = true
}
s.grMu.Unlock()
return added
}
/////////////////////////////////////////////////////////////////
// These are some helpers for accounting in functional tests.
/////////////////////////////////////////////////////////////////
// NumRoutes will report the number of registered routes.
func (s *Server) NumRoutes() int {
s.mu.Lock()
nr := len(s.routes)
s.mu.Unlock()
return nr
}
// NumRemotes will report number of registered remotes.
func (s *Server) NumRemotes() int {
s.mu.Lock()
defer s.mu.Unlock()
return len(s.remotes)
}
// NumLeafNodes will report number of leaf node connections.
func (s *Server) NumLeafNodes() int {
s.mu.Lock()
defer s.mu.Unlock()
return len(s.leafs)
}
// NumClients will report the number of registered clients.
func (s *Server) NumClients() int {
s.mu.Lock()
defer s.mu.Unlock()
return len(s.clients)
}
// GetClient will return the client associated with cid.
func (s *Server) GetClient(cid uint64) *client {
return s.getClient(cid)
}
// getClient will return the client associated with cid.
func (s *Server) getClient(cid uint64) *client {
s.mu.Lock()
defer s.mu.Unlock()
return s.clients[cid]
}
// GetLeafNode returns the leafnode associated with the cid.
func (s *Server) GetLeafNode(cid uint64) *client {
s.mu.Lock()
defer s.mu.Unlock()
return s.leafs[cid]
}
// NumSubscriptions will report how many subscriptions are active.
func (s *Server) NumSubscriptions() uint32 {
s.mu.Lock()
defer s.mu.Unlock()
return s.numSubscriptions()
}
// numSubscriptions will report how many subscriptions are active.
// Lock should be held.
func (s *Server) numSubscriptions() uint32 {
var subs int
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
if acc.sl != nil {
subs += acc.TotalSubs()
}
return true
})
return uint32(subs)
}
// NumSlowConsumers will report the number of slow consumers.
func (s *Server) NumSlowConsumers() int64 {
return atomic.LoadInt64(&s.slowConsumers)
}
// ConfigTime will report the last time the server configuration was loaded.
func (s *Server) ConfigTime() time.Time {
s.mu.Lock()
defer s.mu.Unlock()
return s.configTime
}
// Addr will return the net.Addr object for the current listener.
func (s *Server) Addr() net.Addr {
s.mu.Lock()
defer s.mu.Unlock()
if s.listener == nil {
return nil
}
return s.listener.Addr()
}
// MonitorAddr will return the net.Addr object for the monitoring listener.
func (s *Server) MonitorAddr() *net.TCPAddr {
s.mu.Lock()
defer s.mu.Unlock()
if s.http == nil {
return nil
}
return s.http.Addr().(*net.TCPAddr)
}
// ClusterAddr returns the net.Addr object for the route listener.
func (s *Server) ClusterAddr() *net.TCPAddr {
s.mu.Lock()
defer s.mu.Unlock()
if s.routeListener == nil {
return nil
}
return s.routeListener.Addr().(*net.TCPAddr)
}
// ProfilerAddr returns the net.Addr object for the profiler listener.
func (s *Server) ProfilerAddr() *net.TCPAddr {
s.mu.Lock()
defer s.mu.Unlock()
if s.profiler == nil {
return nil
}
return s.profiler.Addr().(*net.TCPAddr)
}
// ReadyForConnections returns `true` if the server is ready to accept clients
// and, if routing is enabled, route connections. If after the duration
// `dur` the server is still not ready, returns `false`.
func (s *Server) ReadyForConnections(dur time.Duration) bool {
// Snapshot server options.
opts := s.getOpts()
end := time.Now().Add(dur)
for time.Now().Before(end) {
s.mu.Lock()
ok := s.listener != nil &&
(opts.Cluster.Port == 0 || s.routeListener != nil) &&
(opts.Gateway.Name == "" || s.gatewayListener != nil) &&
(opts.LeafNode.Port == 0 || s.leafNodeListener != nil) &&
(opts.Websocket.Port == 0 || s.websocket.listener != nil)
s.mu.Unlock()
if ok {
return true
}
time.Sleep(25 * time.Millisecond)
}
return false
}
// Quick utility to function to tell if the server supports headers.
func (s *Server) supportsHeaders() bool {
if s == nil {
return false
}
return !(s.getOpts().NoHeaderSupport)
}
// ID returns the server's ID
func (s *Server) ID() string {
s.mu.Lock()
defer s.mu.Unlock()
return s.info.ID
}
// Name returns the server's name. This will be the same as the ID if it was not set.
func (s *Server) Name() string {
s.mu.Lock()
defer s.mu.Unlock()
return s.info.Name
}
func (s *Server) String() string {
return s.Name()
}
func (s *Server) startGoRoutine(f func()) bool {
var started bool
s.grMu.Lock()
if s.grRunning {
s.grWG.Add(1)
go f()
started = true
}
s.grMu.Unlock()
return started
}
func (s *Server) numClosedConns() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.closed.len()
}
func (s *Server) totalClosedConns() uint64 {
s.mu.Lock()
defer s.mu.Unlock()
return s.closed.totalConns()
}
func (s *Server) closedClients() []*closedClient {
s.mu.Lock()
defer s.mu.Unlock()
return s.closed.closedClients()
}
// getClientConnectURLs returns suitable URLs for clients to connect to the listen
// port based on the server options' Host and Port. If the Host corresponds to
// "any" interfaces, this call returns the list of resolved IP addresses.
// If ClientAdvertise is set, returns the client advertise host and port.
// The server lock is assumed held on entry.
func (s *Server) getClientConnectURLs() []string {
// Snapshot server options.
opts := s.getOpts()
// Ignore error here since we know that if there is client advertise, the
// parseHostPort is correct because we did it right before calling this
// function in Server.New().
urls, _ := s.getConnectURLs(opts.ClientAdvertise, opts.Host, opts.Port)
return urls
}
// Generic version that will return an array of URLs based on the given
// advertise, host and port values.
func (s *Server) getConnectURLs(advertise, host string, port int) ([]string, error) {
urls := make([]string, 0, 1)
// short circuit if advertise is set
if advertise != "" {
h, p, err := parseHostPort(advertise, port)
if err != nil {
return nil, err
}
urls = append(urls, net.JoinHostPort(h, strconv.Itoa(p)))
} else {
sPort := strconv.Itoa(port)
_, ips, err := s.getNonLocalIPsIfHostIsIPAny(host, true)
for _, ip := range ips {
urls = append(urls, net.JoinHostPort(ip, sPort))
}
if err != nil || len(urls) == 0 {
// We are here if s.opts.Host is not "0.0.0.0" nor "::", or if for some
// reason we could not add any URL in the loop above.
// We had a case where a Windows VM was hosed and would have err == nil
// and not add any address in the array in the loop above, and we
// ended-up returning 0.0.0.0, which is problematic for Windows clients.
// Check for 0.0.0.0 or :: specifically, and ignore if that's the case.
if host == "0.0.0.0" || host == "::" {
s.Errorf("Address %q can not be resolved properly", host)
} else {
urls = append(urls, net.JoinHostPort(host, sPort))
}
}
}
return urls, nil
}
// Returns an array of non local IPs if the provided host is
// 0.0.0.0 or ::. It returns the first resolved if `all` is
// false.
// The boolean indicate if the provided host was 0.0.0.0 (or ::)
// so that if the returned array is empty caller can decide
// what to do next.
func (s *Server) getNonLocalIPsIfHostIsIPAny(host string, all bool) (bool, []string, error) {
ip := net.ParseIP(host)
// If this is not an IP, we are done
if ip == nil {
return false, nil, nil
}
// If this is not 0.0.0.0 or :: we have nothing to do.
if !ip.IsUnspecified() {
return false, nil, nil
}
s.Debugf("Get non local IPs for %q", host)
var ips []string
ifaces, _ := net.Interfaces()
for _, i := range ifaces {
addrs, _ := i.Addrs()
for _, addr := range addrs {
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
ipStr := ip.String()
// Skip non global unicast addresses
if !ip.IsGlobalUnicast() || ip.IsUnspecified() {
ip = nil
continue
}
s.Debugf(" ip=%s", ipStr)
ips = append(ips, ipStr)
if !all {
break
}
}
}
return true, ips, nil
}
// if the ip is not specified, attempt to resolve it
func resolveHostPorts(addr net.Listener) []string {
hostPorts := make([]string, 0)
hp := addr.Addr().(*net.TCPAddr)
port := strconv.Itoa(hp.Port)
if hp.IP.IsUnspecified() {
var ip net.IP
ifaces, _ := net.Interfaces()
for _, i := range ifaces {
addrs, _ := i.Addrs()
for _, addr := range addrs {
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
hostPorts = append(hostPorts, net.JoinHostPort(ip.String(), port))
case *net.IPAddr:
ip = v.IP
hostPorts = append(hostPorts, net.JoinHostPort(ip.String(), port))
default:
continue
}
}
}
} else {
hostPorts = append(hostPorts, net.JoinHostPort(hp.IP.String(), port))
}
return hostPorts
}
// format the address of a net.Listener with a protocol
func formatURL(protocol string, addr net.Listener) []string {
hostports := resolveHostPorts(addr)
for i, hp := range hostports {
hostports[i] = fmt.Sprintf("%s://%s", protocol, hp)
}
return hostports
}
// Ports describes URLs that the server can be contacted in
type Ports struct {
Nats []string `json:"nats,omitempty"`
Monitoring []string `json:"monitoring,omitempty"`
Cluster []string `json:"cluster,omitempty"`
Profile []string `json:"profile,omitempty"`
WebSocket []string `json:"websocket,omitempty"`
}
// PortsInfo attempts to resolve all the ports. If after maxWait the ports are not
// resolved, it returns nil. Otherwise it returns a Ports struct
// describing ports where the server can be contacted
func (s *Server) PortsInfo(maxWait time.Duration) *Ports {
if s.readyForListeners(maxWait) {
opts := s.getOpts()
s.mu.Lock()
tls := s.info.TLSRequired
listener := s.listener
httpListener := s.http
clusterListener := s.routeListener
profileListener := s.profiler
wsListener := s.websocket.listener
wss := s.websocket.tls
s.mu.Unlock()
ports := Ports{}
if listener != nil {
natsProto := "nats"
if tls {
natsProto = "tls"
}
ports.Nats = formatURL(natsProto, listener)
}
if httpListener != nil {
monProto := "http"
if opts.HTTPSPort != 0 {
monProto = "https"
}
ports.Monitoring = formatURL(monProto, httpListener)
}
if clusterListener != nil {
clusterProto := "nats"
if opts.Cluster.TLSConfig != nil {
clusterProto = "tls"
}
ports.Cluster = formatURL(clusterProto, clusterListener)
}
if profileListener != nil {
ports.Profile = formatURL("http", profileListener)
}
if wsListener != nil {
protocol := wsSchemePrefix
if wss {
protocol = wsSchemePrefixTLS
}
ports.WebSocket = formatURL(protocol, wsListener)
}
return &ports
}
return nil
}
// Returns the portsFile. If a non-empty dirHint is provided, the dirHint
// path is used instead of the server option value
func (s *Server) portFile(dirHint string) string {
dirname := s.getOpts().PortsFileDir
if dirHint != "" {
dirname = dirHint
}
if dirname == _EMPTY_ {
return _EMPTY_
}
return filepath.Join(dirname, fmt.Sprintf("%s_%d.ports", filepath.Base(os.Args[0]), os.Getpid()))
}
// Delete the ports file. If a non-empty dirHint is provided, the dirHint
// path is used instead of the server option value
func (s *Server) deletePortsFile(hintDir string) {
portsFile := s.portFile(hintDir)
if portsFile != "" {
if err := os.Remove(portsFile); err != nil {
s.Errorf("Error cleaning up ports file %s: %v", portsFile, err)
}
}
}
// Writes a file with a serialized Ports to the specified ports_file_dir.
// The name of the file is `exename_pid.ports`, typically nats-server_pid.ports.
// if ports file is not set, this function has no effect
func (s *Server) logPorts() {
opts := s.getOpts()
portsFile := s.portFile(opts.PortsFileDir)
if portsFile != _EMPTY_ {
go func() {
info := s.PortsInfo(5 * time.Second)
if info == nil {
s.Errorf("Unable to resolve the ports in the specified time")
return
}
data, err := json.Marshal(info)
if err != nil {
s.Errorf("Error marshaling ports file: %v", err)
return
}
if err := ioutil.WriteFile(portsFile, data, 0666); err != nil {
s.Errorf("Error writing ports file (%s): %v", portsFile, err)
return
}
}()
}
}
// waits until a calculated list of listeners is resolved or a timeout
func (s *Server) readyForListeners(dur time.Duration) bool {
end := time.Now().Add(dur)
for time.Now().Before(end) {
s.mu.Lock()
listeners := s.serviceListeners()
s.mu.Unlock()
if len(listeners) == 0 {
return false
}
ok := true
for _, l := range listeners {
if l == nil {
ok = false
break
}
}
if ok {
return true
}
select {
case <-s.quitCh:
return false
case <-time.After(25 * time.Millisecond):
// continue - unable to select from quit - we are still running
}
}
return false
}
// returns a list of listeners that are intended for the process
// if the entry is nil, the interface is yet to be resolved
func (s *Server) serviceListeners() []net.Listener {
listeners := make([]net.Listener, 0)
opts := s.getOpts()
listeners = append(listeners, s.listener)
if opts.Cluster.Port != 0 {
listeners = append(listeners, s.routeListener)
}
if opts.HTTPPort != 0 || opts.HTTPSPort != 0 {
listeners = append(listeners, s.http)
}
if opts.ProfPort != 0 {
listeners = append(listeners, s.profiler)
}
if opts.Websocket.Port != 0 {
listeners = append(listeners, s.websocket.listener)
}
return listeners
}
// Returns true if in lame duck mode.
func (s *Server) isLameDuckMode() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.ldm
}
// This function will close the client listener then close the clients
// at some interval to avoid a reconnecting storm.
func (s *Server) lameDuckMode() {
s.mu.Lock()
// Check if there is actually anything to do
if s.shutdown || s.ldm || s.listener == nil {
s.mu.Unlock()
return
}
s.Noticef("Entering lame duck mode, stop accepting new clients")
s.ldm = true
expected := 1
s.listener.Close()
s.listener = nil
if s.websocket.server != nil {
expected++
s.websocket.server.Close()
s.websocket.server = nil
s.websocket.listener = nil
}
s.ldmCh = make(chan bool, expected)
opts := s.getOpts()
gp := opts.LameDuckGracePeriod
// For tests, we want the grace period to be in some cases bigger
// than the ldm duration, so to by-pass the validateOptions() check,
// we use negative number and flip it here.
if gp < 0 {
gp *= -1
}
s.mu.Unlock()
// If we are running any raftNodes transfer leaders.
if hadTransfers := s.transferRaftLeaders(); hadTransfers {
// They will tranfer leadership quickly, but wait here for a second.
select {
case <-time.After(time.Second):
case <-s.quitCh:
return
}
}
// Wait for accept loops to be done to make sure that no new
// client can connect
for i := 0; i < expected; i++ {
<-s.ldmCh
}
s.mu.Lock()
// Need to recheck few things
if s.shutdown || len(s.clients) == 0 {
s.mu.Unlock()
// If there is no client, we need to call Shutdown() to complete
// the LDMode. If server has been shutdown while lock was released,
// calling Shutdown() should be no-op.
s.Shutdown()
return
}
dur := int64(opts.LameDuckDuration)
dur -= int64(gp)
if dur <= 0 {
dur = int64(time.Second)
}
numClients := int64(len(s.clients))
batch := 1
// Sleep interval between each client connection close.
si := dur / numClients
if si < 1 {
// Should not happen (except in test with very small LD duration), but
// if there are too many clients, batch the number of close and
// use a tiny sleep interval that will result in yield likely.
si = 1
batch = int(numClients / dur)
} else if si > int64(time.Second) {
// Conversely, there is no need to sleep too long between clients
// and spread say 10 clients for the 2min duration. Sleeping no
// more than 1sec.
si = int64(time.Second)
}
// Now capture all clients
clients := make([]*client, 0, len(s.clients))
for _, client := range s.clients {
clients = append(clients, client)
}
// Now that we know that no new client can be accepted,
// send INFO to routes and clients to notify this state.
s.sendLDMToRoutes()
s.sendLDMToClients()
s.mu.Unlock()
t := time.NewTimer(gp)
// Delay start of closing of client connections in case
// we have several servers that we want to signal to enter LD mode
// and not have their client reconnect to each other.
select {
case <-t.C:
s.Noticef("Closing existing clients")
case <-s.quitCh:
t.Stop()
return
}
for i, client := range clients {
client.closeConnection(ServerShutdown)
if i == len(clients)-1 {
break
}
if batch == 1 || i%batch == 0 {
// We pick a random interval which will be at least si/2
v := rand.Int63n(si)
if v < si/2 {
v = si / 2
}
t.Reset(time.Duration(v))
// Sleep for given interval or bail out if kicked by Shutdown().
select {
case <-t.C:
case <-s.quitCh:
t.Stop()
return
}
}
}
s.Shutdown()
}
// Send an INFO update to routes with the indication that this server is in LDM mode.
// Server lock is held on entry.
func (s *Server) sendLDMToRoutes() {
s.routeInfo.LameDuckMode = true
s.generateRouteInfoJSON()
for _, r := range s.routes {
r.mu.Lock()
r.enqueueProto(s.routeInfoJSON)
r.mu.Unlock()
}
// Clear now so that we notify only once, should we have to send other INFOs.
s.routeInfo.LameDuckMode = false
}
// Send an INFO update to clients with the indication that this server is in
// LDM mode and with only URLs of other nodes.
// Server lock is held on entry.
func (s *Server) sendLDMToClients() {
s.info.LameDuckMode = true
// Clear this so that if there are further updates, we don't send our URLs.
s.clientConnectURLs = s.clientConnectURLs[:0]
if s.websocket.connectURLs != nil {
s.websocket.connectURLs = s.websocket.connectURLs[:0]
}
// Reset content first.
s.info.ClientConnectURLs = s.info.ClientConnectURLs[:0]
s.info.WSConnectURLs = s.info.WSConnectURLs[:0]
// Only add the other nodes if we are allowed to.
if !s.getOpts().Cluster.NoAdvertise {
for url := range s.clientConnectURLsMap {
s.info.ClientConnectURLs = append(s.info.ClientConnectURLs, url)
}
for url := range s.websocket.connectURLsMap {
s.info.WSConnectURLs = append(s.info.WSConnectURLs, url)
}
}
// Send to all registered clients that support async INFO protocols.
s.sendAsyncInfoToClients(true, true)
// We now clear the info.LameDuckMode flag so that if there are
// cluster updates and we send the INFO, we don't have the boolean
// set which would cause multiple LDM notifications to clients.
s.info.LameDuckMode = false
}
// If given error is a net.Error and is temporary, sleeps for the given
// delay and double it, but cap it to ACCEPT_MAX_SLEEP. The sleep is
// interrupted if the server is shutdown.
// An error message is displayed depending on the type of error.
// Returns the new (or unchanged) delay, or a negative value if the
// server has been or is being shutdown.
func (s *Server) acceptError(acceptName string, err error, tmpDelay time.Duration) time.Duration {
if !s.isRunning() {
return -1
}
if ne, ok := err.(net.Error); ok && ne.Temporary() {
s.Errorf("Temporary %s Accept Error(%v), sleeping %dms", acceptName, ne, tmpDelay/time.Millisecond)
select {
case <-time.After(tmpDelay):
case <-s.quitCh:
return -1
}
tmpDelay *= 2
if tmpDelay > ACCEPT_MAX_SLEEP {
tmpDelay = ACCEPT_MAX_SLEEP
}
} else {
s.Errorf("%s Accept error: %v", acceptName, err)
}
return tmpDelay
}
var errNoIPAvail = errors.New("no IP available")
func (s *Server) getRandomIP(resolver netResolver, url string, excludedAddresses map[string]struct{}) (string, error) {
host, port, err := net.SplitHostPort(url)
if err != nil {
return "", err
}
// If already an IP, skip.
if net.ParseIP(host) != nil {
return url, nil
}
ips, err := resolver.LookupHost(context.Background(), host)
if err != nil {
return "", fmt.Errorf("lookup for host %q: %v", host, err)
}
if len(excludedAddresses) > 0 {
for i := 0; i < len(ips); i++ {
ip := ips[i]
addr := net.JoinHostPort(ip, port)
if _, excluded := excludedAddresses[addr]; excluded {
if len(ips) == 1 {
ips = nil
break
}
ips[i] = ips[len(ips)-1]
ips = ips[:len(ips)-1]
i--
}
}
if len(ips) == 0 {
return "", errNoIPAvail
}
}
var address string
if len(ips) == 0 {
s.Warnf("Unable to get IP for %s, will try with %s: %v", host, url, err)
address = url
} else {
var ip string
if len(ips) == 1 {
ip = ips[0]
} else {
ip = ips[rand.Int31n(int32(len(ips)))]
}
// add the port
address = net.JoinHostPort(ip, port)
}
return address, nil
}
// Returns true for the first attempt and depending on the nature
// of the attempt (first connect or a reconnect), when the number
// of attempts is equal to the configured report attempts.
func (s *Server) shouldReportConnectErr(firstConnect bool, attempts int) bool {
opts := s.getOpts()
if firstConnect {
if attempts == 1 || attempts%opts.ConnectErrorReports == 0 {
return true
}
return false
}
if attempts == 1 || attempts%opts.ReconnectErrorReports == 0 {
return true
}
return false
}
// Invoked for route, leaf and gateway connections. Set the very first
// PING to a lower interval to capture the initial RTT.
// After that the PING interval will be set to the user defined value.
// Client lock should be held.
func (s *Server) setFirstPingTimer(c *client) {
opts := s.getOpts()
d := opts.PingInterval
if !opts.DisableShortFirstPing {
if c.kind != CLIENT {
if d > firstPingInterval {
d = firstPingInterval
}
if c.kind == GATEWAY {
d = adjustPingIntervalForGateway(d)
}
} else if d > firstClientPingInterval {
d = firstClientPingInterval
}
}
// We randomize the first one by an offset up to 20%, e.g. 2m ~= max 24s.
addDelay := rand.Int63n(int64(d / 5))
d += time.Duration(addDelay)
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
}