// Copyright 2012-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package server import ( "context" "crypto/tls" "crypto/x509" "errors" "flag" "fmt" "io/ioutil" "net" "net/url" "os" "path" "path/filepath" "regexp" "runtime" "strconv" "strings" "sync/atomic" "time" "github.com/nats-io/jwt/v2" "github.com/nats-io/nkeys" "github.com/nats-io/nats-server/v2/conf" ) var allowUnknownTopLevelField = int32(0) // NoErrOnUnknownFields can be used to change the behavior the processing // of a configuration file. By default, an error is reported if unknown // fields are found. If `noError` is set to true, no error will be reported // if top-level unknown fields are found. func NoErrOnUnknownFields(noError bool) { var val int32 if noError { val = int32(1) } atomic.StoreInt32(&allowUnknownTopLevelField, val) } // ClusterOpts are options for clusters. // NOTE: This structure is no longer used for monitoring endpoints // and json tags are deprecated and may be removed in the future. type ClusterOpts struct { Name string `json:"-"` Host string `json:"addr,omitempty"` Port int `json:"cluster_port,omitempty"` Username string `json:"-"` Password string `json:"-"` AuthTimeout float64 `json:"auth_timeout,omitempty"` Permissions *RoutePermissions `json:"-"` TLSTimeout float64 `json:"-"` TLSConfig *tls.Config `json:"-"` TLSMap bool `json:"-"` TLSCheckKnownURLs bool `json:"-"` ListenStr string `json:"-"` Advertise string `json:"-"` NoAdvertise bool `json:"-"` ConnectRetries int `json:"-"` // Not exported (used in tests) resolver netResolver } // GatewayOpts are options for gateways. // NOTE: This structure is no longer used for monitoring endpoints // and json tags are deprecated and may be removed in the future. type GatewayOpts struct { Name string `json:"name"` Host string `json:"addr,omitempty"` Port int `json:"port,omitempty"` Username string `json:"-"` Password string `json:"-"` AuthTimeout float64 `json:"auth_timeout,omitempty"` TLSConfig *tls.Config `json:"-"` TLSTimeout float64 `json:"tls_timeout,omitempty"` TLSMap bool `json:"-"` TLSCheckKnownURLs bool `json:"-"` Advertise string `json:"advertise,omitempty"` ConnectRetries int `json:"connect_retries,omitempty"` Gateways []*RemoteGatewayOpts `json:"gateways,omitempty"` RejectUnknown bool `json:"reject_unknown,omitempty"` // config got renamed to reject_unknown_cluster // Not exported, for tests. resolver netResolver sendQSubsBufSize int } // RemoteGatewayOpts are options for connecting to a remote gateway // NOTE: This structure is no longer used for monitoring endpoints // and json tags are deprecated and may be removed in the future. type RemoteGatewayOpts struct { Name string `json:"name"` TLSConfig *tls.Config `json:"-"` TLSTimeout float64 `json:"tls_timeout,omitempty"` URLs []*url.URL `json:"urls,omitempty"` } // LeafNodeOpts are options for a given server to accept leaf node connections and/or connect to a remote cluster. type LeafNodeOpts struct { Host string `json:"addr,omitempty"` Port int `json:"port,omitempty"` Username string `json:"-"` Password string `json:"-"` Account string `json:"-"` Users []*User `json:"-"` AuthTimeout float64 `json:"auth_timeout,omitempty"` TLSConfig *tls.Config `json:"-"` TLSTimeout float64 `json:"tls_timeout,omitempty"` TLSMap bool `json:"-"` Advertise string `json:"-"` NoAdvertise bool `json:"-"` ReconnectInterval time.Duration `json:"-"` // For solicited connections to other clusters/superclusters. Remotes []*RemoteLeafOpts `json:"remotes,omitempty"` // Not exported, for tests. resolver netResolver dialTimeout time.Duration connDelay time.Duration } // RemoteLeafOpts are options for connecting to a remote server as a leaf node. type RemoteLeafOpts struct { LocalAccount string `json:"local_account,omitempty"` URLs []*url.URL `json:"urls,omitempty"` Credentials string `json:"-"` TLS bool `json:"-"` TLSConfig *tls.Config `json:"-"` TLSTimeout float64 `json:"tls_timeout,omitempty"` Hub bool `json:"hub,omitempty"` DenyImports []string `json:"-"` DenyExports []string `json:"-"` // When an URL has the "ws" (or "wss") scheme, then the server will initiate the // connection as a websocket connection. By default, the websocket frames will be // masked (as if this server was a websocket client to the remote server). The // NoMasking option will change this behavior and will send umasked frames. Websocket struct { Compression bool `json:"-"` NoMasking bool `json:"-"` } } // Options block for nats-server. // NOTE: This structure is no longer used for monitoring endpoints // and json tags are deprecated and may be removed in the future. type Options struct { ConfigFile string `json:"-"` ServerName string `json:"server_name"` Host string `json:"addr"` Port int `json:"port"` ClientAdvertise string `json:"-"` Trace bool `json:"-"` Debug bool `json:"-"` TraceVerbose bool `json:"-"` NoLog bool `json:"-"` NoSigs bool `json:"-"` NoSublistCache bool `json:"-"` NoHeaderSupport bool `json:"-"` DisableShortFirstPing bool `json:"-"` Logtime bool `json:"-"` MaxConn int `json:"max_connections"` MaxSubs int `json:"max_subscriptions,omitempty"` Nkeys []*NkeyUser `json:"-"` Users []*User `json:"-"` Accounts []*Account `json:"-"` NoAuthUser string `json:"-"` SystemAccount string `json:"-"` NoSystemAccount bool `json:"-"` AllowNewAccounts bool `json:"-"` Username string `json:"-"` Password string `json:"-"` Authorization string `json:"-"` PingInterval time.Duration `json:"ping_interval"` MaxPingsOut int `json:"ping_max"` HTTPHost string `json:"http_host"` HTTPPort int `json:"http_port"` HTTPBasePath string `json:"http_base_path"` HTTPSPort int `json:"https_port"` AuthTimeout float64 `json:"auth_timeout"` MaxControlLine int32 `json:"max_control_line"` MaxPayload int32 `json:"max_payload"` MaxPending int64 `json:"max_pending"` Cluster ClusterOpts `json:"cluster,omitempty"` Gateway GatewayOpts `json:"gateway,omitempty"` LeafNode LeafNodeOpts `json:"leaf,omitempty"` JetStream bool `json:"jetstream"` JetStreamMaxMemory int64 `json:"-"` JetStreamMaxStore int64 `json:"-"` StoreDir string `json:"-"` Websocket WebsocketOpts `json:"-"` MQTT MQTTOpts `json:"-"` ProfPort int `json:"-"` PidFile string `json:"-"` PortsFileDir string `json:"-"` LogFile string `json:"-"` LogSizeLimit int64 `json:"-"` Syslog bool `json:"-"` RemoteSyslog string `json:"-"` Routes []*url.URL `json:"-"` RoutesStr string `json:"-"` TLSTimeout float64 `json:"tls_timeout"` TLS bool `json:"-"` TLSVerify bool `json:"-"` TLSMap bool `json:"-"` TLSCert string `json:"-"` TLSKey string `json:"-"` TLSCaCert string `json:"-"` TLSConfig *tls.Config `json:"-"` AllowNonTLS bool `json:"-"` WriteDeadline time.Duration `json:"-"` MaxClosedClients int `json:"-"` LameDuckDuration time.Duration `json:"-"` LameDuckGracePeriod time.Duration `json:"-"` // MaxTracedMsgLen is the maximum printable length for traced messages. MaxTracedMsgLen int `json:"-"` // Operating a trusted NATS server TrustedKeys []string `json:"-"` TrustedOperators []*jwt.OperatorClaims `json:"-"` AccountResolver AccountResolver `json:"-"` AccountResolverTLSConfig *tls.Config `json:"-"` CustomClientAuthentication Authentication `json:"-"` CustomRouterAuthentication Authentication `json:"-"` // CheckConfig configuration file syntax test was successful and exit. CheckConfig bool `json:"-"` // ConnectErrorReports specifies the number of failed attempts // at which point server should report the failure of an initial // connection to a route, gateway or leaf node. // See DEFAULT_CONNECT_ERROR_REPORTS for default value. ConnectErrorReports int // ReconnectErrorReports is similar to ConnectErrorReports except // that this applies to reconnect events. ReconnectErrorReports int // Tags describing the server. They will be included in varz // and used as a filter criteria for some system requests Tags jwt.TagList `json:"-"` // private fields, used to know if bool options are explicitly // defined in config and/or command line params. inConfig map[string]bool inCmdLine map[string]bool // private fields for operator mode operatorJWT []string resolverPreloads map[string]string // private fields, used for testing gatewaysSolicitDelay time.Duration routeProto int } // WebsocketOpts are options for websocket type WebsocketOpts struct { // The server will accept websocket client connections on this hostname/IP. Host string // The server will accept websocket client connections on this port. Port int // The host:port to advertise to websocket clients in the cluster. Advertise string // If no user name is provided when a client connects, will default to the // matching user from the global list of users in `Options.Users`. NoAuthUser string // Name of the cookie, which if present in WebSocket upgrade headers, // will be treated as JWT during CONNECT phase as long as // "jwt" specified in the CONNECT options is missing or empty. JWTCookie string // Authentication section. If anything is configured in this section, // it will override the authorization configuration of regular clients. Username string Password string Token string // Timeout for the authentication process. AuthTimeout float64 // By default the server will enforce the use of TLS. If no TLS configuration // is provided, you need to explicitly set NoTLS to true to allow the server // to start without TLS configuration. Note that if a TLS configuration is // present, this boolean is ignored and the server will run the Websocket // server with that TLS configuration. // Running without TLS is less secure since Websocket clients that use bearer // tokens will send them in clear. So this should not be used in production. NoTLS bool // TLS configuration is required. TLSConfig *tls.Config // If true, map certificate values for authentication purposes. TLSMap bool // If true, the Origin header must match the request's host. SameOrigin bool // Only origins in this list will be accepted. If empty and // SameOrigin is false, any origin is accepted. AllowedOrigins []string // If set to true, the server will negotiate with clients // if compression can be used. If this is false, no compression // will be used (both in server and clients) since it has to // be negotiated between both endpoints Compression bool // Total time allowed for the server to read the client request // and write the response back to the client. This include the // time needed for the TLS Handshake. HandshakeTimeout time.Duration } // MQTTOpts are options for MQTT type MQTTOpts struct { // The server will accept MQTT client connections on this hostname/IP. Host string // The server will accept MQTT client connections on this port. Port int // If no user name is provided when a client connects, will default to the // matching user from the global list of users in `Options.Users`. NoAuthUser string // Authentication section. If anything is configured in this section, // it will override the authorization configuration of regular clients. Username string Password string Token string // Timeout for the authentication process. AuthTimeout float64 // TLS configuration is required. TLSConfig *tls.Config // If true, map certificate values for authentication purposes. TLSMap bool // Timeout for the TLS handshake TLSTimeout float64 // AckWait is the amount of time after which a QoS 1 message sent to // a client is redelivered as a DUPLICATE if the server has not // received the PUBACK on the original Packet Identifier. // The value has to be positive. // Zero will cause the server to use the default value (30 seconds). // Note that changes to this option is applied only to new MQTT subscriptions. AckWait time.Duration // MaxAckPending is the amount of QoS 1 messages the server can send to // a subscription without receiving any PUBACK for those messages. // The valid range is [0..65535]. // The total of subscriptions' MaxAckPending on a given session cannot // exceed 65535. Attempting to create a subscription that would bring // the total above the limit would result in the server returning 0x80 // in the SUBACK for this subscription. // Due to how the NATS Server handles the MQTT "#" wildcard, each // subscription ending with "#" will use 2 times the MaxAckPending value. // Note that changes to this option is applied only to new subscriptions. MaxAckPending uint16 } type netResolver interface { LookupHost(ctx context.Context, host string) ([]string, error) } // Clone performs a deep copy of the Options struct, returning a new clone // with all values copied. func (o *Options) Clone() *Options { if o == nil { return nil } clone := &Options{} *clone = *o if o.Users != nil { clone.Users = make([]*User, len(o.Users)) for i, user := range o.Users { clone.Users[i] = user.clone() } } if o.Nkeys != nil { clone.Nkeys = make([]*NkeyUser, len(o.Nkeys)) for i, nkey := range o.Nkeys { clone.Nkeys[i] = nkey.clone() } } if o.Routes != nil { clone.Routes = deepCopyURLs(o.Routes) } if o.TLSConfig != nil { clone.TLSConfig = o.TLSConfig.Clone() } if o.Cluster.TLSConfig != nil { clone.Cluster.TLSConfig = o.Cluster.TLSConfig.Clone() } if o.Gateway.TLSConfig != nil { clone.Gateway.TLSConfig = o.Gateway.TLSConfig.Clone() } if len(o.Gateway.Gateways) > 0 { clone.Gateway.Gateways = make([]*RemoteGatewayOpts, len(o.Gateway.Gateways)) for i, g := range o.Gateway.Gateways { clone.Gateway.Gateways[i] = g.clone() } } // FIXME(dlc) - clone leaf node stuff. return clone } func deepCopyURLs(urls []*url.URL) []*url.URL { if urls == nil { return nil } curls := make([]*url.URL, len(urls)) for i, u := range urls { cu := &url.URL{} *cu = *u curls[i] = cu } return curls } // Configuration file authorization section. type authorization struct { // Singles user string pass string token string acc string // Multiple Nkeys/Users nkeys []*NkeyUser users []*User timeout float64 defaultPermissions *Permissions } // TLSConfigOpts holds the parsed tls config information, // used with flag parsing type TLSConfigOpts struct { CertFile string KeyFile string CaFile string Verify bool Insecure bool Map bool TLSCheckKnownURLs bool Timeout float64 Ciphers []uint16 CurvePreferences []tls.CurveID } var tlsUsage = ` TLS configuration is specified in the tls section of a configuration file: e.g. tls { cert_file: "./certs/server-cert.pem" key_file: "./certs/server-key.pem" ca_file: "./certs/ca.pem" verify: true verify_and_map: true cipher_suites: [ "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256" ] curve_preferences: [ "CurveP256", "CurveP384", "CurveP521" ] } Available cipher suites include: ` // ProcessConfigFile processes a configuration file. // FIXME(dlc): A bit hacky func ProcessConfigFile(configFile string) (*Options, error) { opts := &Options{} if err := opts.ProcessConfigFile(configFile); err != nil { // If only warnings then continue and return the options. if cerr, ok := err.(*processConfigErr); ok && len(cerr.Errors()) == 0 { return opts, nil } return nil, err } return opts, nil } // token is an item parsed from the configuration. type token interface { Value() interface{} Line() int IsUsedVariable() bool SourceFile() string Position() int } // unwrapValue can be used to get the token and value from an item // to be able to report the line number in case of an incorrect // configuration. // also stores the token in lastToken for use in convertPanicToError func unwrapValue(v interface{}, lastToken *token) (token, interface{}) { switch tk := v.(type) { case token: if lastToken != nil { *lastToken = tk } return tk, tk.Value() default: return nil, v } } // use in defer to recover from panic and turn it into an error associated with last token func convertPanicToErrorList(lastToken *token, errors *[]error) { // only recover if an error can be stored if errors == nil { return } else if err := recover(); err == nil { return } else if lastToken != nil && *lastToken != nil { *errors = append(*errors, &configErr{*lastToken, fmt.Sprint(err)}) } else { *errors = append(*errors, fmt.Errorf("encountered panic without a token %v", err)) } } // use in defer to recover from panic and turn it into an error associated with last token func convertPanicToError(lastToken *token, e *error) { // only recover if an error can be stored if e == nil || *e != nil { return } else if err := recover(); err == nil { return } else if lastToken != nil && *lastToken != nil { *e = &configErr{*lastToken, fmt.Sprint(err)} } else { *e = fmt.Errorf("%v", err) } } // configureSystemAccount configures a system account // if present in the configuration. func configureSystemAccount(o *Options, m map[string]interface{}) (retErr error) { var lt token defer convertPanicToError(<, &retErr) configure := func(v interface{}) error { tk, v := unwrapValue(v, <) sa, ok := v.(string) if !ok { return &configErr{tk, "system account name must be a string"} } o.SystemAccount = sa return nil } if v, ok := m["system_account"]; ok { return configure(v) } else if v, ok := m["system"]; ok { return configure(v) } return nil } // ProcessConfigFile updates the Options structure with options // present in the given configuration file. // This version is convenient if one wants to set some default // options and then override them with what is in the config file. // For instance, this version allows you to do something such as: // // opts := &Options{Debug: true} // opts.ProcessConfigFile(myConfigFile) // // If the config file contains "debug: false", after this call, // opts.Debug would really be false. It would be impossible to // achieve that with the non receiver ProcessConfigFile() version, // since one would not know after the call if "debug" was not present // or was present but set to false. func (o *Options) ProcessConfigFile(configFile string) error { o.ConfigFile = configFile if configFile == "" { return nil } m, err := conf.ParseFileWithChecks(configFile) if err != nil { return err } // Collect all errors and warnings and report them all together. errors := make([]error, 0) warnings := make([]error, 0) // First check whether a system account has been defined, // as that is a condition for other features to be enabled. if err := configureSystemAccount(o, m); err != nil { errors = append(errors, err) } for k, v := range m { o.processConfigFileLine(k, v, &errors, &warnings) } if len(errors) > 0 || len(warnings) > 0 { return &processConfigErr{ errors: errors, warnings: warnings, } } return nil } func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error, warnings *[]error) { var lt token defer convertPanicToErrorList(<, errors) tk, v := unwrapValue(v, <) switch strings.ToLower(k) { case "listen": hp, err := parseListen(v) if err != nil { *errors = append(*errors, &configErr{tk, err.Error()}) return } o.Host = hp.host o.Port = hp.port case "client_advertise": o.ClientAdvertise = v.(string) case "port": o.Port = int(v.(int64)) case "server_name": o.ServerName = v.(string) case "host", "net": o.Host = v.(string) case "debug": o.Debug = v.(bool) trackExplicitVal(o, &o.inConfig, "Debug", o.Debug) case "trace": o.Trace = v.(bool) trackExplicitVal(o, &o.inConfig, "Trace", o.Trace) case "trace_verbose": o.TraceVerbose = v.(bool) o.Trace = v.(bool) trackExplicitVal(o, &o.inConfig, "TraceVerbose", o.TraceVerbose) trackExplicitVal(o, &o.inConfig, "Trace", o.Trace) case "logtime": o.Logtime = v.(bool) trackExplicitVal(o, &o.inConfig, "Logtime", o.Logtime) case "mappings", "maps": gacc := NewAccount(globalAccountName) o.Accounts = append(o.Accounts, gacc) err := parseAccountMappings(tk, gacc, errors, warnings) if err != nil { *errors = append(*errors, err) return } case "disable_sublist_cache", "no_sublist_cache": o.NoSublistCache = v.(bool) case "accounts": err := parseAccounts(tk, o, errors, warnings) if err != nil { *errors = append(*errors, err) return } case "authorization": auth, err := parseAuthorization(tk, o, errors, warnings) if err != nil { *errors = append(*errors, err) return } o.Username = auth.user o.Password = auth.pass o.Authorization = auth.token if (auth.user != "" || auth.pass != "") && auth.token != "" { err := &configErr{tk, "Cannot have a user/pass and token"} *errors = append(*errors, err) return } o.AuthTimeout = auth.timeout // Check for multiple users defined if auth.users != nil { if auth.user != "" { err := &configErr{tk, "Can not have a single user/pass and a users array"} *errors = append(*errors, err) return } if auth.token != "" { err := &configErr{tk, "Can not have a token and a users array"} *errors = append(*errors, err) return } // Users may have been added from Accounts parsing, so do an append here o.Users = append(o.Users, auth.users...) } // Check for nkeys if auth.nkeys != nil { // NKeys may have been added from Accounts parsing, so do an append here o.Nkeys = append(o.Nkeys, auth.nkeys...) } case "http": hp, err := parseListen(v) if err != nil { err := &configErr{tk, err.Error()} *errors = append(*errors, err) return } o.HTTPHost = hp.host o.HTTPPort = hp.port case "https": hp, err := parseListen(v) if err != nil { err := &configErr{tk, err.Error()} *errors = append(*errors, err) return } o.HTTPHost = hp.host o.HTTPSPort = hp.port case "http_port", "monitor_port": o.HTTPPort = int(v.(int64)) case "https_port": o.HTTPSPort = int(v.(int64)) case "http_base_path": o.HTTPBasePath = v.(string) case "cluster": err := parseCluster(tk, o, errors, warnings) if err != nil { *errors = append(*errors, err) return } case "gateway": if err := parseGateway(tk, o, errors, warnings); err != nil { *errors = append(*errors, err) return } case "leaf", "leafnodes": err := parseLeafNodes(tk, o, errors, warnings) if err != nil { *errors = append(*errors, err) return } case "jetstream": err := parseJetStream(tk, o, errors, warnings) if err != nil { *errors = append(*errors, err) return } case "logfile", "log_file": o.LogFile = v.(string) case "logfile_size_limit", "log_size_limit": o.LogSizeLimit = v.(int64) case "syslog": o.Syslog = v.(bool) trackExplicitVal(o, &o.inConfig, "Syslog", o.Syslog) case "remote_syslog": o.RemoteSyslog = v.(string) case "pidfile", "pid_file": o.PidFile = v.(string) case "ports_file_dir": o.PortsFileDir = v.(string) case "prof_port": o.ProfPort = int(v.(int64)) case "max_control_line": if v.(int64) > 1<<31-1 { err := &configErr{tk, fmt.Sprintf("%s value is too big", k)} *errors = append(*errors, err) return } o.MaxControlLine = int32(v.(int64)) case "max_payload": if v.(int64) > 1<<31-1 { err := &configErr{tk, fmt.Sprintf("%s value is too big", k)} *errors = append(*errors, err) return } o.MaxPayload = int32(v.(int64)) case "max_pending": o.MaxPending = v.(int64) case "max_connections", "max_conn": o.MaxConn = int(v.(int64)) case "max_traced_msg_len": o.MaxTracedMsgLen = int(v.(int64)) case "max_subscriptions", "max_subs": o.MaxSubs = int(v.(int64)) case "ping_interval": o.PingInterval = parseDuration("ping_interval", tk, v, errors, warnings) case "ping_max": o.MaxPingsOut = int(v.(int64)) case "tls": tc, err := parseTLS(tk, true) if err != nil { *errors = append(*errors, err) return } if o.TLSConfig, err = GenTLSConfig(tc); err != nil { err := &configErr{tk, err.Error()} *errors = append(*errors, err) return } o.TLSTimeout = tc.Timeout o.TLSMap = tc.Map case "allow_non_tls": o.AllowNonTLS = v.(bool) case "write_deadline": o.WriteDeadline = parseDuration("write_deadline", tk, v, errors, warnings) case "lame_duck_duration": dur, err := time.ParseDuration(v.(string)) if err != nil { err := &configErr{tk, fmt.Sprintf("error parsing lame_duck_duration: %v", err)} *errors = append(*errors, err) return } if dur < 30*time.Second { err := &configErr{tk, fmt.Sprintf("invalid lame_duck_duration of %v, minimum is 30 seconds", dur)} *errors = append(*errors, err) return } o.LameDuckDuration = dur case "lame_duck_grace_period": dur, err := time.ParseDuration(v.(string)) if err != nil { err := &configErr{tk, fmt.Sprintf("error parsing lame_duck_grace_period: %v", err)} *errors = append(*errors, err) return } if dur < 0 { err := &configErr{tk, "invalid lame_duck_grace_period, needs to be positive"} *errors = append(*errors, err) return } o.LameDuckGracePeriod = dur case "operator", "operators", "roots", "root", "root_operators", "root_operator": opFiles := []string{} switch v := v.(type) { case string: opFiles = append(opFiles, v) case []string: opFiles = append(opFiles, v...) default: err := &configErr{tk, fmt.Sprintf("error parsing operators: unsupported type %T", v)} *errors = append(*errors, err) } // Assume for now these are file names, but they can also be the JWT itself inline. o.TrustedOperators = make([]*jwt.OperatorClaims, 0, len(opFiles)) for _, fname := range opFiles { theJWT, opc, err := readOperatorJWT(fname) if err != nil { err := &configErr{tk, fmt.Sprintf("error parsing operator JWT: %v", err)} *errors = append(*errors, err) continue } o.operatorJWT = append(o.operatorJWT, theJWT) o.TrustedOperators = append(o.TrustedOperators, opc) } if len(o.TrustedOperators) == 1 { // In case "resolver" is defined as well, it takes precedence if o.AccountResolver == nil { if accUrl, err := parseURL(o.TrustedOperators[0].AccountServerURL, "account resolver"); err == nil { // nsc automatically appends "/accounts" during nsc push o.AccountResolver, _ = NewURLAccResolver(accUrl.String() + "/accounts") } } // In case "system_account" is defined as well, it takes precedence if o.SystemAccount == "" { o.SystemAccount = o.TrustedOperators[0].SystemAccount } } case "resolver", "account_resolver", "accounts_resolver": switch v := v.(type) { case string: // "resolver" takes precedence over value obtained from "operator". // Clear so that parsing errors are not silently ignored. o.AccountResolver = nil memResolverRe := regexp.MustCompile(`(?i)(MEM|MEMORY)\s*`) resolverRe := regexp.MustCompile(`(?i)(?:URL){1}(?:\({1}\s*"?([^\s"]*)"?\s*\){1})?\s*`) if memResolverRe.MatchString(v) { o.AccountResolver = &MemAccResolver{} } else if items := resolverRe.FindStringSubmatch(v); len(items) == 2 { url := items[1] _, err := parseURL(url, "account resolver") if err != nil { *errors = append(*errors, &configErr{tk, err.Error()}) return } if ur, err := NewURLAccResolver(url); err != nil { err := &configErr{tk, err.Error()} *errors = append(*errors, err) return } else { o.AccountResolver = ur } } case map[string]interface{}: del := false dir := "" dirType := "" limit := int64(0) ttl := time.Duration(0) sync := time.Duration(0) var err error if v, ok := v["dir"]; ok { _, v := unwrapValue(v, <) dir = v.(string) } if v, ok := v["type"]; ok { _, v := unwrapValue(v, <) dirType = v.(string) } if v, ok := v["allow_delete"]; ok { _, v := unwrapValue(v, <) del = v.(bool) } if v, ok := v["limit"]; ok { _, v := unwrapValue(v, <) limit = v.(int64) } if v, ok := v["ttl"]; ok { _, v := unwrapValue(v, <) ttl, err = time.ParseDuration(v.(string)) } if v, ok := v["interval"]; err == nil && ok { _, v := unwrapValue(v, <) sync, err = time.ParseDuration(v.(string)) } if err != nil { *errors = append(*errors, &configErr{tk, err.Error()}) return } if dir == "" { *errors = append(*errors, &configErr{tk, "dir has no value and needs to point to a directory"}) return } if info, _ := os.Stat(dir); info != nil && (!info.IsDir() || info.Mode().Perm()&(1<<(uint(7))) == 0) { *errors = append(*errors, &configErr{tk, "dir needs to point to an accessible directory"}) return } var res AccountResolver switch strings.ToUpper(dirType) { case "CACHE": if sync != 0 { *errors = append(*errors, &configErr{tk, "CACHE does not accept sync"}) } if del { *errors = append(*errors, &configErr{tk, "CACHE does not accept allow_delete"}) } res, err = NewCacheDirAccResolver(dir, limit, ttl) case "FULL": if ttl != 0 { *errors = append(*errors, &configErr{tk, "FULL does not accept ttl"}) } res, err = NewDirAccResolver(dir, limit, sync, del) } if err != nil { *errors = append(*errors, &configErr{tk, err.Error()}) return } o.AccountResolver = res default: err := &configErr{tk, fmt.Sprintf("error parsing operator resolver, wrong type %T", v)} *errors = append(*errors, err) return } if o.AccountResolver == nil { err := &configErr{tk, "error parsing account resolver, should be MEM or " + " URL(\"url\") or a map containing dir and type state=[FULL|CACHE])"} *errors = append(*errors, err) } case "resolver_tls": tc, err := parseTLS(tk, true) if err != nil { *errors = append(*errors, err) return } if o.AccountResolverTLSConfig, err = GenTLSConfig(tc); err != nil { err := &configErr{tk, err.Error()} *errors = append(*errors, err) return } case "resolver_preload": mp, ok := v.(map[string]interface{}) if !ok { err := &configErr{tk, "preload should be a map of account_public_key:account_jwt"} *errors = append(*errors, err) return } o.resolverPreloads = make(map[string]string) for key, val := range mp { tk, val = unwrapValue(val, <) if jwtstr, ok := val.(string); !ok { err := &configErr{tk, "preload map value should be a string JWT"} *errors = append(*errors, err) continue } else { // Make sure this is a valid account JWT, that is a config error. // We will warn of expirations, etc later. if _, err := jwt.DecodeAccountClaims(jwtstr); err != nil { err := &configErr{tk, "invalid account JWT"} *errors = append(*errors, err) continue } o.resolverPreloads[key] = jwtstr } } case "no_auth_user": o.NoAuthUser = v.(string) case "system_account", "system": // Already processed at the beginning so we just skip them // to not treat them as unknown values. return case "no_system_account", "no_system", "no_sys_acc": o.NoSystemAccount = v.(bool) case "no_header_support": o.NoHeaderSupport = v.(bool) case "trusted", "trusted_keys": switch v := v.(type) { case string: o.TrustedKeys = []string{v} case []string: o.TrustedKeys = v case []interface{}: keys := make([]string, 0, len(v)) for _, mv := range v { tk, mv = unwrapValue(mv, <) if key, ok := mv.(string); ok { keys = append(keys, key) } else { err := &configErr{tk, fmt.Sprintf("error parsing trusted: unsupported type in array %T", mv)} *errors = append(*errors, err) continue } } o.TrustedKeys = keys default: err := &configErr{tk, fmt.Sprintf("error parsing trusted: unsupported type %T", v)} *errors = append(*errors, err) } // Do a quick sanity check on keys for _, key := range o.TrustedKeys { if !nkeys.IsValidPublicOperatorKey(key) { err := &configErr{tk, fmt.Sprintf("trust key %q required to be a valid public operator nkey", key)} *errors = append(*errors, err) } } case "connect_error_reports": o.ConnectErrorReports = int(v.(int64)) case "reconnect_error_reports": o.ReconnectErrorReports = int(v.(int64)) case "websocket", "ws": if err := parseWebsocket(tk, o, errors, warnings); err != nil { *errors = append(*errors, err) return } case "mqtt": if err := parseMQTT(tk, o, errors, warnings); err != nil { *errors = append(*errors, err) return } case "server_tags": var err error switch v := v.(type) { case string: o.Tags.Add(v) case []string: o.Tags.Add(v...) case []interface{}: for _, t := range v { if t, ok := t.(token); ok { if t, ok := t.Value().(string); ok { o.Tags.Add(t) continue } else { err = &configErr{tk, fmt.Sprintf("error parsing tags: unsupported type %T where string is expected", t)} } } else { err = &configErr{tk, fmt.Sprintf("error parsing tags: unsupported type %T", t)} } break } default: err = &configErr{tk, fmt.Sprintf("error parsing tags: unsupported type %T", v)} } if err != nil { *errors = append(*errors, err) return } default: if au := atomic.LoadInt32(&allowUnknownTopLevelField); au == 0 && !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: k, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) } } } func parseDuration(field string, tk token, v interface{}, errors *[]error, warnings *[]error) time.Duration { if wd, ok := v.(string); ok { if dur, err := time.ParseDuration(wd); err != nil { err := &configErr{tk, fmt.Sprintf("error parsing %s: %v", field, err)} *errors = append(*errors, err) return 0 } else { return dur } } else { // Backward compatible with old type, assume this is the // number of seconds. err := &configWarningErr{ field: field, configErr: configErr{ token: tk, reason: field + " should be converted to a duration", }, } *warnings = append(*warnings, err) return time.Duration(v.(int64)) * time.Second } } func trackExplicitVal(opts *Options, pm *map[string]bool, name string, val bool) { m := *pm if m == nil { m = make(map[string]bool) *pm = m } m[name] = val } // hostPort is simple struct to hold parsed listen/addr strings. type hostPort struct { host string port int } // parseListen will parse listen option which is replacing host/net and port func parseListen(v interface{}) (*hostPort, error) { hp := &hostPort{} switch vv := v.(type) { // Only a port case int64: hp.port = int(vv) case string: host, port, err := net.SplitHostPort(vv) if err != nil { return nil, fmt.Errorf("could not parse address string %q", vv) } hp.port, err = strconv.Atoi(port) if err != nil { return nil, fmt.Errorf("could not parse port %q", port) } hp.host = host default: return nil, fmt.Errorf("expected port or host:port, got %T", vv) } return hp, nil } // parseCluster will parse the cluster config. func parseCluster(v interface{}, opts *Options, errors *[]error, warnings *[]error) error { var lt token defer convertPanicToErrorList(<, errors) tk, v := unwrapValue(v, <) cm, ok := v.(map[string]interface{}) if !ok { return &configErr{tk, fmt.Sprintf("Expected map to define cluster, got %T", v)} } for mk, mv := range cm { // Again, unwrap token value if line check is required. tk, mv = unwrapValue(mv, <) switch strings.ToLower(mk) { case "name": opts.Cluster.Name = mv.(string) case "listen": hp, err := parseListen(mv) if err != nil { err := &configErr{tk, err.Error()} *errors = append(*errors, err) continue } opts.Cluster.Host = hp.host opts.Cluster.Port = hp.port case "port": opts.Cluster.Port = int(mv.(int64)) case "host", "net": opts.Cluster.Host = mv.(string) case "authorization": auth, err := parseAuthorization(tk, opts, errors, warnings) if err != nil { *errors = append(*errors, err) continue } if auth.users != nil { err := &configErr{tk, "Cluster authorization does not allow multiple users"} *errors = append(*errors, err) continue } opts.Cluster.Username = auth.user opts.Cluster.Password = auth.pass opts.Cluster.AuthTimeout = auth.timeout if auth.defaultPermissions != nil { err := &configWarningErr{ field: mk, configErr: configErr{ token: tk, reason: `setting "permissions" within cluster authorization block is deprecated`, }, } *warnings = append(*warnings, err) // Do not set permissions if they were specified in top-level cluster block. if opts.Cluster.Permissions == nil { setClusterPermissions(&opts.Cluster, auth.defaultPermissions) } } case "routes": ra := mv.([]interface{}) routes, errs := parseURLs(ra, "route") if errs != nil { *errors = append(*errors, errs...) continue } opts.Routes = routes case "tls": config, tlsopts, err := getTLSConfig(tk) if err != nil { *errors = append(*errors, err) continue } opts.Cluster.TLSConfig = config opts.Cluster.TLSTimeout = tlsopts.Timeout opts.Cluster.TLSMap = tlsopts.Map opts.Cluster.TLSCheckKnownURLs = tlsopts.TLSCheckKnownURLs case "cluster_advertise", "advertise": opts.Cluster.Advertise = mv.(string) case "no_advertise": opts.Cluster.NoAdvertise = mv.(bool) trackExplicitVal(opts, &opts.inConfig, "Cluster.NoAdvertise", opts.Cluster.NoAdvertise) case "connect_retries": opts.Cluster.ConnectRetries = int(mv.(int64)) case "permissions": perms, err := parseUserPermissions(mv, errors, warnings) if err != nil { *errors = append(*errors, err) continue } // Dynamic response permissions do not make sense here. if perms.Response != nil { err := &configErr{tk, "Cluster permissions do not support dynamic responses"} *errors = append(*errors, err) continue } // This will possibly override permissions that were define in auth block setClusterPermissions(&opts.Cluster, perms) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: mk, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) continue } } } return nil } func parseURLs(a []interface{}, typ string) (urls []*url.URL, errors []error) { urls = make([]*url.URL, 0, len(a)) var lt token defer convertPanicToErrorList(<, &errors) for _, u := range a { tk, u := unwrapValue(u, <) sURL := u.(string) url, err := parseURL(sURL, typ) if err != nil { err := &configErr{tk, err.Error()} errors = append(errors, err) continue } urls = append(urls, url) } return urls, errors } func parseURL(u string, typ string) (*url.URL, error) { urlStr := strings.TrimSpace(u) url, err := url.Parse(urlStr) if err != nil { return nil, fmt.Errorf("error parsing %s url [%q]", typ, urlStr) } return url, nil } func parseGateway(v interface{}, o *Options, errors *[]error, warnings *[]error) error { var lt token defer convertPanicToErrorList(<, errors) tk, v := unwrapValue(v, <) gm, ok := v.(map[string]interface{}) if !ok { return &configErr{tk, fmt.Sprintf("Expected gateway to be a map, got %T", v)} } for mk, mv := range gm { // Again, unwrap token value if line check is required. tk, mv = unwrapValue(mv, <) switch strings.ToLower(mk) { case "name": o.Gateway.Name = mv.(string) case "listen": hp, err := parseListen(mv) if err != nil { err := &configErr{tk, err.Error()} *errors = append(*errors, err) continue } o.Gateway.Host = hp.host o.Gateway.Port = hp.port case "port": o.Gateway.Port = int(mv.(int64)) case "host", "net": o.Gateway.Host = mv.(string) case "authorization": auth, err := parseAuthorization(tk, o, errors, warnings) if err != nil { *errors = append(*errors, err) continue } if auth.users != nil { *errors = append(*errors, &configErr{tk, "Gateway authorization does not allow multiple users"}) continue } o.Gateway.Username = auth.user o.Gateway.Password = auth.pass o.Gateway.AuthTimeout = auth.timeout case "tls": config, tlsopts, err := getTLSConfig(tk) if err != nil { *errors = append(*errors, err) continue } o.Gateway.TLSConfig = config o.Gateway.TLSTimeout = tlsopts.Timeout o.Gateway.TLSMap = tlsopts.Map o.Gateway.TLSCheckKnownURLs = tlsopts.TLSCheckKnownURLs case "advertise": o.Gateway.Advertise = mv.(string) case "connect_retries": o.Gateway.ConnectRetries = int(mv.(int64)) case "gateways": gateways, err := parseGateways(mv, errors, warnings) if err != nil { return err } o.Gateway.Gateways = gateways case "reject_unknown", "reject_unknown_cluster": o.Gateway.RejectUnknown = mv.(bool) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: mk, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) continue } } } return nil } var dynamicJSAccountLimits = &JetStreamAccountLimits{-1, -1, -1, -1} // Parses jetstream account limits for an account. Simple setup with boolen is allowed, and we will // use dynamic account limits. func parseJetStreamForAccount(v interface{}, acc *Account, errors *[]error, warnings *[]error) error { var lt token tk, v := unwrapValue(v, <) // Value here can be bool, or string "enabled" or a map. switch vv := v.(type) { case bool: if vv { acc.jsLimits = dynamicJSAccountLimits } case string: switch strings.ToLower(vv) { case "enabled", "enable": acc.jsLimits = dynamicJSAccountLimits case "disabled", "disable": acc.jsLimits = nil default: return &configErr{tk, fmt.Sprintf("Expected 'enabled' or 'disabled' for string value, got '%s'", vv)} } case map[string]interface{}: jsLimits := &JetStreamAccountLimits{-1, -1, -1, -1} for mk, mv := range vv { tk, mv = unwrapValue(mv, <) switch strings.ToLower(mk) { case "max_memory", "max_mem", "mem", "memory": vv, ok := mv.(int64) if !ok { return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } jsLimits.MaxMemory = int64(vv) case "max_store", "max_file", "max_disk", "store", "disk": vv, ok := mv.(int64) if !ok { return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } jsLimits.MaxStore = int64(vv) case "max_streams", "streams": vv, ok := mv.(int64) if !ok { return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } jsLimits.MaxStreams = int(vv) case "max_consumers", "consumers": vv, ok := mv.(int64) if !ok { return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)} } jsLimits.MaxConsumers = int(vv) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: mk, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) continue } } } acc.jsLimits = jsLimits default: return &configErr{tk, fmt.Sprintf("Expected map, bool or string to define JetStream, got %T", v)} } return nil } // Parse enablement of jetstream for a server. func parseJetStream(v interface{}, opts *Options, errors *[]error, warnings *[]error) error { var lt token tk, v := unwrapValue(v, <) // Value here can be bool, or string "enabled" or a map. switch vv := v.(type) { case bool: opts.JetStream = v.(bool) case string: switch strings.ToLower(vv) { case "enabled", "enable": opts.JetStream = true case "disabled", "disable": opts.JetStream = false default: return &configErr{tk, fmt.Sprintf("Expected 'enabled' or 'disabled' for string value, got '%s'", vv)} } case map[string]interface{}: for mk, mv := range vv { tk, mv = unwrapValue(mv, <) switch strings.ToLower(mk) { case "store_dir", "storedir": opts.StoreDir = mv.(string) case "max_memory_store", "max_mem_store", "max_mem": opts.JetStreamMaxMemory = mv.(int64) case "max_file_store", "max_file": opts.JetStreamMaxStore = mv.(int64) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: mk, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) continue } } } opts.JetStream = true default: return &configErr{tk, fmt.Sprintf("Expected map, bool or string to define JetStream, got %T", v)} } return nil } // parseLeafNodes will parse the leaf node config. func parseLeafNodes(v interface{}, opts *Options, errors *[]error, warnings *[]error) error { var lt token defer convertPanicToErrorList(<, errors) tk, v := unwrapValue(v, <) cm, ok := v.(map[string]interface{}) if !ok { return &configErr{tk, fmt.Sprintf("Expected map to define a leafnode, got %T", v)} } for mk, mv := range cm { // Again, unwrap token value if line check is required. tk, mv = unwrapValue(mv, <) switch strings.ToLower(mk) { case "listen": hp, err := parseListen(mv) if err != nil { err := &configErr{tk, err.Error()} *errors = append(*errors, err) continue } opts.LeafNode.Host = hp.host opts.LeafNode.Port = hp.port case "port": opts.LeafNode.Port = int(mv.(int64)) case "host", "net": opts.LeafNode.Host = mv.(string) case "authorization": auth, err := parseLeafAuthorization(tk, errors, warnings) if err != nil { *errors = append(*errors, err) continue } opts.LeafNode.Username = auth.user opts.LeafNode.Password = auth.pass opts.LeafNode.AuthTimeout = auth.timeout opts.LeafNode.Account = auth.acc opts.LeafNode.Users = auth.users // Validate user info config for leafnode authorization if err := validateLeafNodeAuthOptions(opts); err != nil { *errors = append(*errors, &configErr{tk, err.Error()}) continue } case "remotes": // Parse the remote options here. remotes, err := parseRemoteLeafNodes(tk, errors, warnings) if err != nil { *errors = append(*errors, err) continue } opts.LeafNode.Remotes = remotes case "reconnect", "reconnect_delay", "reconnect_interval": opts.LeafNode.ReconnectInterval = time.Duration(int(mv.(int64))) * time.Second case "tls": tc, err := parseTLS(tk, true) if err != nil { *errors = append(*errors, err) continue } if opts.LeafNode.TLSConfig, err = GenTLSConfig(tc); err != nil { err := &configErr{tk, err.Error()} *errors = append(*errors, err) continue } opts.LeafNode.TLSTimeout = tc.Timeout case "leafnode_advertise", "advertise": opts.LeafNode.Advertise = mv.(string) case "no_advertise": opts.LeafNode.NoAdvertise = mv.(bool) trackExplicitVal(opts, &opts.inConfig, "LeafNode.NoAdvertise", opts.LeafNode.NoAdvertise) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: mk, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) continue } } } return nil } // This is the authorization parser adapter for the leafnode's // authorization config. func parseLeafAuthorization(v interface{}, errors *[]error, warnings *[]error) (*authorization, error) { var ( am map[string]interface{} tk token lt token auth = &authorization{} ) defer convertPanicToErrorList(<, errors) _, v = unwrapValue(v, <) am = v.(map[string]interface{}) for mk, mv := range am { tk, mv = unwrapValue(mv, <) switch strings.ToLower(mk) { case "user", "username": auth.user = mv.(string) case "pass", "password": auth.pass = mv.(string) case "timeout": at := float64(1) switch mv := mv.(type) { case int64: at = float64(mv) case float64: at = mv } auth.timeout = at case "users": users, err := parseLeafUsers(tk, errors, warnings) if err != nil { *errors = append(*errors, err) continue } auth.users = users case "account": auth.acc = mv.(string) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: mk, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) } continue } } return auth, nil } // This is a trimmed down version of parseUsers that is adapted // for the users possibly defined in the authorization{} section // of leafnodes {}. func parseLeafUsers(mv interface{}, errors *[]error, warnings *[]error) ([]*User, error) { var ( tk token lt token users = []*User{} ) defer convertPanicToErrorList(<, errors) tk, mv = unwrapValue(mv, <) // Make sure we have an array uv, ok := mv.([]interface{}) if !ok { return nil, &configErr{tk, fmt.Sprintf("Expected users field to be an array, got %v", mv)} } for _, u := range uv { tk, u = unwrapValue(u, <) // Check its a map/struct um, ok := u.(map[string]interface{}) if !ok { err := &configErr{tk, fmt.Sprintf("Expected user entry to be a map/struct, got %v", u)} *errors = append(*errors, err) continue } user := &User{} for k, v := range um { tk, v = unwrapValue(v, <) switch strings.ToLower(k) { case "user", "username": user.Username = v.(string) case "pass", "password": user.Password = v.(string) case "account": // We really want to save just the account name here, but // the User object is *Account. So we create an account object // but it won't be registered anywhere. The server will just // use opts.LeafNode.Users[].Account.Name. Alternatively // we need to create internal objects to store u/p and account // name and have a server structure to hold that. user.Account = NewAccount(v.(string)) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: k, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) continue } } } users = append(users, user) } return users, nil } func parseRemoteLeafNodes(v interface{}, errors *[]error, warnings *[]error) ([]*RemoteLeafOpts, error) { var lt token defer convertPanicToErrorList(<, errors) tk, v := unwrapValue(v, <) ra, ok := v.([]interface{}) if !ok { return nil, &configErr{tk, fmt.Sprintf("Expected remotes field to be an array, got %T", v)} } remotes := make([]*RemoteLeafOpts, 0, len(ra)) for _, r := range ra { tk, r = unwrapValue(r, <) // Check its a map/struct rm, ok := r.(map[string]interface{}) if !ok { *errors = append(*errors, &configErr{tk, fmt.Sprintf("Expected remote leafnode entry to be a map/struct, got %v", r)}) continue } remote := &RemoteLeafOpts{} for k, v := range rm { tk, v = unwrapValue(v, <) switch strings.ToLower(k) { case "url", "urls": switch v := v.(type) { case []interface{}, []string: urls, errs := parseURLs(v.([]interface{}), "leafnode") if errs != nil { *errors = append(*errors, errs...) continue } remote.URLs = urls case string: url, err := parseURL(v, "leafnode") if err != nil { *errors = append(*errors, &configErr{tk, err.Error()}) continue } remote.URLs = append(remote.URLs, url) default: *errors = append(*errors, &configErr{tk, fmt.Sprintf("Expected remote leafnode url to be an array or string, got %v", v)}) continue } case "account", "local": remote.LocalAccount = v.(string) case "creds", "credentials": p, err := expandPath(v.(string)) if err != nil { *errors = append(*errors, &configErr{tk, err.Error()}) continue } remote.Credentials = p case "tls": tc, err := parseTLS(tk, true) if err != nil { *errors = append(*errors, err) continue } if remote.TLSConfig, err = GenTLSConfig(tc); err != nil { *errors = append(*errors, &configErr{tk, err.Error()}) continue } // If ca_file is defined, GenTLSConfig() sets TLSConfig.ClientCAs. // Set RootCAs since this tls.Config is used when soliciting // a connection (therefore behaves as a client). remote.TLSConfig.RootCAs = remote.TLSConfig.ClientCAs if tc.Timeout > 0 { remote.TLSTimeout = tc.Timeout } else { remote.TLSTimeout = float64(DEFAULT_LEAF_TLS_TIMEOUT) } case "hub": remote.Hub = v.(bool) case "deny_imports", "deny_import": subjects, err := parseSubjects(tk, errors, warnings) if err != nil { *errors = append(*errors, err) continue } remote.DenyImports = subjects case "deny_exports", "deny_export": subjects, err := parseSubjects(tk, errors, warnings) if err != nil { *errors = append(*errors, err) continue } remote.DenyExports = subjects case "ws_compress", "ws_compression", "websocket_compress", "websocket_compression": remote.Websocket.Compression = v.(bool) case "ws_no_masking", "websocket_no_masking": remote.Websocket.NoMasking = v.(bool) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: k, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) continue } } } remotes = append(remotes, remote) } return remotes, nil } // Parse TLS and returns a TLSConfig and TLSTimeout. // Used by cluster and gateway parsing. func getTLSConfig(tk token) (*tls.Config, *TLSConfigOpts, error) { tc, err := parseTLS(tk, false) if err != nil { return nil, nil, err } config, err := GenTLSConfig(tc) if err != nil { err := &configErr{tk, err.Error()} return nil, nil, err } // For clusters/gateways, we will force strict verification. We also act // as both client and server, so will mirror the rootCA to the // clientCA pool. config.ClientAuth = tls.RequireAndVerifyClientCert config.RootCAs = config.ClientCAs return config, tc, nil } func parseGateways(v interface{}, errors *[]error, warnings *[]error) ([]*RemoteGatewayOpts, error) { var lt token defer convertPanicToErrorList(<, errors) tk, v := unwrapValue(v, <) // Make sure we have an array ga, ok := v.([]interface{}) if !ok { return nil, &configErr{tk, fmt.Sprintf("Expected gateways field to be an array, got %T", v)} } gateways := []*RemoteGatewayOpts{} for _, g := range ga { tk, g = unwrapValue(g, <) // Check its a map/struct gm, ok := g.(map[string]interface{}) if !ok { *errors = append(*errors, &configErr{tk, fmt.Sprintf("Expected gateway entry to be a map/struct, got %v", g)}) continue } gateway := &RemoteGatewayOpts{} for k, v := range gm { tk, v = unwrapValue(v, <) switch strings.ToLower(k) { case "name": gateway.Name = v.(string) case "tls": tls, tlsopts, err := getTLSConfig(tk) if err != nil { *errors = append(*errors, err) continue } gateway.TLSConfig = tls gateway.TLSTimeout = tlsopts.Timeout case "url": url, err := parseURL(v.(string), "gateway") if err != nil { *errors = append(*errors, &configErr{tk, err.Error()}) continue } gateway.URLs = append(gateway.URLs, url) case "urls": urls, errs := parseURLs(v.([]interface{}), "gateway") if errs != nil { *errors = append(*errors, errs...) continue } gateway.URLs = urls default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: k, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) continue } } } gateways = append(gateways, gateway) } return gateways, nil } // Sets cluster's permissions based on given pub/sub permissions, // doing the appropriate translation. func setClusterPermissions(opts *ClusterOpts, perms *Permissions) { // Import is whether or not we will send a SUB for interest to the other side. // Export is whether or not we will accept a SUB from the remote for a given subject. // Both only effect interest registration. // The parsing sets Import into Publish and Export into Subscribe, convert // accordingly. opts.Permissions = &RoutePermissions{ Import: perms.Publish, Export: perms.Subscribe, } } // Temp structures to hold account import and export defintions since they need // to be processed after being parsed. type export struct { acc *Account sub string accs []string rt ServiceRespType lat *serviceLatency rthr time.Duration tPos uint } type importStream struct { acc *Account an string sub string to string pre string } type importService struct { acc *Account an string sub string to string share bool } // Checks if an account name is reserved. func isReservedAccount(name string) bool { return name == globalAccountName } func parseAccountMapDest(v interface{}, tk token, errors *[]error, warnings *[]error) (*MapDest, *configErr) { // These should be maps. mv, ok := v.(map[string]interface{}) if !ok { err := &configErr{tk, "Expected an entry for the mapping destination"} *errors = append(*errors, err) return nil, err } mdest := &MapDest{} var lt token var sw bool for k, v := range mv { tk, dmv := unwrapValue(v, <) switch strings.ToLower(k) { case "dest", "destination": mdest.Subject = dmv.(string) case "weight": switch vv := dmv.(type) { case string: ws := vv if strings.HasSuffix(ws, "%") { ws = ws[:len(ws)-1] } weight, err := strconv.Atoi(ws) if err != nil { err := &configErr{tk, fmt.Sprintf("Invalid weight %q for mapping destination", ws)} *errors = append(*errors, err) return nil, err } if weight > 100 || weight < 0 { err := &configErr{tk, fmt.Sprintf("Invalid weight %d for mapping destination", weight)} *errors = append(*errors, err) return nil, err } mdest.Weight = uint8(weight) sw = true case int64: weight := vv if weight > 100 || weight < 0 { err := &configErr{tk, fmt.Sprintf("Invalid weight %d for mapping destination", weight)} *errors = append(*errors, err) return nil, err } mdest.Weight = uint8(weight) sw = true default: err := &configErr{tk, fmt.Sprintf("Unknown entry type for weight of %v\n", vv)} *errors = append(*errors, err) return nil, err } case "cluster": mdest.Cluster = dmv.(string) default: err := &configErr{tk, fmt.Sprintf("Unknown field %q for mapping destination", k)} *errors = append(*errors, err) return nil, err } } if !sw { err := &configErr{tk, fmt.Sprintf("Missing weight for mapping destination %q", mdest.Subject)} *errors = append(*errors, err) return nil, err } return mdest, nil } // parseAccountMappings is called to parse account mappings. func parseAccountMappings(v interface{}, acc *Account, errors *[]error, warnings *[]error) error { var lt token defer convertPanicToErrorList(<, errors) tk, v := unwrapValue(v, <) am := v.(map[string]interface{}) for subj, mv := range am { if !IsValidSubject(subj) { err := &configErr{tk, fmt.Sprintf("Subject %q is not a valid subject", subj)} *errors = append(*errors, err) continue } tk, v := unwrapValue(mv, <) switch vv := v.(type) { case string: if err := acc.AddMapping(subj, v.(string)); err != nil { err := &configErr{tk, fmt.Sprintf("Error adding mapping for %q: %v", subj, err)} *errors = append(*errors, err) continue } case []interface{}: var mappings []*MapDest for _, mv := range v.([]interface{}) { tk, amv := unwrapValue(mv, <) mdest, err := parseAccountMapDest(amv, tk, errors, warnings) if err != nil { continue } mappings = append(mappings, mdest) } // Now add them in.. if err := acc.AddWeightedMappings(subj, mappings...); err != nil { err := &configErr{tk, fmt.Sprintf("Error adding mapping for %q: %v", subj, err)} *errors = append(*errors, err) continue } case interface{}: tk, amv := unwrapValue(mv, <) mdest, err := parseAccountMapDest(amv, tk, errors, warnings) if err != nil { continue } // Now add it in.. if err := acc.AddWeightedMappings(subj, mdest); err != nil { err := &configErr{tk, fmt.Sprintf("Error adding mapping for %q: %v", subj, err)} *errors = append(*errors, err) continue } default: err := &configErr{tk, fmt.Sprintf("Unknown type %T for mapping destination", vv)} *errors = append(*errors, err) continue } } return nil } // parseAccounts will parse the different accounts syntax. func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]error) error { var ( importStreams []*importStream importServices []*importService exportStreams []*export exportServices []*export lt token ) defer convertPanicToErrorList(<, errors) tk, v := unwrapValue(v, <) switch vv := v.(type) { // Simple array of account names. case []interface{}, []string: m := make(map[string]struct{}, len(v.([]interface{}))) for _, n := range v.([]interface{}) { tk, name := unwrapValue(n, <) ns := name.(string) // Check for reserved names. if isReservedAccount(ns) { err := &configErr{tk, fmt.Sprintf("%q is a Reserved Account", ns)} *errors = append(*errors, err) continue } if _, ok := m[ns]; ok { err := &configErr{tk, fmt.Sprintf("Duplicate Account Entry: %s", ns)} *errors = append(*errors, err) continue } opts.Accounts = append(opts.Accounts, NewAccount(ns)) m[ns] = struct{}{} } // More common map entry case map[string]interface{}: // Track users across accounts, must be unique across // accounts and nkeys vs users. uorn := make(map[string]struct{}) for aname, mv := range vv { tk, amv := unwrapValue(mv, <) // Skip referenced config vars within the account block. if tk.IsUsedVariable() { continue } // These should be maps. mv, ok := amv.(map[string]interface{}) if !ok { err := &configErr{tk, "Expected map entries for accounts"} *errors = append(*errors, err) continue } if isReservedAccount(aname) { err := &configErr{tk, fmt.Sprintf("%q is a Reserved Account", aname)} *errors = append(*errors, err) continue } var ( users []*User nkeyUsr []*NkeyUser usersTk token ) acc := NewAccount(aname) opts.Accounts = append(opts.Accounts, acc) for k, v := range mv { tk, mv := unwrapValue(v, <) switch strings.ToLower(k) { case "nkey": nk, ok := mv.(string) if !ok || !nkeys.IsValidPublicAccountKey(nk) { err := &configErr{tk, fmt.Sprintf("Not a valid public nkey for an account: %q", mv)} *errors = append(*errors, err) continue } acc.Nkey = nk case "imports": streams, services, err := parseAccountImports(tk, acc, errors, warnings) if err != nil { *errors = append(*errors, err) continue } importStreams = append(importStreams, streams...) importServices = append(importServices, services...) case "exports": streams, services, err := parseAccountExports(tk, acc, errors, warnings) if err != nil { *errors = append(*errors, err) continue } exportStreams = append(exportStreams, streams...) exportServices = append(exportServices, services...) case "jetstream": err := parseJetStreamForAccount(mv, acc, errors, warnings) if err != nil { *errors = append(*errors, err) continue } case "users": var err error usersTk = tk nkeyUsr, users, err = parseUsers(mv, opts, errors, warnings) if err != nil { *errors = append(*errors, err) continue } case "default_permissions": permissions, err := parseUserPermissions(tk, errors, warnings) if err != nil { *errors = append(*errors, err) continue } acc.defaultPerms = permissions case "mappings", "maps": err := parseAccountMappings(tk, acc, errors, warnings) if err != nil { *errors = append(*errors, err) continue } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: k, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) } } } applyDefaultPermissions(users, nkeyUsr, acc.defaultPerms) for _, u := range nkeyUsr { if _, ok := uorn[u.Nkey]; ok { err := &configErr{usersTk, fmt.Sprintf("Duplicate nkey %q detected", u.Nkey)} *errors = append(*errors, err) continue } uorn[u.Nkey] = struct{}{} u.Account = acc } opts.Nkeys = append(opts.Nkeys, nkeyUsr...) for _, u := range users { if _, ok := uorn[u.Username]; ok { err := &configErr{usersTk, fmt.Sprintf("Duplicate user %q detected", u.Username)} *errors = append(*errors, err) continue } uorn[u.Username] = struct{}{} u.Account = acc } opts.Users = append(opts.Users, users...) } } lt = tk // Bail already if there are previous errors. if len(*errors) > 0 { return nil } // Parse Imports and Exports here after all accounts defined. // Do exports first since they need to be defined for imports to succeed // since we do permissions checks. // Create a lookup map for accounts lookups. am := make(map[string]*Account, len(opts.Accounts)) for _, a := range opts.Accounts { am[a.Name] = a } // Do stream exports for _, stream := range exportStreams { // Make array of accounts if applicable. var accounts []*Account for _, an := range stream.accs { ta := am[an] if ta == nil { msg := fmt.Sprintf("%q account not defined for stream export", an) *errors = append(*errors, &configErr{tk, msg}) continue } accounts = append(accounts, ta) } if err := stream.acc.addStreamExportWithAccountPos(stream.sub, accounts, stream.tPos); err != nil { msg := fmt.Sprintf("Error adding stream export %q: %v", stream.sub, err) *errors = append(*errors, &configErr{tk, msg}) continue } } for _, service := range exportServices { // Make array of accounts if applicable. var accounts []*Account for _, an := range service.accs { ta := am[an] if ta == nil { msg := fmt.Sprintf("%q account not defined for service export", an) *errors = append(*errors, &configErr{tk, msg}) continue } accounts = append(accounts, ta) } if err := service.acc.addServiceExportWithResponseAndAccountPos(service.sub, service.rt, accounts, service.tPos); err != nil { msg := fmt.Sprintf("Error adding service export %q: %v", service.sub, err) *errors = append(*errors, &configErr{tk, msg}) continue } if service.rthr != 0 { // Response threshold was set in options. if err := service.acc.SetServiceExportResponseThreshold(service.sub, service.rthr); err != nil { msg := fmt.Sprintf("Error adding service export response threshold for %q: %v", service.sub, err) *errors = append(*errors, &configErr{tk, msg}) continue } } if service.lat != nil { // System accounts are on be default so just make sure we have not opted out.. if opts.NoSystemAccount { msg := fmt.Sprintf("Error adding service latency sampling for %q: %v", service.sub, ErrNoSysAccount.Error()) *errors = append(*errors, &configErr{tk, msg}) continue } if err := service.acc.TrackServiceExportWithSampling(service.sub, service.lat.subject, int(service.lat.sampling)); err != nil { msg := fmt.Sprintf("Error adding service latency sampling for %q on subject %q: %v", service.sub, service.lat.subject, err) *errors = append(*errors, &configErr{tk, msg}) continue } } } for _, stream := range importStreams { ta := am[stream.an] if ta == nil { msg := fmt.Sprintf("%q account not defined for stream import", stream.an) *errors = append(*errors, &configErr{tk, msg}) continue } if stream.pre != "" { if err := stream.acc.AddStreamImport(ta, stream.sub, stream.pre); err != nil { msg := fmt.Sprintf("Error adding stream import %q: %v", stream.sub, err) *errors = append(*errors, &configErr{tk, msg}) continue } } else { if err := stream.acc.AddMappedStreamImport(ta, stream.sub, stream.to); err != nil { msg := fmt.Sprintf("Error adding stream import %q: %v", stream.sub, err) *errors = append(*errors, &configErr{tk, msg}) continue } } } for _, service := range importServices { ta := am[service.an] if ta == nil { msg := fmt.Sprintf("%q account not defined for service import", service.an) *errors = append(*errors, &configErr{tk, msg}) continue } if service.to == "" { service.to = service.sub } if err := service.acc.AddServiceImport(ta, service.to, service.sub); err != nil { msg := fmt.Sprintf("Error adding service import %q: %v", service.sub, err) *errors = append(*errors, &configErr{tk, msg}) continue } if err := service.acc.SetServiceImportSharing(ta, service.sub, service.share); err != nil { msg := fmt.Sprintf("Error setting service import sharing %q: %v", service.sub, err) *errors = append(*errors, &configErr{tk, msg}) continue } } return nil } // Parse the account exports func parseAccountExports(v interface{}, acc *Account, errors, warnings *[]error) ([]*export, []*export, error) { var lt token defer convertPanicToErrorList(<, errors) // This should be an array of objects/maps. tk, v := unwrapValue(v, <) ims, ok := v.([]interface{}) if !ok { return nil, nil, &configErr{tk, fmt.Sprintf("Exports should be an array, got %T", v)} } var services []*export var streams []*export for _, v := range ims { // Should have stream or service stream, service, err := parseExportStreamOrService(v, errors, warnings) if err != nil { *errors = append(*errors, err) continue } if service != nil { service.acc = acc services = append(services, service) } if stream != nil { stream.acc = acc streams = append(streams, stream) } } return streams, services, nil } // Parse the account imports func parseAccountImports(v interface{}, acc *Account, errors, warnings *[]error) ([]*importStream, []*importService, error) { var lt token defer convertPanicToErrorList(<, errors) // This should be an array of objects/maps. tk, v := unwrapValue(v, <) ims, ok := v.([]interface{}) if !ok { return nil, nil, &configErr{tk, fmt.Sprintf("Imports should be an array, got %T", v)} } var services []*importService var streams []*importStream svcSubjects := map[string]*importService{} for _, v := range ims { // Should have stream or service stream, service, err := parseImportStreamOrService(v, errors, warnings) if err != nil { *errors = append(*errors, err) continue } if service != nil { if dup := svcSubjects[service.to]; dup != nil { tk, _ := unwrapValue(v, <) err := &configErr{tk, fmt.Sprintf("Duplicate service import subject %q, previously used in import for account %q, subject %q", service.to, dup.an, dup.sub)} *errors = append(*errors, err) continue } svcSubjects[service.to] = service service.acc = acc services = append(services, service) } if stream != nil { stream.acc = acc streams = append(streams, stream) } } return streams, services, nil } // Helper to parse an embedded account description for imported services or streams. func parseAccount(v map[string]interface{}, errors, warnings *[]error) (string, string, error) { var lt token defer convertPanicToErrorList(<, errors) var accountName, subject string for mk, mv := range v { tk, mv := unwrapValue(mv, <) switch strings.ToLower(mk) { case "account": accountName = mv.(string) case "subject": subject = mv.(string) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: mk, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) } } } return accountName, subject, nil } // Parse an export stream or service. // e.g. // {stream: "public.>"} # No accounts means public. // {stream: "synadia.private.>", accounts: [cncf, natsio]} // {service: "pub.request"} # No accounts means public. // {service: "pub.special.request", accounts: [nats.io]} func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*export, *export, error) { var ( curStream *export curService *export accounts []string rt ServiceRespType rtSeen bool rtToken token lat *serviceLatency threshSeen bool thresh time.Duration latToken token lt token accTokPos uint ) defer convertPanicToErrorList(<, errors) tk, v := unwrapValue(v, <) vv, ok := v.(map[string]interface{}) if !ok { return nil, nil, &configErr{tk, fmt.Sprintf("Export Items should be a map with type entry, got %T", v)} } for mk, mv := range vv { tk, mv := unwrapValue(mv, <) switch strings.ToLower(mk) { case "stream": if curService != nil { err := &configErr{tk, fmt.Sprintf("Detected stream %q but already saw a service", mv)} *errors = append(*errors, err) continue } if rtToken != nil { err := &configErr{rtToken, "Detected response directive on non-service"} *errors = append(*errors, err) continue } if latToken != nil { err := &configErr{latToken, "Detected latency directive on non-service"} *errors = append(*errors, err) continue } mvs, ok := mv.(string) if !ok { err := &configErr{tk, fmt.Sprintf("Expected stream name to be string, got %T", mv)} *errors = append(*errors, err) continue } curStream = &export{sub: mvs} if accounts != nil { curStream.accs = accounts } case "service": if curStream != nil { err := &configErr{tk, fmt.Sprintf("Detected service %q but already saw a stream", mv)} *errors = append(*errors, err) continue } mvs, ok := mv.(string) if !ok { err := &configErr{tk, fmt.Sprintf("Expected service name to be string, got %T", mv)} *errors = append(*errors, err) continue } curService = &export{sub: mvs} if accounts != nil { curService.accs = accounts } if rtSeen { curService.rt = rt } if lat != nil { curService.lat = lat } if threshSeen { curService.rthr = thresh } case "response", "response_type": if rtSeen { err := &configErr{tk, "Duplicate response type definition"} *errors = append(*errors, err) continue } rtSeen = true rtToken = tk mvs, ok := mv.(string) if !ok { err := &configErr{tk, fmt.Sprintf("Expected response type to be string, got %T", mv)} *errors = append(*errors, err) continue } switch strings.ToLower(mvs) { case "single", "singleton": rt = Singleton case "stream": rt = Streamed case "chunk", "chunked": rt = Chunked default: err := &configErr{tk, fmt.Sprintf("Unknown response type: %q", mvs)} *errors = append(*errors, err) continue } if curService != nil { curService.rt = rt } if curStream != nil { err := &configErr{tk, "Detected response directive on non-service"} *errors = append(*errors, err) } case "threshold", "response_threshold", "response_max_time", "response_time": if threshSeen { err := &configErr{tk, "Duplicate response threshold detected"} *errors = append(*errors, err) continue } threshSeen = true mvs, ok := mv.(string) if !ok { err := &configErr{tk, fmt.Sprintf("Expected response threshold to be a parseable time duration, got %T", mv)} *errors = append(*errors, err) continue } var err error thresh, err = time.ParseDuration(mvs) if err != nil { err := &configErr{tk, fmt.Sprintf("Expected response threshold to be a parseable time duration, got %q", mvs)} *errors = append(*errors, err) continue } if curService != nil { curService.rthr = thresh } if curStream != nil { err := &configErr{tk, "Detected response directive on non-service"} *errors = append(*errors, err) } case "accounts": for _, iv := range mv.([]interface{}) { _, mv := unwrapValue(iv, <) accounts = append(accounts, mv.(string)) } if curStream != nil { curStream.accs = accounts } else if curService != nil { curService.accs = accounts } case "latency": latToken = tk var err error lat, err = parseServiceLatency(tk, mv) if err != nil { *errors = append(*errors, err) continue } if curStream != nil { err = &configErr{tk, "Detected latency directive on non-service"} *errors = append(*errors, err) continue } if curService != nil { curService.lat = lat } case "account_token_position": accTokPos = uint(mv.(int64)) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: mk, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) } } } if curStream != nil { curStream.tPos = accTokPos } if curService != nil { curService.tPos = accTokPos } return curStream, curService, nil } // parseServiceLatency returns a latency config block. func parseServiceLatency(root token, v interface{}) (l *serviceLatency, retErr error) { var lt token defer convertPanicToError(<, &retErr) if subject, ok := v.(string); ok { return &serviceLatency{ subject: subject, sampling: DEFAULT_SERVICE_LATENCY_SAMPLING, }, nil } latency, ok := v.(map[string]interface{}) if !ok { return nil, &configErr{token: root, reason: fmt.Sprintf("Expected latency entry to be a map/struct or string, got %T", v)} } sl := serviceLatency{ sampling: DEFAULT_SERVICE_LATENCY_SAMPLING, } // Read sampling value. if v, ok := latency["sampling"]; ok { tk, v := unwrapValue(v, <) header := false var sample int64 switch vv := v.(type) { case int64: // Sample is an int, like 50. sample = vv case string: // Sample is a string, like "50%". if strings.ToLower(strings.TrimSpace(vv)) == "headers" { header = true sample = 0 break } s := strings.TrimSuffix(vv, "%") n, err := strconv.Atoi(s) if err != nil { return nil, &configErr{token: tk, reason: fmt.Sprintf("Failed to parse latency sample: %v", err)} } sample = int64(n) default: return nil, &configErr{token: tk, reason: fmt.Sprintf("Expected latency sample to be a string or map/struct, got %T", v)} } if !header { if sample < 1 || sample > 100 { return nil, &configErr{token: tk, reason: ErrBadSampling.Error()} } } sl.sampling = int8(sample) } // Read subject value. v, ok = latency["subject"] if !ok { return nil, &configErr{token: root, reason: "Latency subject required, but missing"} } tk, v := unwrapValue(v, <) subject, ok := v.(string) if !ok { return nil, &configErr{token: tk, reason: fmt.Sprintf("Expected latency subject to be a string, got %T", subject)} } sl.subject = subject return &sl, nil } // Parse an import stream or service. // e.g. // {stream: {account: "synadia", subject:"public.synadia"}, prefix: "imports.synadia"} // {stream: {account: "synadia", subject:"synadia.private.*"}} // {service: {account: "synadia", subject: "pub.special.request"}, to: "synadia.request"} func parseImportStreamOrService(v interface{}, errors, warnings *[]error) (*importStream, *importService, error) { var ( curStream *importStream curService *importService pre, to string share bool lt token ) defer convertPanicToErrorList(<, errors) tk, mv := unwrapValue(v, <) vv, ok := mv.(map[string]interface{}) if !ok { return nil, nil, &configErr{tk, fmt.Sprintf("Import Items should be a map with type entry, got %T", mv)} } for mk, mv := range vv { tk, mv := unwrapValue(mv, <) switch strings.ToLower(mk) { case "stream": if curService != nil { err := &configErr{tk, "Detected stream but already saw a service"} *errors = append(*errors, err) continue } ac, ok := mv.(map[string]interface{}) if !ok { err := &configErr{tk, fmt.Sprintf("Stream entry should be an account map, got %T", mv)} *errors = append(*errors, err) continue } // Make sure this is a map with account and subject accountName, subject, err := parseAccount(ac, errors, warnings) if err != nil { *errors = append(*errors, err) continue } if accountName == "" || subject == "" { err := &configErr{tk, "Expect an account name and a subject"} *errors = append(*errors, err) continue } curStream = &importStream{an: accountName, sub: subject} if to != "" { curStream.to = to } if pre != "" { curStream.pre = pre } case "service": if curStream != nil { err := &configErr{tk, "Detected service but already saw a stream"} *errors = append(*errors, err) continue } ac, ok := mv.(map[string]interface{}) if !ok { err := &configErr{tk, fmt.Sprintf("Service entry should be an account map, got %T", mv)} *errors = append(*errors, err) continue } // Make sure this is a map with account and subject accountName, subject, err := parseAccount(ac, errors, warnings) if err != nil { *errors = append(*errors, err) continue } if accountName == "" || subject == "" { err := &configErr{tk, "Expect an account name and a subject"} *errors = append(*errors, err) continue } curService = &importService{an: accountName, sub: subject} if to != "" { curService.to = to } else { curService.to = subject } curService.share = share case "prefix": pre = mv.(string) if curStream != nil { curStream.pre = pre } case "to": to = mv.(string) if curService != nil { curService.to = to } if curStream != nil { curStream.to = to if curStream.pre != "" { err := &configErr{tk, "Stream import can not have a 'prefix' and a 'to' property"} *errors = append(*errors, err) continue } } case "share": share = mv.(bool) if curService != nil { curService.share = share } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: mk, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) } } } return curStream, curService, nil } // Apply permission defaults to users/nkeyuser that don't have their own. func applyDefaultPermissions(users []*User, nkeys []*NkeyUser, defaultP *Permissions) { if defaultP == nil { return } for _, user := range users { if user.Permissions == nil { user.Permissions = defaultP } } for _, user := range nkeys { if user.Permissions == nil { user.Permissions = defaultP } } } // Helper function to parse Authorization configs. func parseAuthorization(v interface{}, opts *Options, errors *[]error, warnings *[]error) (*authorization, error) { var ( am map[string]interface{} tk token lt token auth = &authorization{} ) defer convertPanicToErrorList(<, errors) _, v = unwrapValue(v, <) am = v.(map[string]interface{}) for mk, mv := range am { tk, mv = unwrapValue(mv, <) switch strings.ToLower(mk) { case "user", "username": auth.user = mv.(string) case "pass", "password": auth.pass = mv.(string) case "token": auth.token = mv.(string) case "timeout": at := float64(1) switch mv := mv.(type) { case int64: at = float64(mv) case float64: at = mv } auth.timeout = at case "users": nkeys, users, err := parseUsers(tk, opts, errors, warnings) if err != nil { *errors = append(*errors, err) continue } auth.users = users auth.nkeys = nkeys case "default_permission", "default_permissions", "permissions": permissions, err := parseUserPermissions(tk, errors, warnings) if err != nil { *errors = append(*errors, err) continue } auth.defaultPermissions = permissions default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: mk, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) } continue } applyDefaultPermissions(auth.users, auth.nkeys, auth.defaultPermissions) } return auth, nil } // Helper function to parse multiple users array with optional permissions. func parseUsers(mv interface{}, opts *Options, errors *[]error, warnings *[]error) ([]*NkeyUser, []*User, error) { var ( tk token lt token keys []*NkeyUser users = []*User{} ) defer convertPanicToErrorList(<, errors) tk, mv = unwrapValue(mv, <) // Make sure we have an array uv, ok := mv.([]interface{}) if !ok { return nil, nil, &configErr{tk, fmt.Sprintf("Expected users field to be an array, got %v", mv)} } for _, u := range uv { tk, u = unwrapValue(u, <) // Check its a map/struct um, ok := u.(map[string]interface{}) if !ok { err := &configErr{tk, fmt.Sprintf("Expected user entry to be a map/struct, got %v", u)} *errors = append(*errors, err) continue } var ( user = &User{} nkey = &NkeyUser{} perms *Permissions err error ) for k, v := range um { // Also needs to unwrap first tk, v = unwrapValue(v, <) switch strings.ToLower(k) { case "nkey": nkey.Nkey = v.(string) case "user", "username": user.Username = v.(string) case "pass", "password": user.Password = v.(string) case "permission", "permissions", "authorization": perms, err = parseUserPermissions(tk, errors, warnings) if err != nil { *errors = append(*errors, err) continue } case "allowed_connection_types", "connection_types", "clients": cts := parseAllowedConnectionTypes(tk, <, v, errors, warnings) nkey.AllowedConnectionTypes = cts user.AllowedConnectionTypes = cts default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: k, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) continue } } } // Place perms if we have them. if perms != nil { // nkey takes precedent. if nkey.Nkey != "" { nkey.Permissions = perms } else { user.Permissions = perms } } // Check to make sure we have at least an nkey or username defined. if nkey.Nkey == "" && user.Username == "" { return nil, nil, &configErr{tk, "User entry requires a user"} } else if nkey.Nkey != "" { // Make sure the nkey a proper public nkey for a user.. if !nkeys.IsValidPublicUserKey(nkey.Nkey) { return nil, nil, &configErr{tk, "Not a valid public nkey for a user"} } // If we have user or password defined here that is an error. if user.Username != "" || user.Password != "" { return nil, nil, &configErr{tk, "Nkey users do not take usernames or passwords"} } keys = append(keys, nkey) } else { users = append(users, user) } } return keys, users, nil } func parseAllowedConnectionTypes(tk token, lt *token, mv interface{}, errors *[]error, warnings *[]error) map[string]struct{} { cts, err := parseStringArray("allowed connection types", tk, lt, mv, errors, warnings) // If error, it has already been added to the `errors` array, simply return if err != nil { return nil } m, err := convertAllowedConnectionTypes(cts) if err != nil { *errors = append(*errors, &configErr{tk, err.Error()}) } return m } // Helper function to parse user/account permissions func parseUserPermissions(mv interface{}, errors, warnings *[]error) (*Permissions, error) { var ( tk token lt token p = &Permissions{} ) defer convertPanicToErrorList(<, errors) tk, mv = unwrapValue(mv, <) pm, ok := mv.(map[string]interface{}) if !ok { return nil, &configErr{tk, fmt.Sprintf("Expected permissions to be a map/struct, got %+v", mv)} } for k, v := range pm { tk, mv = unwrapValue(v, <) switch strings.ToLower(k) { // For routes: // Import is Publish // Export is Subscribe case "pub", "publish", "import": perms, err := parseVariablePermissions(mv, errors, warnings) if err != nil { *errors = append(*errors, err) continue } p.Publish = perms case "sub", "subscribe", "export": perms, err := parseVariablePermissions(mv, errors, warnings) if err != nil { *errors = append(*errors, err) continue } p.Subscribe = perms case "publish_allow_responses", "allow_responses": rp := &ResponsePermission{ MaxMsgs: DEFAULT_ALLOW_RESPONSE_MAX_MSGS, Expires: DEFAULT_ALLOW_RESPONSE_EXPIRATION, } // Try boolean first responses, ok := mv.(bool) if ok { if responses { p.Response = rp } } else { p.Response = parseAllowResponses(v, errors, warnings) } if p.Response != nil { if p.Publish == nil { p.Publish = &SubjectPermission{} } if p.Publish.Allow == nil { // We turn off the blanket allow statement. p.Publish.Allow = []string{} } } default: if !tk.IsUsedVariable() { err := &configErr{tk, fmt.Sprintf("Unknown field %q parsing permissions", k)} *errors = append(*errors, err) } } } return p, nil } // Top level parser for authorization configurations. func parseVariablePermissions(v interface{}, errors, warnings *[]error) (*SubjectPermission, error) { switch vv := v.(type) { case map[string]interface{}: // New style with allow and/or deny properties. return parseSubjectPermission(vv, errors, warnings) default: // Old style return parseOldPermissionStyle(v, errors, warnings) } } // Helper function to parse subject singletons and/or arrays func parseSubjects(v interface{}, errors, warnings *[]error) ([]string, error) { var lt token defer convertPanicToErrorList(<, errors) tk, v := unwrapValue(v, <) var subjects []string switch vv := v.(type) { case string: subjects = append(subjects, vv) case []string: subjects = vv case []interface{}: for _, i := range vv { tk, i := unwrapValue(i, <) subject, ok := i.(string) if !ok { return nil, &configErr{tk, "Subject in permissions array cannot be cast to string"} } subjects = append(subjects, subject) } default: return nil, &configErr{tk, fmt.Sprintf("Expected subject permissions to be a subject, or array of subjects, got %T", v)} } if err := checkSubjectArray(subjects); err != nil { return nil, &configErr{tk, err.Error()} } return subjects, nil } // Helper function to parse a ResponsePermission. func parseAllowResponses(v interface{}, errors, warnings *[]error) *ResponsePermission { var lt token defer convertPanicToErrorList(<, errors) tk, v := unwrapValue(v, <) // Check if this is a map. pm, ok := v.(map[string]interface{}) if !ok { err := &configErr{tk, "error parsing response permissions, expected a boolean or a map"} *errors = append(*errors, err) return nil } rp := &ResponsePermission{ MaxMsgs: DEFAULT_ALLOW_RESPONSE_MAX_MSGS, Expires: DEFAULT_ALLOW_RESPONSE_EXPIRATION, } for k, v := range pm { tk, v = unwrapValue(v, <) switch strings.ToLower(k) { case "max", "max_msgs", "max_messages", "max_responses": max := int(v.(int64)) // Negative values are accepted (mean infinite), and 0 // means default value (set above). if max != 0 { rp.MaxMsgs = max } case "expires", "expiration", "ttl": wd, ok := v.(string) if ok { ttl, err := time.ParseDuration(wd) if err != nil { err := &configErr{tk, fmt.Sprintf("error parsing expires: %v", err)} *errors = append(*errors, err) return nil } // Negative values are accepted (mean infinite), and 0 // means default value (set above). if ttl != 0 { rp.Expires = ttl } } else { err := &configErr{tk, "error parsing expires, not a duration string"} *errors = append(*errors, err) return nil } default: if !tk.IsUsedVariable() { err := &configErr{tk, fmt.Sprintf("Unknown field %q parsing permissions", k)} *errors = append(*errors, err) } } } return rp } // Helper function to parse old style authorization configs. func parseOldPermissionStyle(v interface{}, errors, warnings *[]error) (*SubjectPermission, error) { subjects, err := parseSubjects(v, errors, warnings) if err != nil { return nil, err } return &SubjectPermission{Allow: subjects}, nil } // Helper function to parse new style authorization into a SubjectPermission with Allow and Deny. func parseSubjectPermission(v interface{}, errors, warnings *[]error) (*SubjectPermission, error) { var lt token defer convertPanicToErrorList(<, errors) m := v.(map[string]interface{}) if len(m) == 0 { return nil, nil } p := &SubjectPermission{} for k, v := range m { tk, _ := unwrapValue(v, <) switch strings.ToLower(k) { case "allow": subjects, err := parseSubjects(tk, errors, warnings) if err != nil { *errors = append(*errors, err) continue } p.Allow = subjects case "deny": subjects, err := parseSubjects(tk, errors, warnings) if err != nil { *errors = append(*errors, err) continue } p.Deny = subjects default: if !tk.IsUsedVariable() { err := &configErr{tk, fmt.Sprintf("Unknown field name %q parsing subject permissions, only 'allow' or 'deny' are permitted", k)} *errors = append(*errors, err) } } } return p, nil } // Helper function to validate subjects, etc for account permissioning. func checkSubjectArray(sa []string) error { for _, s := range sa { if !IsValidSubject(s) { return fmt.Errorf("subject %q is not a valid subject", s) } } return nil } // PrintTLSHelpAndDie prints TLS usage and exits. func PrintTLSHelpAndDie() { fmt.Printf("%s", tlsUsage) for k := range cipherMap { fmt.Printf(" %s\n", k) } fmt.Printf("\nAvailable curve preferences include:\n") for k := range curvePreferenceMap { fmt.Printf(" %s\n", k) } os.Exit(0) } func parseCipher(cipherName string) (uint16, error) { cipher, exists := cipherMap[cipherName] if !exists { return 0, fmt.Errorf("unrecognized cipher %s", cipherName) } return cipher, nil } func parseCurvePreferences(curveName string) (tls.CurveID, error) { curve, exists := curvePreferenceMap[curveName] if !exists { return 0, fmt.Errorf("unrecognized curve preference %s", curveName) } return curve, nil } // Helper function to parse TLS configs. func parseTLS(v interface{}, isClientCtx bool) (t *TLSConfigOpts, retErr error) { var ( tlsm map[string]interface{} tc = TLSConfigOpts{} lt token ) defer convertPanicToError(<, &retErr) _, v = unwrapValue(v, <) tlsm = v.(map[string]interface{}) for mk, mv := range tlsm { tk, mv := unwrapValue(mv, <) switch strings.ToLower(mk) { case "cert_file": certFile, ok := mv.(string) if !ok { return nil, &configErr{tk, "error parsing tls config, expected 'cert_file' to be filename"} } tc.CertFile = certFile case "key_file": keyFile, ok := mv.(string) if !ok { return nil, &configErr{tk, "error parsing tls config, expected 'key_file' to be filename"} } tc.KeyFile = keyFile case "ca_file": caFile, ok := mv.(string) if !ok { return nil, &configErr{tk, "error parsing tls config, expected 'ca_file' to be filename"} } tc.CaFile = caFile case "insecure": insecure, ok := mv.(bool) if !ok { return nil, &configErr{tk, "error parsing tls config, expected 'insecure' to be a boolean"} } tc.Insecure = insecure case "verify": verify, ok := mv.(bool) if !ok { return nil, &configErr{tk, "error parsing tls config, expected 'verify' to be a boolean"} } tc.Verify = verify case "verify_and_map": verify, ok := mv.(bool) if !ok { return nil, &configErr{tk, "error parsing tls config, expected 'verify_and_map' to be a boolean"} } if verify { tc.Verify = verify } tc.Map = verify case "verify_cert_and_check_known_urls": verify, ok := mv.(bool) if !ok { return nil, &configErr{tk, "error parsing tls config, expected 'verify_cert_and_check_known_urls' to be a boolean"} } if verify && isClientCtx { return nil, &configErr{tk, "verify_cert_and_check_known_urls not supported in this context"} } if verify { tc.Verify = verify } tc.TLSCheckKnownURLs = verify case "cipher_suites": ra := mv.([]interface{}) if len(ra) == 0 { return nil, &configErr{tk, "error parsing tls config, 'cipher_suites' cannot be empty"} } tc.Ciphers = make([]uint16, 0, len(ra)) for _, r := range ra { tk, r := unwrapValue(r, <) cipher, err := parseCipher(r.(string)) if err != nil { return nil, &configErr{tk, err.Error()} } tc.Ciphers = append(tc.Ciphers, cipher) } case "curve_preferences": ra := mv.([]interface{}) if len(ra) == 0 { return nil, &configErr{tk, "error parsing tls config, 'curve_preferences' cannot be empty"} } tc.CurvePreferences = make([]tls.CurveID, 0, len(ra)) for _, r := range ra { tk, r := unwrapValue(r, <) cps, err := parseCurvePreferences(r.(string)) if err != nil { return nil, &configErr{tk, err.Error()} } tc.CurvePreferences = append(tc.CurvePreferences, cps) } case "timeout": at := float64(0) switch mv := mv.(type) { case int64: at = float64(mv) case float64: at = mv } tc.Timeout = at default: return nil, &configErr{tk, fmt.Sprintf("error parsing tls config, unknown field [%q]", mk)} } } // If cipher suites were not specified then use the defaults if tc.Ciphers == nil { tc.Ciphers = defaultCipherSuites() } // If curve preferences were not specified, then use the defaults if tc.CurvePreferences == nil { tc.CurvePreferences = defaultCurvePreferences() } return &tc, nil } func parseSimpleAuth(v interface{}, errors *[]error, warnings *[]error) *authorization { var ( am map[string]interface{} tk token lt token auth = &authorization{} ) defer convertPanicToErrorList(<, errors) _, v = unwrapValue(v, <) am = v.(map[string]interface{}) for mk, mv := range am { tk, mv = unwrapValue(mv, <) switch strings.ToLower(mk) { case "user", "username": auth.user = mv.(string) case "pass", "password": auth.pass = mv.(string) case "token": auth.token = mv.(string) case "timeout": at := float64(1) switch mv := mv.(type) { case int64: at = float64(mv) case float64: at = mv } auth.timeout = at default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: mk, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) } continue } } return auth } func parseStringArray(fieldName string, tk token, lt *token, mv interface{}, errors *[]error, warnings *[]error) ([]string, error) { switch mv := mv.(type) { case string: return []string{mv}, nil case []interface{}: strs := make([]string, 0, len(mv)) for _, val := range mv { tk, val = unwrapValue(val, lt) if str, ok := val.(string); ok { strs = append(strs, str) } else { err := &configErr{tk, fmt.Sprintf("error parsing %s: unsupported type in array %T", fieldName, val)} *errors = append(*errors, err) continue } } return strs, nil default: err := &configErr{tk, fmt.Sprintf("error parsing %s: unsupported type %T", fieldName, mv)} *errors = append(*errors, err) return nil, err } } func parseWebsocket(v interface{}, o *Options, errors *[]error, warnings *[]error) error { var lt token defer convertPanicToErrorList(<, errors) tk, v := unwrapValue(v, <) gm, ok := v.(map[string]interface{}) if !ok { return &configErr{tk, fmt.Sprintf("Expected websocket to be a map, got %T", v)} } for mk, mv := range gm { // Again, unwrap token value if line check is required. tk, mv = unwrapValue(mv, <) switch strings.ToLower(mk) { case "listen": hp, err := parseListen(mv) if err != nil { err := &configErr{tk, err.Error()} *errors = append(*errors, err) continue } o.Websocket.Host = hp.host o.Websocket.Port = hp.port case "port": o.Websocket.Port = int(mv.(int64)) case "host", "net": o.Websocket.Host = mv.(string) case "advertise": o.Websocket.Advertise = mv.(string) case "no_tls": o.Websocket.NoTLS = mv.(bool) case "tls": tc, err := parseTLS(tk, true) if err != nil { *errors = append(*errors, err) continue } if o.Websocket.TLSConfig, err = GenTLSConfig(tc); err != nil { err := &configErr{tk, err.Error()} *errors = append(*errors, err) continue } o.Websocket.TLSMap = tc.Map case "same_origin": o.Websocket.SameOrigin = mv.(bool) case "allowed_origins", "allowed_origin", "allow_origins", "allow_origin", "origins", "origin": o.Websocket.AllowedOrigins, _ = parseStringArray("allowed origins", tk, <, mv, errors, warnings) case "handshake_timeout": ht := time.Duration(0) switch mv := mv.(type) { case int64: ht = time.Duration(mv) * time.Second case string: var err error ht, err = time.ParseDuration(mv) if err != nil { err := &configErr{tk, err.Error()} *errors = append(*errors, err) continue } default: err := &configErr{tk, fmt.Sprintf("error parsing handshake timeout: unsupported type %T", mv)} *errors = append(*errors, err) } o.Websocket.HandshakeTimeout = ht case "compress", "compression": o.Websocket.Compression = mv.(bool) case "authorization", "authentication": auth := parseSimpleAuth(tk, errors, warnings) o.Websocket.Username = auth.user o.Websocket.Password = auth.pass o.Websocket.Token = auth.token o.Websocket.AuthTimeout = auth.timeout case "jwt_cookie": o.Websocket.JWTCookie = mv.(string) case "no_auth_user": o.Websocket.NoAuthUser = mv.(string) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: mk, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) continue } } } return nil } func parseMQTT(v interface{}, o *Options, errors *[]error, warnings *[]error) error { var lt token defer convertPanicToErrorList(<, errors) tk, v := unwrapValue(v, <) gm, ok := v.(map[string]interface{}) if !ok { return &configErr{tk, fmt.Sprintf("Expected mqtt to be a map, got %T", v)} } for mk, mv := range gm { // Again, unwrap token value if line check is required. tk, mv = unwrapValue(mv, <) switch strings.ToLower(mk) { case "listen": hp, err := parseListen(mv) if err != nil { err := &configErr{tk, err.Error()} *errors = append(*errors, err) continue } o.MQTT.Host = hp.host o.MQTT.Port = hp.port case "port": o.MQTT.Port = int(mv.(int64)) case "host", "net": o.MQTT.Host = mv.(string) case "tls": tc, err := parseTLS(tk, true) if err != nil { *errors = append(*errors, err) continue } if o.MQTT.TLSConfig, err = GenTLSConfig(tc); err != nil { err := &configErr{tk, err.Error()} *errors = append(*errors, err) continue } o.MQTT.TLSTimeout = tc.Timeout o.MQTT.TLSMap = tc.Map case "authorization", "authentication": auth := parseSimpleAuth(tk, errors, warnings) o.MQTT.Username = auth.user o.MQTT.Password = auth.pass o.MQTT.Token = auth.token o.MQTT.AuthTimeout = auth.timeout case "no_auth_user": o.MQTT.NoAuthUser = mv.(string) case "ack_wait", "ackwait": o.MQTT.AckWait = parseDuration("ack_wait", tk, mv, errors, warnings) case "max_ack_pending", "max_pending", "max_inflight": tmp := int(mv.(int64)) if tmp < 0 || tmp > 0xFFFF { err := &configErr{tk, fmt.Sprintf("invalid value %v, should in [0..%d] range", tmp, 0xFFFF)} *errors = append(*errors, err) } else { o.MQTT.MaxAckPending = uint16(tmp) } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ field: mk, configErr: configErr{ token: tk, }, } *errors = append(*errors, err) continue } } } return nil } // GenTLSConfig loads TLS related configuration parameters. func GenTLSConfig(tc *TLSConfigOpts) (*tls.Config, error) { // Create the tls.Config from our options before including the certs. // It will determine the cipher suites that we prefer. // FIXME(dlc) change if ARM based. config := tls.Config{ MinVersion: tls.VersionTLS12, CipherSuites: tc.Ciphers, PreferServerCipherSuites: true, CurvePreferences: tc.CurvePreferences, InsecureSkipVerify: tc.Insecure, } switch { case tc.CertFile != "" && tc.KeyFile == "": return nil, fmt.Errorf("missing 'key_file' in TLS configuration") case tc.CertFile == "" && tc.KeyFile != "": return nil, fmt.Errorf("missing 'cert_file' in TLS configuration") case tc.CertFile != "" && tc.KeyFile != "": // Now load in cert and private key cert, err := tls.LoadX509KeyPair(tc.CertFile, tc.KeyFile) if err != nil { return nil, fmt.Errorf("error parsing X509 certificate/key pair: %v", err) } cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0]) if err != nil { return nil, fmt.Errorf("error parsing certificate: %v", err) } config.Certificates = []tls.Certificate{cert} } // Require client certificates as needed if tc.Verify { config.ClientAuth = tls.RequireAndVerifyClientCert } // Add in CAs if applicable. if tc.CaFile != "" { rootPEM, err := ioutil.ReadFile(tc.CaFile) if err != nil || rootPEM == nil { return nil, err } pool := x509.NewCertPool() ok := pool.AppendCertsFromPEM(rootPEM) if !ok { return nil, fmt.Errorf("failed to parse root ca certificate") } config.ClientCAs = pool } return &config, nil } // MergeOptions will merge two options giving preference to the flagOpts // if the item is present. func MergeOptions(fileOpts, flagOpts *Options) *Options { if fileOpts == nil { return flagOpts } if flagOpts == nil { return fileOpts } // Merge the two, flagOpts override opts := *fileOpts if flagOpts.Port != 0 { opts.Port = flagOpts.Port } if flagOpts.Host != "" { opts.Host = flagOpts.Host } if flagOpts.ClientAdvertise != "" { opts.ClientAdvertise = flagOpts.ClientAdvertise } if flagOpts.Username != "" { opts.Username = flagOpts.Username } if flagOpts.Password != "" { opts.Password = flagOpts.Password } if flagOpts.Authorization != "" { opts.Authorization = flagOpts.Authorization } if flagOpts.HTTPPort != 0 { opts.HTTPPort = flagOpts.HTTPPort } if flagOpts.HTTPBasePath != "" { opts.HTTPBasePath = flagOpts.HTTPBasePath } if flagOpts.Debug { opts.Debug = true } if flagOpts.Trace { opts.Trace = true } if flagOpts.Logtime { opts.Logtime = true } if flagOpts.LogFile != "" { opts.LogFile = flagOpts.LogFile } if flagOpts.PidFile != "" { opts.PidFile = flagOpts.PidFile } if flagOpts.PortsFileDir != "" { opts.PortsFileDir = flagOpts.PortsFileDir } if flagOpts.ProfPort != 0 { opts.ProfPort = flagOpts.ProfPort } if flagOpts.Cluster.ListenStr != "" { opts.Cluster.ListenStr = flagOpts.Cluster.ListenStr } if flagOpts.Cluster.NoAdvertise { opts.Cluster.NoAdvertise = true } if flagOpts.Cluster.ConnectRetries != 0 { opts.Cluster.ConnectRetries = flagOpts.Cluster.ConnectRetries } if flagOpts.Cluster.Advertise != "" { opts.Cluster.Advertise = flagOpts.Cluster.Advertise } if flagOpts.RoutesStr != "" { mergeRoutes(&opts, flagOpts) } return &opts } // RoutesFromStr parses route URLs from a string func RoutesFromStr(routesStr string) []*url.URL { routes := strings.Split(routesStr, ",") if len(routes) == 0 { return nil } routeUrls := []*url.URL{} for _, r := range routes { r = strings.TrimSpace(r) u, _ := url.Parse(r) routeUrls = append(routeUrls, u) } return routeUrls } // This will merge the flag routes and override anything that was present. func mergeRoutes(opts, flagOpts *Options) { routeUrls := RoutesFromStr(flagOpts.RoutesStr) if routeUrls == nil { return } opts.Routes = routeUrls opts.RoutesStr = flagOpts.RoutesStr } // RemoveSelfReference removes this server from an array of routes func RemoveSelfReference(clusterPort int, routes []*url.URL) ([]*url.URL, error) { var cleanRoutes []*url.URL cport := strconv.Itoa(clusterPort) selfIPs, err := getInterfaceIPs() if err != nil { return nil, err } for _, r := range routes { host, port, err := net.SplitHostPort(r.Host) if err != nil { return nil, err } ipList, err := getURLIP(host) if err != nil { return nil, err } if cport == port && isIPInList(selfIPs, ipList) { continue } cleanRoutes = append(cleanRoutes, r) } return cleanRoutes, nil } func isIPInList(list1 []net.IP, list2 []net.IP) bool { for _, ip1 := range list1 { for _, ip2 := range list2 { if ip1.Equal(ip2) { return true } } } return false } func getURLIP(ipStr string) ([]net.IP, error) { ipList := []net.IP{} ip := net.ParseIP(ipStr) if ip != nil { ipList = append(ipList, ip) return ipList, nil } hostAddr, err := net.LookupHost(ipStr) if err != nil { return nil, fmt.Errorf("Error looking up host with route hostname: %v", err) } for _, addr := range hostAddr { ip = net.ParseIP(addr) if ip != nil { ipList = append(ipList, ip) } } return ipList, nil } func getInterfaceIPs() ([]net.IP, error) { var localIPs []net.IP interfaceAddr, err := net.InterfaceAddrs() if err != nil { return nil, fmt.Errorf("Error getting self referencing address: %v", err) } for i := 0; i < len(interfaceAddr); i++ { interfaceIP, _, _ := net.ParseCIDR(interfaceAddr[i].String()) if net.ParseIP(interfaceIP.String()) != nil { localIPs = append(localIPs, interfaceIP) } else { return nil, fmt.Errorf("Error parsing self referencing address: %v", err) } } return localIPs, nil } func setBaselineOptions(opts *Options) { // Setup non-standard Go defaults if opts.Host == "" { opts.Host = DEFAULT_HOST } if opts.HTTPHost == "" { // Default to same bind from server if left undefined opts.HTTPHost = opts.Host } if opts.Port == 0 { opts.Port = DEFAULT_PORT } else if opts.Port == RANDOM_PORT { // Choose randomly inside of net.Listen opts.Port = 0 } if opts.MaxConn == 0 { opts.MaxConn = DEFAULT_MAX_CONNECTIONS } if opts.PingInterval == 0 { opts.PingInterval = DEFAULT_PING_INTERVAL } if opts.MaxPingsOut == 0 { opts.MaxPingsOut = DEFAULT_PING_MAX_OUT } if opts.TLSTimeout == 0 { opts.TLSTimeout = float64(TLS_TIMEOUT) / float64(time.Second) } if opts.AuthTimeout == 0 { opts.AuthTimeout = getDefaultAuthTimeout(opts.TLSConfig, opts.TLSTimeout) } if opts.Cluster.Port != 0 { if opts.Cluster.Host == "" { opts.Cluster.Host = DEFAULT_HOST } if opts.Cluster.TLSTimeout == 0 { opts.Cluster.TLSTimeout = float64(TLS_TIMEOUT) / float64(time.Second) } if opts.Cluster.AuthTimeout == 0 { opts.Cluster.AuthTimeout = getDefaultAuthTimeout(opts.Cluster.TLSConfig, opts.Cluster.TLSTimeout) } } if opts.LeafNode.Port != 0 { if opts.LeafNode.Host == "" { opts.LeafNode.Host = DEFAULT_HOST } if opts.LeafNode.TLSTimeout == 0 { opts.LeafNode.TLSTimeout = float64(TLS_TIMEOUT) / float64(time.Second) } if opts.LeafNode.AuthTimeout == 0 { opts.LeafNode.AuthTimeout = getDefaultAuthTimeout(opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout) } } // Set baseline connect port for remotes. for _, r := range opts.LeafNode.Remotes { if r != nil { for _, u := range r.URLs { if u.Port() == "" { u.Host = net.JoinHostPort(u.Host, strconv.Itoa(DEFAULT_LEAFNODE_PORT)) } } } } // Set this regardless of opts.LeafNode.Port if opts.LeafNode.ReconnectInterval == 0 { opts.LeafNode.ReconnectInterval = DEFAULT_LEAF_NODE_RECONNECT } if opts.MaxControlLine == 0 { opts.MaxControlLine = MAX_CONTROL_LINE_SIZE } if opts.MaxPayload == 0 { opts.MaxPayload = MAX_PAYLOAD_SIZE } if opts.MaxPending == 0 { opts.MaxPending = MAX_PENDING_SIZE } if opts.WriteDeadline == time.Duration(0) { opts.WriteDeadline = DEFAULT_FLUSH_DEADLINE } if opts.MaxClosedClients == 0 { opts.MaxClosedClients = DEFAULT_MAX_CLOSED_CLIENTS } if opts.LameDuckDuration == 0 { opts.LameDuckDuration = DEFAULT_LAME_DUCK_DURATION } if opts.LameDuckGracePeriod == 0 { opts.LameDuckGracePeriod = DEFAULT_LAME_DUCK_GRACE_PERIOD } if opts.Gateway.Port != 0 { if opts.Gateway.Host == "" { opts.Gateway.Host = DEFAULT_HOST } if opts.Gateway.TLSTimeout == 0 { opts.Gateway.TLSTimeout = float64(TLS_TIMEOUT) / float64(time.Second) } if opts.Gateway.AuthTimeout == 0 { opts.Gateway.AuthTimeout = getDefaultAuthTimeout(opts.Gateway.TLSConfig, opts.Gateway.TLSTimeout) } } if opts.ConnectErrorReports == 0 { opts.ConnectErrorReports = DEFAULT_CONNECT_ERROR_REPORTS } if opts.ReconnectErrorReports == 0 { opts.ReconnectErrorReports = DEFAULT_RECONNECT_ERROR_REPORTS } if opts.Websocket.Port != 0 { if opts.Websocket.Host == "" { opts.Websocket.Host = DEFAULT_HOST } } if opts.MQTT.Port != 0 { if opts.MQTT.Host == "" { opts.MQTT.Host = DEFAULT_HOST } if opts.MQTT.TLSTimeout == 0 { opts.MQTT.TLSTimeout = float64(TLS_TIMEOUT) / float64(time.Second) } } // JetStream if opts.JetStreamMaxMemory == 0 { opts.JetStreamMaxMemory = -1 } if opts.JetStreamMaxStore == 0 { opts.JetStreamMaxStore = -1 } } func getDefaultAuthTimeout(tls *tls.Config, tlsTimeout float64) float64 { var authTimeout float64 if tls != nil { authTimeout = tlsTimeout + 1.0 } else { authTimeout = float64(AUTH_TIMEOUT / time.Second) } return authTimeout } // ConfigureOptions accepts a flag set and augments it with NATS Server // specific flags. On success, an options structure is returned configured // based on the selected flags and/or configuration file. // The command line options take precedence to the ones in the configuration file. func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, printTLSHelp func()) (*Options, error) { opts := &Options{} var ( showVersion bool showHelp bool showTLSHelp bool signal string configFile string dbgAndTrace bool trcAndVerboseTrc bool dbgAndTrcAndVerboseTrc bool err error ) fs.BoolVar(&showHelp, "h", false, "Show this message.") fs.BoolVar(&showHelp, "help", false, "Show this message.") fs.IntVar(&opts.Port, "port", 0, "Port to listen on.") fs.IntVar(&opts.Port, "p", 0, "Port to listen on.") fs.StringVar(&opts.ServerName, "n", "", "Server name.") fs.StringVar(&opts.ServerName, "name", "", "Server name.") fs.StringVar(&opts.ServerName, "server_name", "", "Server name.") fs.StringVar(&opts.Host, "addr", "", "Network host to listen on.") fs.StringVar(&opts.Host, "a", "", "Network host to listen on.") fs.StringVar(&opts.Host, "net", "", "Network host to listen on.") fs.StringVar(&opts.ClientAdvertise, "client_advertise", "", "Client URL to advertise to other servers.") fs.BoolVar(&opts.Debug, "D", false, "Enable Debug logging.") fs.BoolVar(&opts.Debug, "debug", false, "Enable Debug logging.") fs.BoolVar(&opts.Trace, "V", false, "Enable Trace logging.") fs.BoolVar(&trcAndVerboseTrc, "VV", false, "Enable Verbose Trace logging. (Traces system account as well)") fs.BoolVar(&opts.Trace, "trace", false, "Enable Trace logging.") fs.BoolVar(&dbgAndTrace, "DV", false, "Enable Debug and Trace logging.") fs.BoolVar(&dbgAndTrcAndVerboseTrc, "DVV", false, "Enable Debug and Verbose Trace logging. (Traces system account as well)") fs.BoolVar(&opts.Logtime, "T", true, "Timestamp log entries.") fs.BoolVar(&opts.Logtime, "logtime", true, "Timestamp log entries.") fs.StringVar(&opts.Username, "user", "", "Username required for connection.") fs.StringVar(&opts.Password, "pass", "", "Password required for connection.") fs.StringVar(&opts.Authorization, "auth", "", "Authorization token required for connection.") fs.IntVar(&opts.HTTPPort, "m", 0, "HTTP Port for /varz, /connz endpoints.") fs.IntVar(&opts.HTTPPort, "http_port", 0, "HTTP Port for /varz, /connz endpoints.") fs.IntVar(&opts.HTTPSPort, "ms", 0, "HTTPS Port for /varz, /connz endpoints.") fs.IntVar(&opts.HTTPSPort, "https_port", 0, "HTTPS Port for /varz, /connz endpoints.") fs.StringVar(&configFile, "c", "", "Configuration file.") fs.StringVar(&configFile, "config", "", "Configuration file.") fs.BoolVar(&opts.CheckConfig, "t", false, "Check configuration and exit.") fs.StringVar(&signal, "sl", "", "Send signal to nats-server process (stop, quit, reopen, reload).") fs.StringVar(&signal, "signal", "", "Send signal to nats-server process (stop, quit, reopen, reload).") fs.StringVar(&opts.PidFile, "P", "", "File to store process pid.") fs.StringVar(&opts.PidFile, "pid", "", "File to store process pid.") fs.StringVar(&opts.PortsFileDir, "ports_file_dir", "", "Creates a ports file in the specified directory (_.ports).") fs.StringVar(&opts.LogFile, "l", "", "File to store logging output.") fs.StringVar(&opts.LogFile, "log", "", "File to store logging output.") fs.Int64Var(&opts.LogSizeLimit, "log_size_limit", 0, "Logfile size limit being auto-rotated") fs.BoolVar(&opts.Syslog, "s", false, "Enable syslog as log method.") fs.BoolVar(&opts.Syslog, "syslog", false, "Enable syslog as log method.") fs.StringVar(&opts.RemoteSyslog, "r", "", "Syslog server addr (udp://127.0.0.1:514).") fs.StringVar(&opts.RemoteSyslog, "remote_syslog", "", "Syslog server addr (udp://127.0.0.1:514).") fs.BoolVar(&showVersion, "version", false, "Print version information.") fs.BoolVar(&showVersion, "v", false, "Print version information.") fs.IntVar(&opts.ProfPort, "profile", 0, "Profiling HTTP port.") fs.StringVar(&opts.RoutesStr, "routes", "", "Routes to actively solicit a connection.") fs.StringVar(&opts.Cluster.ListenStr, "cluster", "", "Cluster url from which members can solicit routes.") fs.StringVar(&opts.Cluster.ListenStr, "cluster_listen", "", "Cluster url from which members can solicit routes.") fs.StringVar(&opts.Cluster.Advertise, "cluster_advertise", "", "Cluster URL to advertise to other servers.") fs.BoolVar(&opts.Cluster.NoAdvertise, "no_advertise", false, "Advertise known cluster IPs to clients.") fs.IntVar(&opts.Cluster.ConnectRetries, "connect_retries", 0, "For implicit routes, number of connect retries.") fs.StringVar(&opts.Cluster.Name, "cluster_name", "", "Cluster Name, if not set one will be dynamically generated.") fs.BoolVar(&showTLSHelp, "help_tls", false, "TLS help.") fs.BoolVar(&opts.TLS, "tls", false, "Enable TLS.") fs.BoolVar(&opts.TLSVerify, "tlsverify", false, "Enable TLS with client verification.") fs.StringVar(&opts.TLSCert, "tlscert", "", "Server certificate file.") fs.StringVar(&opts.TLSKey, "tlskey", "", "Private key for server certificate.") fs.StringVar(&opts.TLSCaCert, "tlscacert", "", "Client certificate CA for verification.") fs.IntVar(&opts.MaxTracedMsgLen, "max_traced_msg_len", 0, "Maximum printable length for traced messages. 0 for unlimited.") fs.BoolVar(&opts.JetStream, "js", false, "Enable JetStream.") fs.BoolVar(&opts.JetStream, "jetstream", false, "Enable JetStream.") fs.StringVar(&opts.StoreDir, "sd", "", "Storage directory.") fs.StringVar(&opts.StoreDir, "store_dir", "", "Storage directory.") // The flags definition above set "default" values to some of the options. // Calling Parse() here will override the default options with any value // specified from the command line. This is ok. We will then update the // options with the content of the configuration file (if present), and then, // call Parse() again to override the default+config with command line values. // Calling Parse() before processing config file is necessary since configFile // itself is a command line argument, and also Parse() is required in order // to know if user wants simply to show "help" or "version", etc... if err := fs.Parse(args); err != nil { return nil, err } if showVersion { printVersion() return nil, nil } if showHelp { printHelp() return nil, nil } if showTLSHelp { printTLSHelp() return nil, nil } // Process args looking for non-flag options, // 'version' and 'help' only for now showVersion, showHelp, err = ProcessCommandLineArgs(fs) if err != nil { return nil, err } else if showVersion { printVersion() return nil, nil } else if showHelp { printHelp() return nil, nil } // Snapshot flag options. FlagSnapshot = opts.Clone() // Keep track of the boolean flags that were explicitly set with their value. fs.Visit(func(f *flag.Flag) { switch f.Name { case "DVV": trackExplicitVal(FlagSnapshot, &FlagSnapshot.inCmdLine, "Debug", dbgAndTrcAndVerboseTrc) trackExplicitVal(FlagSnapshot, &FlagSnapshot.inCmdLine, "Trace", dbgAndTrcAndVerboseTrc) trackExplicitVal(FlagSnapshot, &FlagSnapshot.inCmdLine, "TraceVerbose", dbgAndTrcAndVerboseTrc) case "DV": trackExplicitVal(FlagSnapshot, &FlagSnapshot.inCmdLine, "Debug", dbgAndTrace) trackExplicitVal(FlagSnapshot, &FlagSnapshot.inCmdLine, "Trace", dbgAndTrace) case "D": fallthrough case "debug": trackExplicitVal(FlagSnapshot, &FlagSnapshot.inCmdLine, "Debug", FlagSnapshot.Debug) case "VV": trackExplicitVal(FlagSnapshot, &FlagSnapshot.inCmdLine, "Trace", trcAndVerboseTrc) trackExplicitVal(FlagSnapshot, &FlagSnapshot.inCmdLine, "TraceVerbose", trcAndVerboseTrc) case "V": fallthrough case "trace": trackExplicitVal(FlagSnapshot, &FlagSnapshot.inCmdLine, "Trace", FlagSnapshot.Trace) case "T": fallthrough case "logtime": trackExplicitVal(FlagSnapshot, &FlagSnapshot.inCmdLine, "Logtime", FlagSnapshot.Logtime) case "s": fallthrough case "syslog": trackExplicitVal(FlagSnapshot, &FlagSnapshot.inCmdLine, "Syslog", FlagSnapshot.Syslog) case "no_advertise": trackExplicitVal(FlagSnapshot, &FlagSnapshot.inCmdLine, "Cluster.NoAdvertise", FlagSnapshot.Cluster.NoAdvertise) } }) // Process signal control. if signal != "" { if err := processSignal(signal); err != nil { return nil, err } } // Parse config if given if configFile != "" { // This will update the options with values from the config file. err := opts.ProcessConfigFile(configFile) if err != nil { if opts.CheckConfig { return nil, err } if cerr, ok := err.(*processConfigErr); !ok || len(cerr.Errors()) != 0 { return nil, err } // If we get here we only have warnings and can still continue fmt.Fprint(os.Stderr, err) } else if opts.CheckConfig { // Report configuration file syntax test was successful and exit. return opts, nil } // Call this again to override config file options with options from command line. // Note: We don't need to check error here since if there was an error, it would // have been caught the first time this function was called (after setting up the // flags). fs.Parse(args) } else if opts.CheckConfig { return nil, fmt.Errorf("must specify [-c, --config] option to check configuration file syntax") } // Special handling of some flags var ( flagErr error tlsDisabled bool tlsOverride bool ) fs.Visit(func(f *flag.Flag) { // short-circuit if an error was encountered if flagErr != nil { return } if strings.HasPrefix(f.Name, "tls") { if f.Name == "tls" { if !opts.TLS { // User has specified "-tls=false", we need to disable TLS opts.TLSConfig = nil tlsDisabled = true tlsOverride = false return } tlsOverride = true } else if !tlsDisabled { tlsOverride = true } } else { switch f.Name { case "VV": opts.Trace, opts.TraceVerbose = trcAndVerboseTrc, trcAndVerboseTrc case "DVV": opts.Trace, opts.Debug, opts.TraceVerbose = dbgAndTrcAndVerboseTrc, dbgAndTrcAndVerboseTrc, dbgAndTrcAndVerboseTrc case "DV": // Check value to support -DV=false opts.Trace, opts.Debug = dbgAndTrace, dbgAndTrace case "cluster", "cluster_listen": // Override cluster config if explicitly set via flags. flagErr = overrideCluster(opts) case "routes": // Keep in mind that the flag has updated opts.RoutesStr at this point. if opts.RoutesStr == "" { // Set routes array to nil since routes string is empty opts.Routes = nil return } routeUrls := RoutesFromStr(opts.RoutesStr) opts.Routes = routeUrls } } }) if flagErr != nil { return nil, flagErr } // This will be true if some of the `-tls` params have been set and // `-tls=false` has not been set. if tlsOverride { if err := overrideTLS(opts); err != nil { return nil, err } } // If we don't have cluster defined in the configuration // file and no cluster listen string override, but we do // have a routes override, we need to report misconfiguration. if opts.RoutesStr != "" && opts.Cluster.ListenStr == "" && opts.Cluster.Host == "" && opts.Cluster.Port == 0 { return nil, errors.New("solicited routes require cluster capabilities, e.g. --cluster") } return opts, nil } func normalizeBasePath(p string) string { if len(p) == 0 { return "/" } // add leading slash if p[0] != '/' { p = "/" + p } return path.Clean(p) } // overrideTLS is called when at least "-tls=true" has been set. func overrideTLS(opts *Options) error { if opts.TLSCert == "" { return errors.New("TLS Server certificate must be present and valid") } if opts.TLSKey == "" { return errors.New("TLS Server private key must be present and valid") } tc := TLSConfigOpts{} tc.CertFile = opts.TLSCert tc.KeyFile = opts.TLSKey tc.CaFile = opts.TLSCaCert tc.Verify = opts.TLSVerify var err error opts.TLSConfig, err = GenTLSConfig(&tc) return err } // overrideCluster updates Options.Cluster if that flag "cluster" (or "cluster_listen") // has explicitly be set in the command line. If it is set to empty string, it will // clear the Cluster options. func overrideCluster(opts *Options) error { if opts.Cluster.ListenStr == "" { // This one is enough to disable clustering. opts.Cluster.Port = 0 return nil } // -1 will fail url.Parse, so if we have -1, change it to // 0, and then after parse, replace the port with -1 so we get // automatic port allocation wantsRandom := false if strings.HasSuffix(opts.Cluster.ListenStr, ":-1") { wantsRandom = true cls := fmt.Sprintf("%s:0", opts.Cluster.ListenStr[0:len(opts.Cluster.ListenStr)-3]) opts.Cluster.ListenStr = cls } clusterURL, err := url.Parse(opts.Cluster.ListenStr) if err != nil { return err } h, p, err := net.SplitHostPort(clusterURL.Host) if err != nil { return err } if wantsRandom { p = "-1" } opts.Cluster.Host = h _, err = fmt.Sscan(p, &opts.Cluster.Port) if err != nil { return err } if clusterURL.User != nil { pass, hasPassword := clusterURL.User.Password() if !hasPassword { return errors.New("expected cluster password to be set") } opts.Cluster.Password = pass user := clusterURL.User.Username() opts.Cluster.Username = user } else { // Since we override from flag and there is no user/pwd, make // sure we clear what we may have gotten from config file. opts.Cluster.Username = "" opts.Cluster.Password = "" } return nil } func processSignal(signal string) error { var ( pid string commandAndPid = strings.Split(signal, "=") ) if l := len(commandAndPid); l == 2 { pid = maybeReadPidFile(commandAndPid[1]) } else if l > 2 { return fmt.Errorf("invalid signal parameters: %v", commandAndPid[2:]) } if err := ProcessSignal(Command(commandAndPid[0]), pid); err != nil { return err } os.Exit(0) return nil } // maybeReadPidFile returns a PID or Windows service name obtained via the following method: // 1. Try to open a file with path "pidStr" (absolute or relative). // 2. If such a file exists and can be read, return its contents. // 3. Otherwise, return the original "pidStr" string. func maybeReadPidFile(pidStr string) string { if b, err := ioutil.ReadFile(pidStr); err == nil { return string(b) } return pidStr } func homeDir() (string, error) { if runtime.GOOS == "windows" { homeDrive, homePath := os.Getenv("HOMEDRIVE"), os.Getenv("HOMEPATH") userProfile := os.Getenv("USERPROFILE") home := filepath.Join(homeDrive, homePath) if homeDrive == "" || homePath == "" { if userProfile == "" { return "", errors.New("nats: failed to get home dir, require %HOMEDRIVE% and %HOMEPATH% or %USERPROFILE%") } home = userProfile } return home, nil } home := os.Getenv("HOME") if home == "" { return "", errors.New("failed to get home dir, require $HOME") } return home, nil } func expandPath(p string) (string, error) { p = os.ExpandEnv(p) if !strings.HasPrefix(p, "~") { return p, nil } home, err := homeDir() if err != nil { return "", err } return filepath.Join(home, p[1:]), nil }