Files
nats-server/server/client.go
Derek Collison f4f3d3baf1 Updates for operator based configurations.
Added update to parse and load operator JWTs.
Changed to add in signing keys from operator JWT to list of trusted keys.
Added URL account resolver.
Added account claim updates by system messages.

Signed-off-by: Derek Collison <derek@nats.io>
2018-12-02 20:34:33 -08:00

2813 lines
71 KiB
Go

// Copyright 2012-2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"math/rand"
"net"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/nats-io/jwt"
)
// Type of client connection.
const (
// CLIENT is an end user.
CLIENT = iota
// ROUTER is another router in the cluster.
ROUTER
// GATEWAY is a link between 2 clusters.
GATEWAY
// SYSTEM is an internal system client.
SYSTEM
)
const (
// ClientProtoZero is the original Client protocol from 2009.
// http://nats.io/documentation/internals/nats-protocol/
ClientProtoZero = iota
// ClientProtoInfo signals a client can receive more then the original INFO block.
// This can be used to update clients on other cluster members, etc.
ClientProtoInfo
)
func init() {
rand.Seed(time.Now().UnixNano())
}
const (
// Scratch buffer size for the processMsg() calls.
msgScratchSize = 1024
msgHeadProto = "RMSG "
msgHeadProtoLen = len(msgHeadProto)
)
// For controlling dynamic buffer sizes.
const (
startBufSize = 512 // For INFO/CONNECT block
minBufSize = 64 // Smallest to shrink to for PING/PONG
maxBufSize = 65536 // 64k
shortsToShrink = 2
)
// Represent client booleans with a bitmask
type clientFlag byte
// Some client state represented as flags
const (
connectReceived clientFlag = 1 << iota // The CONNECT proto has been received
infoReceived // The INFO protocol has been received
firstPongSent // The first PONG has been sent
handshakeComplete // For TLS clients, indicate that the handshake is complete
clearConnection // Marks that clearConnection has already been called.
flushOutbound // Marks client as having a flushOutbound call in progress.
noReconnect // Indicate that on close, this connection should not attempt a reconnect
)
// set the flag (would be equivalent to set the boolean to true)
func (cf *clientFlag) set(c clientFlag) {
*cf |= c
}
// clear the flag (would be equivalent to set the boolean to false)
func (cf *clientFlag) clear(c clientFlag) {
*cf &= ^c
}
// isSet returns true if the flag is set, false otherwise
func (cf clientFlag) isSet(c clientFlag) bool {
return cf&c != 0
}
// setIfNotSet will set the flag `c` only if that flag was not already
// set and return true to indicate that the flag has been set. Returns
// false otherwise.
func (cf *clientFlag) setIfNotSet(c clientFlag) bool {
if *cf&c == 0 {
*cf |= c
return true
}
return false
}
// ClosedState is the reason client was closed. This will
// be passed into calls to clearConnection, but will only
// be stored in ConnInfo for monitoring.
type ClosedState int
const (
ClientClosed = ClosedState(iota + 1)
AuthenticationTimeout
AuthenticationViolation
TLSHandshakeError
SlowConsumerPendingBytes
SlowConsumerWriteDeadline
WriteError
ReadError
ParseError
StaleConnection
ProtocolViolation
BadClientProtocolVersion
WrongPort
MaxAccountConnectionsExceeded
MaxConnectionsExceeded
MaxPayloadExceeded
MaxControlLineExceeded
MaxSubscriptionsExceeded
DuplicateRoute
RouteRemoved
ServerShutdown
AuthenticationExpired
WrongGateway
)
type client struct {
// Here first because of use of atomics, and memory alignment.
stats
mpay int32
msubs int
mu sync.Mutex
kind int
cid uint64
opts clientOpts
start time.Time
nonce []byte
nc net.Conn
ncs string
out outbound
srv *Server
acc *Account
user *NkeyUser
host string
port int
subs map[string]*subscription
perms *permissions
mperms *msgDeny
darray []string
in readCache
pcd map[*client]struct{}
atmr *time.Timer
ping pinfo
msgb [msgScratchSize]byte
last time.Time
parseState
rtt time.Duration
rttStart time.Time
route *route
gw *gateway
debug bool
trace bool
echo bool
flags clientFlag // Compact booleans into a single field. Size will be increased when needed.
}
// Struct for PING initiation from the server.
type pinfo struct {
tmr *time.Timer
out int
}
// outbound holds pending data for a socket.
type outbound struct {
p []byte // Primary write buffer
s []byte // Secondary for use post flush
nb net.Buffers // net.Buffers for writev IO
sz int // limit size per []byte, uses variable BufSize constants, start, min, max.
sws int // Number of short writes, used for dyanmic resizing.
pb int64 // Total pending/queued bytes.
pm int64 // Total pending/queued messages.
sg *sync.Cond // Flusher conditional for signaling.
fsp int // Flush signals that are pending from readLoop's pcd.
mp int64 // snapshot of max pending.
wdl time.Duration // Snapshot fo write deadline.
lft time.Duration // Last flush time.
}
type perm struct {
allow *Sublist
deny *Sublist
}
type permissions struct {
sub perm
pub perm
pcache map[string]bool
}
// msgDeny is used when a user permission for subscriptions has a deny
// clause but a subscription could be made that is of broader scope.
// e.g. deny = "foo", but user subscribes to "*". That subscription should
// succeed but no message sent on foo should be delivered.
type msgDeny struct {
deny *Sublist
dcache map[string]bool
}
// routeTarget collects information regarding routes and queue groups for
// sending information to a remote.
type routeTarget struct {
sub *subscription
qs []byte
_qs [32]byte
}
const (
maxResultCacheSize = 512
maxDenyPermCacheSize = 256
maxPermCacheSize = 128
pruneSize = 32
routeTargetInit = 8
)
// Used in readloop to cache hot subject lookups and group statistics.
type readCache struct {
// These are for clients who are bound to a single account.
genid uint64
results map[string]*SublistResult
// This is for routes and gateways to have their own L1 as well that is account aware.
pacache map[string]*perAccountCache
// This is for when we deliver messages across a route. We use this structure
// to make sure to only send one message and properly scope to queues as needed.
rts []routeTarget
prand *rand.Rand
msgs int
bytes int
subs int
rsz int // Read buffer size
srs int // Short reads, used for dynamic buffer resizing.
}
const (
maxPerAccountCacheSize = 32768
prunePerAccountCacheSize = 512
)
// perAccountCache is for L1 semantics for inbound messages from a route or gateway to mimic the performance of clients.
type perAccountCache struct {
acc *Account
results *SublistResult
genid uint64
}
func (c *client) String() (id string) {
return c.ncs
}
func (c *client) GetOpts() *clientOpts {
return &c.opts
}
// GetTLSConnectionState returns the TLS ConnectionState if TLS is enabled, nil
// otherwise. Implements the ClientAuth interface.
func (c *client) GetTLSConnectionState() *tls.ConnectionState {
tc, ok := c.nc.(*tls.Conn)
if !ok {
return nil
}
state := tc.ConnectionState()
return &state
}
// This is the main subscription struct that indicates
// interest in published messages.
// FIXME(dlc) - This is getting bloated for normal subs, need
// to optionally have an opts section for non-normal stuff.
type subscription struct {
client *client
im *streamImport // This is for import stream support.
shadow []*subscription // This is to track shadowed accounts.
subject []byte
queue []byte
sid []byte
nm int64
max int64
qw int32
}
type clientOpts struct {
Echo bool `json:"echo"`
Verbose bool `json:"verbose"`
Pedantic bool `json:"pedantic"`
TLSRequired bool `json:"tls_required"`
Nkey string `json:"nkey,omitempty"`
JWT string `json:"jwt,omitempty"`
Sig string `json:"sig,omitempty"`
Authorization string `json:"auth_token,omitempty"`
Username string `json:"user,omitempty"`
Password string `json:"pass,omitempty"`
Name string `json:"name"`
Lang string `json:"lang"`
Version string `json:"version"`
Protocol int `json:"protocol"`
Account string `json:"account,omitempty"`
AccountNew bool `json:"new_account,omitempty"`
// Routes only
Import *SubjectPermission `json:"import,omitempty"`
Export *SubjectPermission `json:"export,omitempty"`
}
var defaultOpts = clientOpts{Verbose: true, Pedantic: true, Echo: true}
func init() {
rand.Seed(time.Now().UnixNano())
}
// Lock should be held
func (c *client) initClient() {
s := c.srv
c.cid = atomic.AddUint64(&s.gcid, 1)
// Outbound data structure setup
c.out.sz = startBufSize
c.out.sg = sync.NewCond(&c.mu)
opts := s.getOpts()
// Snapshots to avoid mutex access in fast paths.
c.out.wdl = opts.WriteDeadline
c.out.mp = opts.MaxPending
c.subs = make(map[string]*subscription)
c.echo = true
c.debug = (atomic.LoadInt32(&c.srv.logging.debug) != 0)
c.trace = (atomic.LoadInt32(&c.srv.logging.trace) != 0)
// This is a scratch buffer used for processMsg()
// The msg header starts with "RMSG ", which can be used
// for both local and routes.
// in bytes that is [82 77 83 71 32].
c.msgb = [msgScratchSize]byte{82, 77, 83, 71, 32}
// This is to track pending clients that have data to be flushed
// after we process inbound msgs from our own connection.
c.pcd = make(map[*client]struct{})
// snapshot the string version of the connection
var conn string
if ip, ok := c.nc.(*net.TCPConn); ok {
addr := ip.RemoteAddr().(*net.TCPAddr)
c.host = addr.IP.String()
c.port = addr.Port
conn = fmt.Sprintf("%s:%d", addr.IP, addr.Port)
}
switch c.kind {
case CLIENT:
c.ncs = fmt.Sprintf("%s - cid:%d", conn, c.cid)
case ROUTER:
c.ncs = fmt.Sprintf("%s - rid:%d", conn, c.cid)
case GATEWAY:
c.ncs = fmt.Sprintf("%s - gid:%d", conn, c.cid)
case SYSTEM:
c.ncs = "SYSTEM"
}
}
// Helper function to report errors.
func (c *client) reportErrRegisterAccount(acc *Account, err error) {
if err == ErrTooManyAccountConnections {
c.maxAccountConnExceeded()
return
}
c.Errorf("Problem registering with account [%s]", acc.Name)
c.sendErr("Failed Account Registration")
}
// RegisterWithAccount will register the given user with a specific
// account. This will change the subject namespace.
func (c *client) registerWithAccount(acc *Account) error {
if acc == nil || acc.sl == nil {
return ErrBadAccount
}
// If we were previously registered, usually to $G, do accounting here to remove.
if c.acc != nil {
if prev := c.acc.removeClient(c); prev == 1 && c.srv != nil {
c.srv.decActiveAccounts()
}
}
// Check if we have a max connections violation
if acc.MaxTotalClientsReached() {
return ErrTooManyAccountConnections
}
// Add in new one.
if prev := acc.addClient(c); prev == 0 && c.srv != nil {
c.srv.incActiveAccounts()
}
c.mu.Lock()
c.acc = acc
c.applyAccountLimits()
c.mu.Unlock()
return nil
}
// Apply account limits
// Lock is held on entry.
// FIXME(dlc) - Should server be able to override here?
func (c *client) applyAccountLimits() {
if c.acc == nil {
return
}
// Set here, check for more details below. Only set if non-zero.
if c.acc.msubs > 0 {
c.msubs = c.acc.msubs
}
if c.acc.mpay > 0 {
c.mpay = c.acc.mpay
}
opts := c.srv.getOpts()
// We check here if the server has an option set that is lower than the account limit.
if c.mpay != 0 && opts.MaxPayload != 0 && int32(opts.MaxPayload) < c.acc.mpay {
c.Errorf("Max Payload set to %d from server config which overrides %d from account claims", opts.MaxPayload, c.acc.mpay)
c.mpay = int32(opts.MaxPayload)
}
// We check here if the server has an option set that is lower than the account limit.
if c.msubs != 0 && opts.MaxSubs != 0 && opts.MaxSubs < c.acc.msubs {
c.Errorf("Max Subscriptions set to %d from server config which overrides %d from account claims", opts.MaxSubs, c.acc.msubs)
c.msubs = opts.MaxSubs
}
if c.msubs > 0 && len(c.subs) > c.msubs {
go func() {
c.maxSubsExceeded()
time.Sleep(20 * time.Millisecond)
c.closeConnection(MaxSubscriptionsExceeded)
}()
}
}
// RegisterUser allows auth to call back into a new client
// with the authenticated user. This is used to map
// any permissions into the client and setup accounts.
func (c *client) RegisterUser(user *User) {
// Register with proper account and sublist.
if user.Account != nil {
if err := c.registerWithAccount(user.Account); err != nil {
c.reportErrRegisterAccount(user.Account, err)
return
}
}
c.mu.Lock()
defer c.mu.Unlock()
// Assign permissions.
if user.Permissions == nil {
// Reset perms to nil in case client previously had them.
c.perms = nil
c.mperms = nil
return
}
c.setPermissions(user.Permissions)
}
// RegisterNkey allows auth to call back into a new nkey
// client with the authenticated user. This is used to map
// any permissions into the client and setup accounts.
func (c *client) RegisterNkeyUser(user *NkeyUser) {
// Register with proper account and sublist.
if user.Account != nil {
if err := c.registerWithAccount(user.Account); err != nil {
c.reportErrRegisterAccount(user.Account, err)
return
}
}
c.mu.Lock()
defer c.mu.Unlock()
c.user = user
// Assign permissions.
if user.Permissions == nil {
// Reset perms to nil in case client previously had them.
c.perms = nil
c.mperms = nil
return
}
c.setPermissions(user.Permissions)
}
// Initializes client.perms structure.
// Lock is held on entry.
func (c *client) setPermissions(perms *Permissions) {
if perms == nil {
return
}
c.perms = &permissions{}
c.perms.pcache = make(map[string]bool)
// Loop over publish permissions
if perms.Publish != nil {
if len(perms.Publish.Allow) > 0 {
c.perms.pub.allow = NewSublist()
}
for _, pubSubject := range perms.Publish.Allow {
sub := &subscription{subject: []byte(pubSubject)}
c.perms.pub.allow.Insert(sub)
}
if len(perms.Publish.Deny) > 0 {
c.perms.pub.deny = NewSublist()
}
for _, pubSubject := range perms.Publish.Deny {
sub := &subscription{subject: []byte(pubSubject)}
c.perms.pub.deny.Insert(sub)
}
}
// Loop over subscribe permissions
if perms.Subscribe != nil {
if len(perms.Subscribe.Allow) > 0 {
c.perms.sub.allow = NewSublist()
}
for _, subSubject := range perms.Subscribe.Allow {
sub := &subscription{subject: []byte(subSubject)}
c.perms.sub.allow.Insert(sub)
}
if len(perms.Subscribe.Deny) > 0 {
c.perms.sub.deny = NewSublist()
// Also hold onto this array for later.
c.darray = perms.Subscribe.Deny
}
for _, subSubject := range perms.Subscribe.Deny {
sub := &subscription{subject: []byte(subSubject)}
c.perms.sub.deny.Insert(sub)
}
}
}
// Check to see if we have an expiration for the user JWT via base claims.
// FIXME(dlc) - Clear on connect with new JWT.
func (c *client) checkExpiration(claims *jwt.ClaimsData) {
if claims.Expires == 0 {
return
}
tn := time.Now().Unix()
if claims.Expires < tn {
return
}
expiresAt := time.Duration(claims.Expires - tn)
c.setExpirationTimer(expiresAt * time.Second)
}
// This will load up the deny structure used for filtering delivered
// messages based on a deny clause for subscriptions.
// Lock should be held.
func (c *client) loadMsgDenyFilter() {
c.mperms = &msgDeny{NewSublist(), make(map[string]bool)}
for _, sub := range c.darray {
c.mperms.deny.Insert(&subscription{subject: []byte(sub)})
}
}
// writeLoop is the main socket write functionality.
// Runs in its own Go routine.
func (c *client) writeLoop() {
defer c.srv.grWG.Done()
// Used to check that we did flush from last wake up.
waitOk := true
// Main loop. Will wait to be signaled and then will use
// buffered outbound structure for efficient writev to the underlying socket.
for {
c.mu.Lock()
if waitOk && (c.out.pb == 0 || c.out.fsp > 0) && len(c.out.nb) == 0 && !c.flags.isSet(clearConnection) {
// Wait on pending data.
c.out.sg.Wait()
}
// Flush data
waitOk = c.flushOutbound()
isClosed := c.flags.isSet(clearConnection)
c.mu.Unlock()
if isClosed {
return
}
}
}
// readLoop is the main socket read functionality.
// Runs in its own Go routine.
func (c *client) readLoop() {
// Grab the connection off the client, it will be cleared on a close.
// We check for that after the loop, but want to avoid a nil dereference
c.mu.Lock()
nc := c.nc
s := c.srv
c.in.rsz = startBufSize
defer s.grWG.Done()
c.mu.Unlock()
if nc == nil {
return
}
// Start read buffer.
b := make([]byte, c.in.rsz)
for {
n, err := nc.Read(b)
if err != nil {
if err == io.EOF {
c.closeConnection(ClientClosed)
} else {
c.closeConnection(ReadError)
}
return
}
// Grab for updates for last activity.
last := time.Now()
// Clear inbound stats cache
c.in.msgs = 0
c.in.bytes = 0
c.in.subs = 0
// Main call into parser for inbound data. This will generate callouts
// to process messages, etc.
if err := c.parse(b[:n]); err != nil {
// handled inline
if err != ErrMaxPayload && err != ErrAuthentication {
c.Errorf("%s", err.Error())
c.closeConnection(ProtocolViolation)
}
return
}
// Updates stats for client and server that were collected
// from parsing through the buffer.
if c.in.msgs > 0 {
atomic.AddInt64(&c.inMsgs, int64(c.in.msgs))
atomic.AddInt64(&c.inBytes, int64(c.in.bytes))
atomic.AddInt64(&s.inMsgs, int64(c.in.msgs))
atomic.AddInt64(&s.inBytes, int64(c.in.bytes))
}
// Budget to spend in place flushing outbound data.
// Client will be checked on several fronts to see
// if applicable. Routes will never wait in place.
budget := 500 * time.Microsecond
if c.kind == ROUTER {
budget = 0
}
// Check pending clients for flush.
for cp := range c.pcd {
// Queue up a flush for those in the set
cp.mu.Lock()
// Update last activity for message delivery
cp.last = last
cp.out.fsp--
if budget > 0 && cp.flushOutbound() {
budget -= cp.out.lft
} else {
cp.flushSignal()
}
cp.mu.Unlock()
delete(c.pcd, cp)
}
// Update activity, check read buffer size.
c.mu.Lock()
nc := c.nc
// Activity based on interest changes or data/msgs.
if c.in.msgs > 0 || c.in.subs > 0 {
c.last = last
}
if n >= cap(b) {
c.in.srs = 0
} else if n < cap(b)/2 { // divide by 2 b/c we want less than what we would shrink to.
c.in.srs++
}
// Update read buffer size as/if needed.
if n >= cap(b) && cap(b) < maxBufSize {
// Grow
c.in.rsz = cap(b) * 2
b = make([]byte, c.in.rsz)
} else if n < cap(b) && cap(b) > minBufSize && c.in.srs > shortsToShrink {
// Shrink, for now don't accelerate, ping/pong will eventually sort it out.
c.in.rsz = cap(b) / 2
b = make([]byte, c.in.rsz)
}
c.mu.Unlock()
// Check to see if we got closed, e.g. slow consumer
if nc == nil {
return
}
}
}
// collapsePtoNB will place primary onto nb buffer as needed in prep for WriteTo.
// This will return a copy on purpose.
func (c *client) collapsePtoNB() net.Buffers {
if c.out.p != nil {
p := c.out.p
c.out.p = nil
return append(c.out.nb, p)
}
return c.out.nb
}
// This will handle the fixup needed on a partial write.
// Assume pending has been already calculated correctly.
func (c *client) handlePartialWrite(pnb net.Buffers) {
nb := c.collapsePtoNB()
// The partial needs to be first, so append nb to pnb
c.out.nb = append(pnb, nb...)
}
// flushOutbound will flush outbound buffer to a client.
// Will return if data was attempted to be written.
// Lock must be held
func (c *client) flushOutbound() bool {
if c.flags.isSet(flushOutbound) {
return false
}
c.flags.set(flushOutbound)
defer c.flags.clear(flushOutbound)
// Check for nothing to do.
if c.nc == nil || c.srv == nil || c.out.pb == 0 {
return true // true because no need to queue a signal.
}
// Snapshot opts
srv := c.srv
// Place primary on nb, assign primary to secondary, nil out nb and secondary.
nb := c.collapsePtoNB()
c.out.p, c.out.nb, c.out.s = c.out.s, nil, nil
// For selecting primary replacement.
cnb := nb
// In case it goes away after releasing the lock.
nc := c.nc
attempted := c.out.pb
apm := c.out.pm
// Do NOT hold lock during actual IO
c.mu.Unlock()
// flush here
now := time.Now()
// FIXME(dlc) - writev will do multiple IOs past 1024 on
// most platforms, need to account for that with deadline?
nc.SetWriteDeadline(now.Add(c.out.wdl))
// Actual write to the socket.
n, err := nb.WriteTo(nc)
nc.SetWriteDeadline(time.Time{})
lft := time.Since(now)
// Re-acquire client lock
c.mu.Lock()
// Update flush time statistics
c.out.lft = lft
// Subtract from pending bytes and messages.
c.out.pb -= n
c.out.pm -= apm // FIXME(dlc) - this will not be accurate.
// Check for partial writes
if n != attempted && n > 0 {
c.handlePartialWrite(nb)
} else if n >= int64(c.out.sz) {
c.out.sws = 0
}
if err != nil {
if n == 0 {
c.out.pb -= attempted
}
if ne, ok := err.(net.Error); ok && ne.Timeout() {
atomic.AddInt64(&srv.slowConsumers, 1)
c.clearConnection(SlowConsumerWriteDeadline)
c.Noticef("Slow Consumer Detected: WriteDeadline of %v Exceeded", c.out.wdl)
} else {
c.clearConnection(WriteError)
c.Debugf("Error flushing: %v", err)
}
return true
}
// Adjust based on what we wrote plus any pending.
pt := int(n + c.out.pb)
// Adjust sz as needed downward, keeping power of 2.
// We do this at a slower rate, hence the pt*4.
if pt < c.out.sz && c.out.sz > minBufSize {
c.out.sws++
if c.out.sws > shortsToShrink {
c.out.sz >>= 1
}
}
// Adjust sz as needed upward, keeping power of 2.
if pt > c.out.sz && c.out.sz < maxBufSize {
c.out.sz <<= 1
}
// Check to see if we can reuse buffers.
if len(cnb) > 0 {
oldp := cnb[0][:0]
if cap(oldp) >= c.out.sz {
// Replace primary or secondary if they are nil, reusing same buffer.
if c.out.p == nil {
c.out.p = oldp
} else if c.out.s == nil || cap(c.out.s) < c.out.sz {
c.out.s = oldp
}
}
}
return true
}
// flushSignal will use server to queue the flush IO operation to a pool of flushers.
// Lock must be held.
func (c *client) flushSignal() {
c.out.sg.Signal()
}
func (c *client) traceMsg(msg []byte) {
if !c.trace {
return
}
// FIXME(dlc), allow limits to printable payload.
c.Tracef("<<- MSG_PAYLOAD: [%q]", msg[:len(msg)-LEN_CR_LF])
}
func (c *client) traceInOp(op string, arg []byte) {
c.traceOp("<<- %s", op, arg)
}
func (c *client) traceOutOp(op string, arg []byte) {
c.traceOp("->> %s", op, arg)
}
func (c *client) traceOp(format, op string, arg []byte) {
if !c.trace {
return
}
opa := []interface{}{}
if op != "" {
opa = append(opa, op)
}
if arg != nil {
opa = append(opa, string(arg))
}
c.Tracef(format, opa)
}
// Process the information messages from Clients and other Routes.
func (c *client) processInfo(arg []byte) error {
info := Info{}
if err := json.Unmarshal(arg, &info); err != nil {
return err
}
switch c.kind {
case ROUTER:
c.processRouteInfo(&info)
case GATEWAY:
c.processGatewayInfo(&info)
}
return nil
}
func (c *client) processErr(errStr string) {
switch c.kind {
case CLIENT:
c.Errorf("Client Error %s", errStr)
case ROUTER:
c.Errorf("Route Error %s", errStr)
case GATEWAY:
c.Errorf("Gateway Error %s", errStr)
}
c.closeConnection(ParseError)
}
// Password pattern matcher.
var passPat = regexp.MustCompile(`"?\s*pass\S*?"?\s*[:=]\s*"?(([^",\r\n}])*)`)
// removePassFromTrace removes any notion of passwords from trace
// messages for logging.
func removePassFromTrace(arg []byte) []byte {
if !bytes.Contains(arg, []byte(`pass`)) {
return arg
}
// Take a copy of the connect proto just for the trace message.
var _arg [4096]byte
buf := append(_arg[:0], arg...)
m := passPat.FindAllSubmatchIndex(buf, -1)
if len(m) == 0 {
return arg
}
redactedPass := []byte("[REDACTED]")
for _, i := range m {
if len(i) < 4 {
continue
}
start := i[2]
end := i[3]
// Replace password substring.
buf = append(buf[:start], append(redactedPass, buf[end:]...)...)
break
}
return buf
}
func (c *client) processConnect(arg []byte) error {
if c.trace {
c.traceInOp("CONNECT", removePassFromTrace(arg))
}
c.mu.Lock()
// If we can't stop the timer because the callback is in progress...
if !c.clearAuthTimer() {
// wait for it to finish and handle sending the failure back to
// the client.
for c.nc != nil {
c.mu.Unlock()
time.Sleep(25 * time.Millisecond)
c.mu.Lock()
}
c.mu.Unlock()
return nil
}
c.last = time.Now()
kind := c.kind
srv := c.srv
// Moved unmarshalling of clients' Options under the lock.
// The client has already been added to the server map, so it is possible
// that other routines lookup the client, and access its options under
// the client's lock, so unmarshalling the options outside of the lock
// would cause data RACEs.
if err := json.Unmarshal(arg, &c.opts); err != nil {
c.mu.Unlock()
return err
}
// Indicate that the CONNECT protocol has been received, and that the
// server now knows which protocol this client supports.
c.flags.set(connectReceived)
// Capture these under lock
c.echo = c.opts.Echo
proto := c.opts.Protocol
verbose := c.opts.Verbose
lang := c.opts.Lang
account := c.opts.Account
accountNew := c.opts.AccountNew
c.mu.Unlock()
if srv != nil {
// Applicable to clients only:
// As soon as c.opts is unmarshalled and if the proto is at
// least ClientProtoInfo, we need to increment the following counter.
// This is decremented when client is removed from the server's
// clients map.
if kind == CLIENT && proto >= ClientProtoInfo {
srv.mu.Lock()
srv.cproto++
srv.mu.Unlock()
}
// Check for Auth
if ok := srv.checkAuthentication(c); !ok {
c.authViolation()
return ErrAuthentication
}
// Check for Account designation
if account != "" {
var acc *Account
var wasNew bool
if !srv.NewAccountsAllowed() {
acc = srv.LookupAccount(account)
if acc == nil {
c.Errorf(ErrMissingAccount.Error())
c.sendErr("Account Not Found")
return ErrMissingAccount
} else if accountNew {
c.Errorf(ErrAccountExists.Error())
c.sendErr(ErrAccountExists.Error())
return ErrAccountExists
}
} else {
// We can create this one on the fly.
acc, wasNew = srv.LookupOrRegisterAccount(account)
if accountNew && !wasNew {
c.Errorf(ErrAccountExists.Error())
c.sendErr(ErrAccountExists.Error())
return ErrAccountExists
}
}
// If we are here we can register ourselves with the new account.
if err := c.registerWithAccount(acc); err != nil {
c.reportErrRegisterAccount(acc, err)
return ErrBadAccount
}
} else if c.acc == nil {
// By default register with the global account.
c.registerWithAccount(srv.gacc)
}
}
switch kind {
case CLIENT:
// Check client protocol request if it exists.
if proto < ClientProtoZero || proto > ClientProtoInfo {
c.sendErr(ErrBadClientProtocol.Error())
c.closeConnection(BadClientProtocolVersion)
return ErrBadClientProtocol
}
if verbose {
c.sendOK()
}
case ROUTER:
// Delegate the rest of processing to the route
return c.processRouteConnect(srv, arg, lang)
case GATEWAY:
// Delegate the rest of processing to the gateway
return c.processGatewayConnect(arg)
}
return nil
}
func (c *client) sendErrAndErr(err string) {
c.sendErr(err)
c.Errorf(err)
}
func (c *client) sendErrAndDebug(err string) {
c.sendErr(err)
c.Debugf(err)
}
func (c *client) authTimeout() {
c.sendErrAndDebug("Authentication Timeout")
c.closeConnection(AuthenticationTimeout)
}
func (c *client) authExpired() {
c.sendErrAndDebug("User Authentication Expired")
c.closeConnection(AuthenticationExpired)
}
func (c *client) accountAuthExpired() {
c.sendErrAndDebug("Account Authentication Expired")
c.closeConnection(AuthenticationExpired)
}
func (c *client) authViolation() {
var hasTrustedNkeys, hasNkeys, hasUsers bool
if s := c.srv; s != nil {
s.mu.Lock()
hasTrustedNkeys = len(s.trustedKeys) > 0
hasNkeys = s.nkeys != nil
hasUsers = s.users != nil
s.mu.Unlock()
}
if hasTrustedNkeys {
c.Errorf("%v", ErrAuthentication)
} else if hasNkeys {
c.Errorf("%s - Nkey %q",
ErrAuthentication.Error(),
c.opts.Nkey)
} else if hasUsers {
c.Errorf("%s - User %q",
ErrAuthentication.Error(),
c.opts.Username)
} else {
c.Errorf(ErrAuthentication.Error())
}
c.sendErr("Authorization Violation")
c.closeConnection(AuthenticationViolation)
}
func (c *client) maxAccountConnExceeded() {
c.sendErrAndErr(ErrTooManyAccountConnections.Error())
c.closeConnection(MaxAccountConnectionsExceeded)
}
func (c *client) maxConnExceeded() {
c.sendErrAndErr(ErrTooManyConnections.Error())
c.closeConnection(MaxConnectionsExceeded)
}
func (c *client) maxSubsExceeded() {
c.sendErrAndErr(ErrTooManySubs.Error())
}
func (c *client) maxPayloadViolation(sz int, max int32) {
c.Errorf("%s: %d vs %d", ErrMaxPayload.Error(), sz, max)
c.sendErr("Maximum Payload Violation")
c.closeConnection(MaxPayloadExceeded)
}
// queueOutbound queues data for client/route connections.
// Return if the data is referenced or not. If referenced, the caller
// should not reuse the `data` array.
// Lock should be held.
func (c *client) queueOutbound(data []byte) bool {
// Assume data will not be referenced
referenced := false
// Add to pending bytes total.
c.out.pb += int64(len(data))
// Check for slow consumer via pending bytes limit.
// ok to return here, client is going away.
if c.out.pb > c.out.mp {
c.clearConnection(SlowConsumerPendingBytes)
atomic.AddInt64(&c.srv.slowConsumers, 1)
c.Noticef("Slow Consumer Detected: MaxPending of %d Exceeded", c.out.mp)
return referenced
}
if c.out.p == nil && len(data) < maxBufSize {
if c.out.sz == 0 {
c.out.sz = startBufSize
}
if c.out.s != nil && cap(c.out.s) >= c.out.sz {
c.out.p = c.out.s
c.out.s = nil
} else {
// FIXME(dlc) - make power of 2 if less than maxBufSize?
c.out.p = make([]byte, 0, c.out.sz)
}
}
// Determine if we copy or reference
available := cap(c.out.p) - len(c.out.p)
if len(data) > available {
// We can fit into existing primary, but message will fit in next one
// we allocate or utilize from the secondary. So copy what we can.
if available > 0 && len(data) < c.out.sz {
c.out.p = append(c.out.p, data[:available]...)
data = data[available:]
}
// Put the primary on the nb if it has a payload
if len(c.out.p) > 0 {
c.out.nb = append(c.out.nb, c.out.p)
c.out.p = nil
}
// Check for a big message, and if found place directly on nb
// FIXME(dlc) - do we need signaling of ownership here if we want len(data) < maxBufSize
if len(data) > maxBufSize {
c.out.nb = append(c.out.nb, data)
referenced = true
} else {
// We will copy to primary.
if c.out.p == nil {
// Grow here
if (c.out.sz << 1) <= maxBufSize {
c.out.sz <<= 1
}
if len(data) > c.out.sz {
c.out.p = make([]byte, 0, len(data))
} else {
if c.out.s != nil && cap(c.out.s) >= c.out.sz { // TODO(dlc) - Size mismatch?
c.out.p = c.out.s
c.out.s = nil
} else {
c.out.p = make([]byte, 0, c.out.sz)
}
}
}
c.out.p = append(c.out.p, data...)
}
} else {
c.out.p = append(c.out.p, data...)
}
return referenced
}
// Assume the lock is held upon entry.
func (c *client) sendProto(info []byte, doFlush bool) {
if c.nc == nil {
return
}
c.queueOutbound(info)
if !(doFlush && c.flushOutbound()) {
c.flushSignal()
}
}
// Assume the lock is held upon entry.
func (c *client) sendPong() {
c.traceOutOp("PONG", nil)
c.sendProto([]byte("PONG\r\n"), true)
}
// Assume the lock is held upon entry.
func (c *client) sendPing() {
c.rttStart = time.Now()
c.ping.out++
c.traceOutOp("PING", nil)
c.sendProto([]byte("PING\r\n"), true)
}
// Generates the INFO to be sent to the client with the client ID included.
// info arg will be copied since passed by value.
// Assume lock is held.
func (c *client) generateClientInfoJSON(info Info) []byte {
info.CID = c.cid
// Generate the info json
b, _ := json.Marshal(info)
pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)}
return bytes.Join(pcs, []byte(" "))
}
// Assume the lock is held upon entry.
func (c *client) sendInfo(info []byte) {
c.sendProto(info, true)
}
func (c *client) sendErr(err string) {
c.mu.Lock()
c.traceOutOp("-ERR", []byte(err))
c.sendProto([]byte(fmt.Sprintf("-ERR '%s'\r\n", err)), true)
c.mu.Unlock()
}
func (c *client) sendOK() {
c.mu.Lock()
c.traceOutOp("OK", nil)
// Can not autoflush this one, needs to be async.
c.sendProto([]byte("+OK\r\n"), false)
// FIXME(dlc) - ??
c.pcd[c] = needFlush
c.mu.Unlock()
}
func (c *client) processPing() {
c.mu.Lock()
c.traceInOp("PING", nil)
if c.nc == nil {
c.mu.Unlock()
return
}
c.sendPong()
// If not a CLIENT, we are done
if c.kind != CLIENT {
c.mu.Unlock()
return
}
// The CONNECT should have been received, but make sure it
// is so before proceeding
if !c.flags.isSet(connectReceived) {
c.mu.Unlock()
return
}
// If we are here, the CONNECT has been received so we know
// if this client supports async INFO or not.
var (
checkClusterChange bool
srv = c.srv
)
// For older clients, just flip the firstPongSent flag if not already
// set and we are done.
if c.opts.Protocol < ClientProtoInfo || srv == nil {
c.flags.setIfNotSet(firstPongSent)
} else {
// This is a client that supports async INFO protocols.
// If this is the first PING (so firstPongSent is not set yet),
// we will need to check if there was a change in cluster topology.
checkClusterChange = !c.flags.isSet(firstPongSent)
}
c.mu.Unlock()
if checkClusterChange {
srv.mu.Lock()
c.mu.Lock()
// Now that we are under both locks, we can flip the flag.
// This prevents sendAsyncInfoToClients() and and code here
// to send a double INFO protocol.
c.flags.set(firstPongSent)
// If there was a cluster update since this client was created,
// send an updated INFO protocol now.
if srv.lastCURLsUpdate >= c.start.UnixNano() {
c.sendInfo(c.generateClientInfoJSON(srv.copyInfo()))
}
c.mu.Unlock()
srv.mu.Unlock()
}
}
func (c *client) processPong() {
c.traceInOp("PONG", nil)
c.mu.Lock()
c.ping.out = 0
c.rtt = time.Since(c.rttStart)
srv := c.srv
reorderGWs := c.kind == GATEWAY && c.gw.outbound
c.mu.Unlock()
if reorderGWs {
srv.gateway.orderOutboundConnections()
}
}
func (c *client) processPub(trace bool, arg []byte) error {
if trace {
c.traceInOp("PUB", arg)
}
// Unroll splitArgs to avoid runtime/heap issues
a := [MAX_PUB_ARGS][]byte{}
args := a[:0]
start := -1
for i, b := range arg {
switch b {
case ' ', '\t':
if start >= 0 {
args = append(args, arg[start:i])
start = -1
}
default:
if start < 0 {
start = i
}
}
}
if start >= 0 {
args = append(args, arg[start:])
}
c.pa.arg = arg
switch len(args) {
case 2:
c.pa.subject = args[0]
c.pa.reply = nil
c.pa.size = parseSize(args[1])
c.pa.szb = args[1]
case 3:
c.pa.subject = args[0]
c.pa.reply = args[1]
c.pa.size = parseSize(args[2])
c.pa.szb = args[2]
default:
return fmt.Errorf("processPub Parse Error: '%s'", arg)
}
if c.pa.size < 0 {
return fmt.Errorf("processPub Bad or Missing Size: '%s'", arg)
}
maxPayload := atomic.LoadInt32(&c.mpay)
if maxPayload > 0 && int32(c.pa.size) > maxPayload {
c.maxPayloadViolation(c.pa.size, maxPayload)
return ErrMaxPayload
}
if c.opts.Pedantic && !IsValidLiteralSubject(string(c.pa.subject)) {
c.sendErr("Invalid Publish Subject")
}
return nil
}
func splitArg(arg []byte) [][]byte {
a := [MAX_MSG_ARGS][]byte{}
args := a[:0]
start := -1
for i, b := range arg {
switch b {
case ' ', '\t', '\r', '\n':
if start >= 0 {
args = append(args, arg[start:i])
start = -1
}
default:
if start < 0 {
start = i
}
}
}
if start >= 0 {
args = append(args, arg[start:])
}
return args
}
func (c *client) processSub(argo []byte) (err error) {
c.traceInOp("SUB", argo)
// Indicate activity.
c.in.subs++
// Copy so we do not reference a potentially large buffer
// FIXME(dlc) - make more efficient.
arg := make([]byte, len(argo))
copy(arg, argo)
args := splitArg(arg)
sub := &subscription{client: c}
switch len(args) {
case 2:
sub.subject = args[0]
sub.queue = nil
sub.sid = args[1]
case 3:
sub.subject = args[0]
sub.queue = args[1]
sub.sid = args[2]
default:
return fmt.Errorf("processSub Parse Error: '%s'", arg)
}
c.mu.Lock()
// Grab connection type.
kind := c.kind
if c.nc == nil && kind != SYSTEM {
c.mu.Unlock()
return nil
}
// Check permissions if applicable.
if kind == ROUTER {
if !c.canExport(string(sub.subject)) {
c.mu.Unlock()
return nil
}
} else if kind == CLIENT && !c.canSubscribe(string(sub.subject)) {
c.mu.Unlock()
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
c.Errorf("Subscription Violation - User %q, Subject %q, SID %s",
c.opts.Username, sub.subject, sub.sid)
return nil
}
// Check if we have a maximum on the number of subscriptions.
if c.msubs > 0 && len(c.subs) >= c.msubs {
c.mu.Unlock()
c.maxSubsExceeded()
return nil
}
sid := string(sub.sid)
acc := c.acc
// Subscribe here.
if c.subs[sid] == nil {
c.subs[sid] = sub
if acc != nil && acc.sl != nil {
err = acc.sl.Insert(sub)
if err != nil {
delete(c.subs, sid)
}
}
}
c.mu.Unlock()
if err != nil {
c.sendErr("Invalid Subject")
return nil
} else if c.opts.Verbose && kind != SYSTEM {
c.sendOK()
}
if acc != nil {
if err := c.addShadowSubscriptions(acc, sub); err != nil {
c.Errorf(err.Error())
}
// If we are routing and this is a local sub, add to the route map for the associated account.
if kind == CLIENT || kind == SYSTEM {
c.srv.updateRouteSubscriptionMap(acc, sub, 1)
}
}
return nil
}
// If the client's account has stream imports and there are matches for
// this subscription's subject, then add shadow subscriptions in
// other accounts that can export this subject.
func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error {
if acc == nil {
return ErrMissingAccount
}
var (
rims [32]*streamImport
ims = rims[:0]
rfroms [32]*streamImport
froms = rfroms[:0]
tokens []string
tsa [32]string
hasWC bool
)
acc.mu.RLock()
// Loop over the import subjects. We have 3 scenarios. If we exact
// match or we know the proposed subject is a strict subset of the
// import we can subscribe to the subscription's subject directly.
// The third scenario is where the proposed subject has a wildcard
// and may not be an exact subset, but is a match. Therefore we have to
// subscribe to the import subject, not the subscription's subject.
for _, im := range acc.imports.streams {
if im.invalid {
continue
}
subj := string(sub.subject)
if subj == im.prefix+im.from {
ims = append(ims, im)
continue
}
if tokens == nil {
tokens = tsa[:0]
start := 0
for i := 0; i < len(subj); i++ {
// This is not perfect, but the test below will
// be more exact, this is just to trigger the
// additional test.
if subj[i] == pwc || subj[i] == fwc {
hasWC = true
} else if subj[i] == btsep {
tokens = append(tokens, subj[start:i])
start = i + 1
}
}
tokens = append(tokens, subj[start:])
}
if isSubsetMatch(tokens, im.prefix+im.from) {
ims = append(ims, im)
} else if hasWC {
if subjectIsSubsetMatch(im.prefix+im.from, subj) {
froms = append(froms, im)
}
}
}
acc.mu.RUnlock()
var shadow []*subscription
if len(ims) > 0 || len(froms) > 0 {
shadow = make([]*subscription, 0, len(ims)+len(froms))
}
// Now walk through collected importMaps
for _, im := range ims {
// We will create a shadow subscription.
nsub, err := c.addShadowSub(sub, im, false)
if err != nil {
return err
}
shadow = append(shadow, nsub)
}
// Now walk through importMaps that we need to subscribe
// exactly to the from property.
for _, im := range froms {
// We will create a shadow subscription.
nsub, err := c.addShadowSub(sub, im, true)
if err != nil {
return err
}
shadow = append(shadow, nsub)
}
if shadow != nil {
c.mu.Lock()
sub.shadow = shadow
c.mu.Unlock()
}
return nil
}
// Add in the shadow subscription.
func (c *client) addShadowSub(sub *subscription, im *streamImport, useFrom bool) (*subscription, error) {
nsub := *sub // copy
nsub.im = im
if useFrom {
nsub.subject = []byte(im.from)
} else if im.prefix != "" {
// redo subject here to match subject in the publisher account space.
// Just remove prefix from what they gave us. That maps into other space.
nsub.subject = sub.subject[len(im.prefix):]
}
c.Debugf("Creating import subscription on %q from account %q", nsub.subject, im.acc.Name)
if err := im.acc.sl.Insert(&nsub); err != nil {
errs := fmt.Sprintf("Could not add shadow import subscription for account %q", im.acc.Name)
c.Debugf(errs)
return nil, fmt.Errorf(errs)
}
// Update our route map here.
c.srv.updateRouteSubscriptionMap(im.acc, &nsub, 1)
return &nsub, nil
}
// canSubscribe determines if the client is authorized to subscribe to the
// given subject. Assumes caller is holding lock.
func (c *client) canSubscribe(subject string) bool {
if c.perms == nil {
return true
}
allowed := true
// Check allow list. If no allow list that means all are allowed. Deny can overrule.
if c.perms.sub.allow != nil {
r := c.perms.sub.allow.Match(subject)
allowed = len(r.psubs) != 0
}
// If we have a deny list and we think we are allowed, check that as well.
if allowed && c.perms.sub.deny != nil {
r := c.perms.sub.deny.Match(subject)
allowed = len(r.psubs) == 0
// We use the actual subscription to signal us to spin up the deny mperms
// and cache. We check if the subject is a wildcard that contains any of
// the deny clauses.
// FIXME(dlc) - We could be smarter and track when these go away and remove.
if allowed && c.mperms == nil && subjectHasWildcard(subject) {
// Whip through the deny array and check if this wildcard subject is within scope.
for _, sub := range c.darray {
tokens := strings.Split(sub, tsep)
if isSubsetMatch(tokens, sub) {
c.loadMsgDenyFilter()
break
}
}
}
}
return allowed
}
// Low level unsubscribe for a given client.
func (c *client) unsubscribe(acc *Account, sub *subscription, force bool) {
c.mu.Lock()
defer c.mu.Unlock()
if !force && sub.max > 0 && sub.nm < sub.max {
c.Debugf(
"Deferring actual UNSUB(%s): %d max, %d received",
string(sub.subject), sub.max, sub.nm)
return
}
c.traceOp("<-> %s", "DELSUB", sub.sid)
delete(c.subs, string(sub.sid))
if c.kind != CLIENT && c.kind != SYSTEM {
c.removeReplySubTimeout(sub)
}
if acc != nil {
acc.sl.Remove(sub)
}
// Check to see if we have shadow subscriptions.
for _, nsub := range sub.shadow {
if err := nsub.im.acc.sl.Remove(nsub); err != nil {
c.Debugf("Could not remove shadow import subscription for account %q", nsub.im.acc.Name)
} else if c.kind == CLIENT || c.kind == SYSTEM && c.srv != nil {
c.srv.updateRouteSubscriptionMap(nsub.im.acc, nsub, -1)
}
}
sub.shadow = nil
}
func (c *client) processUnsub(arg []byte) error {
c.traceInOp("UNSUB", arg)
args := splitArg(arg)
var sid []byte
max := -1
switch len(args) {
case 1:
sid = args[0]
case 2:
sid = args[0]
max = parseSize(args[1])
default:
return fmt.Errorf("processUnsub Parse Error: '%s'", arg)
}
// Indicate activity.
c.in.subs++
var sub *subscription
unsub := false
ok := false
c.mu.Lock()
// Grab connection type.
kind := c.kind
var acc *Account
if sub, ok = c.subs[string(sid)]; ok {
acc = c.acc
if max > 0 {
sub.max = int64(max)
} else {
// Clear it here to override
sub.max = 0
unsub = true
}
}
c.mu.Unlock()
if c.opts.Verbose {
c.sendOK()
}
if unsub {
c.unsubscribe(acc, sub, false)
if acc != nil && kind == CLIENT || kind == SYSTEM {
c.srv.updateRouteSubscriptionMap(acc, sub, -1)
}
}
return nil
}
// checkDenySub will check if we are allowed to deliver this message in the
// presence of deny clauses for subscriptions. Deny clauses will not prevent
// larger scoped wildcard subscriptions, so we need to check at delivery time.
// Lock should be held.
func (c *client) checkDenySub(subject string) bool {
if denied, ok := c.mperms.dcache[subject]; ok {
return denied
} else if r := c.mperms.deny.Match(subject); len(r.psubs) != 0 {
c.mperms.dcache[subject] = true
return true
} else {
c.mperms.dcache[subject] = false
}
if len(c.mperms.dcache) > maxDenyPermCacheSize {
c.pruneDenyCache()
}
return false
}
func (c *client) msgHeader(mh []byte, sub *subscription, reply []byte) []byte {
if len(sub.sid) > 0 {
mh = append(mh, sub.sid...)
mh = append(mh, ' ')
}
if reply != nil {
mh = append(mh, reply...)
mh = append(mh, ' ')
}
mh = append(mh, c.pa.szb...)
mh = append(mh, _CRLF_...)
return mh
}
// Used to treat maps as efficient set
var needFlush = struct{}{}
func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
if sub.client == nil {
return false
}
client := sub.client
client.mu.Lock()
// Check echo
if c == client && !client.echo {
client.mu.Unlock()
return false
}
// Check if we have a subscribe deny clause. This will trigger us to check the subject
// for a match against the denied subjects.
if client.mperms != nil && client.checkDenySub(string(c.pa.subject)) {
client.mu.Unlock()
return false
}
srv := client.srv
sub.nm++
// Check if we should auto-unsubscribe.
if sub.max > 0 {
if client.kind == ROUTER && sub.nm >= sub.max {
// The only router based messages that we will see here are remoteReplies.
// We handle these slightly differently.
defer client.removeReplySub(sub)
} else {
// For routing..
shouldForward := client.kind == CLIENT || client.kind == SYSTEM && client.srv != nil
// If we are at the exact number, unsubscribe but
// still process the message in hand, otherwise
// unsubscribe and drop message on the floor.
if sub.nm == sub.max {
client.Debugf("Auto-unsubscribe limit of %d reached for sid '%s'", sub.max, string(sub.sid))
// Due to defer, reverse the code order so that execution
// is consistent with other cases where we unsubscribe.
if shouldForward {
defer srv.updateRouteSubscriptionMap(client.acc, sub, -1)
}
defer client.unsubscribe(client.acc, sub, true)
} else if sub.nm > sub.max {
client.Debugf("Auto-unsubscribe limit [%d] exceeded", sub.max)
client.mu.Unlock()
client.unsubscribe(client.acc, sub, true)
if shouldForward {
srv.updateRouteSubscriptionMap(client.acc, sub, -1)
}
return false
}
}
}
// Update statistics
// The msg includes the CR_LF, so pull back out for accounting.
msgSize := int64(len(msg) - LEN_CR_LF)
// No atomic needed since accessed under client lock.
// Monitor is reading those also under client's lock.
client.outMsgs++
client.outBytes += msgSize
atomic.AddInt64(&srv.outMsgs, 1)
atomic.AddInt64(&srv.outBytes, msgSize)
// Check for internal subscription.
if client.kind == SYSTEM {
s := client.srv
client.mu.Unlock()
s.deliverInternalMsg(sub, c.pa.subject, c.pa.reply, msg[:msgSize])
return true
}
// Check for closed connection
if client.nc == nil {
client.mu.Unlock()
return false
}
// Queue to outbound buffer
client.queueOutbound(mh)
client.queueOutbound(msg)
client.out.pm++
// Check outbound threshold and queue IO flush if needed.
if client.out.pm > 1 && client.out.pb > maxBufSize*2 {
client.flushSignal()
}
if c.trace {
client.traceOutOp(string(mh[:len(mh)-LEN_CR_LF]), nil)
}
// Increment the flush pending signals if we are setting for the first time.
if _, ok := c.pcd[client]; !ok {
client.out.fsp++
}
client.mu.Unlock()
// Remember for when we return to the top of the loop.
c.pcd[client] = needFlush
return true
}
// pruneDenyCache will prune the deny cache via randomly
// deleting items. Doing so pruneSize items at a time.
// Lock must be held for this one since it is shared under
// deliverMsg.
func (c *client) pruneDenyCache() {
r := 0
for subject := range c.mperms.dcache {
delete(c.mperms.dcache, subject)
if r++; r > pruneSize {
break
}
}
}
// prunePubPermsCache will prune the cache via randomly
// deleting items. Doing so pruneSize items at a time.
func (c *client) prunePubPermsCache() {
r := 0
for subject := range c.perms.pcache {
delete(c.perms.pcache, subject)
if r++; r > pruneSize {
break
}
}
}
// pubAllowed checks on publish permissioning.
func (c *client) pubAllowed(subject string) bool {
if c.perms == nil || (c.perms.pub.allow == nil && c.perms.pub.deny == nil) {
return true
}
// Check if published subject is allowed if we have permissions in place.
allowed, ok := c.perms.pcache[subject]
if ok {
return allowed
}
// Cache miss, check allow then deny as needed.
if c.perms.pub.allow != nil {
r := c.perms.pub.allow.Match(subject)
allowed = len(r.psubs) != 0
} else {
// No entries means all are allowed. Deny will overrule as needed.
allowed = true
}
// If we have a deny list and are currently allowed, check that as well.
if allowed && c.perms.pub.deny != nil {
r := c.perms.pub.deny.Match(subject)
allowed = len(r.psubs) == 0
}
// Update our cache here.
c.perms.pcache[string(subject)] = allowed
// Prune if needed.
if len(c.perms.pcache) > maxPermCacheSize {
c.prunePubPermsCache()
}
return allowed
}
// Used to mimic client like replies.
const (
replyPrefix = "_R_."
replyPrefixLen = len(replyPrefix)
digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
base = 62
)
// newServiceReply is used when rewriting replies that cross account boundaries.
// These will look like _R_.XXXXXXXX.
func (c *client) newServiceReply() []byte {
// Check to see if we have our own rand yet. Global rand
// has contention with lots of clients, etc.
if c.in.prand == nil {
c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
var b = [15]byte{'_', 'R', '_', '.'}
rn := c.in.prand.Int63()
for i, l := replyPrefixLen, rn; i < len(b); i++ {
b[i] = digits[l%base]
l /= base
}
return b[:]
}
// Test whether a reply subject is a service import reply.
func isServiceReply(reply []byte) bool {
return len(reply) > 3 && string(reply[:4]) == replyPrefix
}
// This will decide to call the client code or router code.
func (c *client) processInboundMsg(msg []byte) {
switch c.kind {
case CLIENT:
c.processInboundClientMsg(msg)
case ROUTER:
c.processInboundRoutedMsg(msg)
case GATEWAY:
c.processInboundGatewayMsg(msg)
}
}
// processInboundClientMsg is called to process an inbound msg from a client.
func (c *client) processInboundClientMsg(msg []byte) {
// Update statistics
// The msg includes the CR_LF, so pull back out for accounting.
c.in.msgs++
c.in.bytes += len(msg) - LEN_CR_LF
if c.trace {
c.traceMsg(msg)
}
// Check pub permissions
if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) && !c.pubAllowed(string(c.pa.subject)) {
c.pubPermissionViolation(c.pa.subject)
return
}
// Now check for reserved replies. These are used for service imports.
if isServiceReply(c.pa.reply) {
c.replySubjectViolation(c.pa.reply)
return
}
if c.opts.Verbose {
c.sendOK()
}
// Mostly under testing scenarios.
if c.srv == nil || c.acc == nil {
return
}
// Match the subscriptions. We will use our own L1 map if
// it's still valid, avoiding contention on the shared sublist.
var r *SublistResult
var ok bool
genid := atomic.LoadUint64(&c.acc.sl.genid)
if genid == c.in.genid && c.in.results != nil {
r, ok = c.in.results[string(c.pa.subject)]
} else {
// Reset our L1 completely.
c.in.results = make(map[string]*SublistResult)
c.in.genid = genid
}
// Go back to the sublist data structure.
if !ok {
r = c.acc.sl.Match(string(c.pa.subject))
c.in.results[string(c.pa.subject)] = r
// Prune the results cache. Keeps us from unbounded growth. Random delete.
if len(c.in.results) > maxResultCacheSize {
n := 0
for subject := range c.in.results {
delete(c.in.results, subject)
if n++; n > pruneSize {
break
}
}
}
}
// Check to see if we need to map/route to another account.
if c.acc.imports.services != nil {
c.checkForImportServices(c.acc, msg)
}
var qa [16][]byte
queues := qa[:0]
// Check for no interest, short circuit if so.
// This is the fanout scale.
if len(r.psubs)+len(r.qsubs) > 0 {
var qnames *[][]byte
// If we have queue subs in this cluster, then if we run in gateway
// mode and the remote gateways have queue subs, then we need to
// collect the queue groups this message was sent to so that we
// exclude them when sending to gateways.
if len(r.qsubs) > 0 && c.srv.gateway.enabled && atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
qnames = &queues
}
c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, qnames)
}
// Now deal with gateways
if c.srv.gateway.enabled {
c.sendMsgToGateways(msg, queues)
}
}
// This checks and process import services by doing the mapping and sending the
// message onward if applicable.
func (c *client) checkForImportServices(acc *Account, msg []byte) {
if acc == nil || acc.imports.services == nil {
return
}
acc.mu.RLock()
rm := acc.imports.services[string(c.pa.subject)]
invalid := rm != nil && rm.invalid
acc.mu.RUnlock()
// Get the results from the other account for the mapped "to" subject.
// If we have been marked invalid simply return here.
if rm != nil && !invalid && rm.acc != nil && rm.acc.sl != nil {
var nrr []byte
if rm.ae {
acc.removeServiceImport(rm.from)
}
if c.pa.reply != nil {
// We want to remap this to provide anonymity.
nrr = c.newServiceReply()
rm.acc.addImplicitServiceImport(acc, string(nrr), string(c.pa.reply), true, nil)
}
// FIXME(dlc) - Do L1 cache trick from above.
rr := rm.acc.sl.Match(rm.to)
c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, nil)
}
}
func (c *client) addSubToRouteTargets(sub *subscription) {
if c.in.rts == nil {
c.in.rts = make([]routeTarget, 0, routeTargetInit)
}
for i := range c.in.rts {
rt := &c.in.rts[i]
if rt.sub.client == sub.client {
if sub.queue != nil {
rt.qs = append(rt.qs, sub.queue...)
rt.qs = append(rt.qs, ' ')
}
return
}
}
// If we are here we do not have the sub yet in our list
// If we have to grow do so here.
if len(c.in.rts) == cap(c.in.rts) {
c.in.rts = append(c.in.rts, routeTarget{})
}
var rt *routeTarget
lrts := len(c.in.rts)
c.in.rts = c.in.rts[:lrts+1]
rt = &c.in.rts[lrts]
rt.sub = sub
rt.qs = rt._qs[:0]
if sub.queue != nil {
rt.qs = append(rt.qs, sub.queue...)
rt.qs = append(rt.qs, ' ')
}
}
// This processes the sublist results for a given message.
func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, queues *[][]byte) {
// msg header for clients.
msgh := c.msgb[1:msgHeadProtoLen]
msgh = append(msgh, subject...)
msgh = append(msgh, ' ')
si := len(msgh)
// For sending messages across routes. Reset it if we have one.
// We reuse this data structure.
if c.in.rts != nil {
c.in.rts = c.in.rts[:0]
}
// Loop over all normal subscriptions that match.
for _, sub := range r.psubs {
// Check if this is a send to a ROUTER. We now process
// these after everything else.
if sub.client.kind == ROUTER {
if c.kind == ROUTER {
continue
}
c.addSubToRouteTargets(sub)
continue
} else if sub.client.kind == GATEWAY {
// Never send to gateway from here.
continue
}
// Check for stream import mapped subs. These apply to local subs only.
if sub.im != nil && sub.im.prefix != "" {
// Redo the subject here on the fly.
msgh = c.msgb[1:msgHeadProtoLen]
msgh = append(msgh, sub.im.prefix...)
msgh = append(msgh, subject...)
msgh = append(msgh, ' ')
si = len(msgh)
}
// Normal delivery
mh := c.msgHeader(msgh[:si], sub, reply)
c.deliverMsg(sub, mh, msg)
}
// If we are sourced from a route we need to have direct filtered queues.
if c.kind == ROUTER && c.pa.queues == nil {
return
}
// Set these up to optionally filter based on the queue lists.
// This is for messages received from routes which will have directed
// guidance on which queue groups we should deliver to.
qf := c.pa.queues
// For gateway connections, we still want to send messages to routes
// even if there is no queue filters.
if c.kind == GATEWAY && qf == nil {
goto sendToRoutes
}
// Check to see if we have our own rand yet. Global rand
// has contention with lots of clients, etc.
if c.in.prand == nil {
c.in.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
// Process queue subs
for i := 0; i < len(r.qsubs); i++ {
qsubs := r.qsubs[i]
// If we have a filter check that here. We could make this a map or someting more
// complex but linear search since we expect queues to be small. Should be faster
// and more cache friendly.
if qf != nil && len(qsubs) > 0 {
tqn := qsubs[0].queue
for _, qn := range qf {
if bytes.Equal(qn, tqn) {
goto selectQSub
}
}
continue
}
selectQSub:
// We will hold onto remote qsubs when we are coming from a route
// just in case we can no longer do local delivery.
var rsub *subscription
// Find a subscription that is able to deliver this message
// starting at a random index.
startIndex := c.in.prand.Intn(len(qsubs))
for i := 0; i < len(qsubs); i++ {
index := (startIndex + i) % len(qsubs)
sub := qsubs[index]
if sub == nil {
continue
}
// Potentially sending to a remote sub across a route.
if sub.client.kind == ROUTER {
if c.kind == ROUTER {
// We just came from a route, so skip and prefer local subs.
// Keep our first rsub in case all else fails.
if rsub == nil {
rsub = sub
}
continue
} else {
c.addSubToRouteTargets(sub)
if queues != nil {
*queues = append(*queues, sub.queue)
}
}
break
}
// Check for mapped subs
if sub.im != nil && sub.im.prefix != "" {
// Redo the subject here on the fly.
msgh = c.msgb[1:msgHeadProtoLen]
msgh = append(msgh, sub.im.prefix...)
msgh = append(msgh, subject...)
msgh = append(msgh, ' ')
si = len(msgh)
}
mh := c.msgHeader(msgh[:si], sub, reply)
if c.deliverMsg(sub, mh, msg) {
// Clear rsub
rsub = nil
if queues != nil {
*queues = append(*queues, sub.queue)
}
break
}
}
if rsub != nil {
// If we are here we tried to deliver to a local qsub
// but failed. So we will send it to a remote.
c.addSubToRouteTargets(rsub)
if queues != nil {
*queues = append(*queues, rsub.queue)
}
}
}
sendToRoutes:
// If no messages for routes return here.
if len(c.in.rts) == 0 {
return
}
// We address by index to avoid struct copy.
// We have inline structs for memory layout and cache coherency.
for i := range c.in.rts {
rt := &c.in.rts[i]
mh := c.msgb[:msgHeadProtoLen]
mh = append(mh, acc.Name...)
mh = append(mh, ' ')
mh = append(mh, subject...)
mh = append(mh, ' ')
if len(rt.qs) > 0 {
if reply != nil {
mh = append(mh, "+ "...) // Signal that there is a reply.
mh = append(mh, reply...)
mh = append(mh, ' ')
} else {
mh = append(mh, "| "...) // Only queues
}
mh = append(mh, rt.qs...)
} else if reply != nil {
mh = append(mh, reply...)
mh = append(mh, ' ')
}
mh = append(mh, c.pa.szb...)
mh = append(mh, _CRLF_...)
c.deliverMsg(rt.sub, mh, msg)
}
}
func (c *client) pubPermissionViolation(subject []byte) {
c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subject))
c.Errorf("Publish Violation - User %q, Subject %q", c.opts.Username, subject)
}
func (c *client) replySubjectViolation(reply []byte) {
c.sendErr(fmt.Sprintf("Permissions Violation for Publish with Reply of %q", reply))
c.Errorf("Publish Violation - User %q, Reply %q", c.opts.Username, reply)
}
func (c *client) processPingTimer() {
c.mu.Lock()
defer c.mu.Unlock()
c.ping.tmr = nil
// Check if connection is still opened
if c.nc == nil {
return
}
c.Debugf("%s Ping Timer", c.typeString())
// If we have had activity within the PingInterval no
// need to send a ping.
if delta := time.Since(c.last); delta < c.srv.getOpts().PingInterval {
c.Debugf("Delaying PING due to activity %v ago", delta.Round(time.Second))
} else {
// Check for violation
if c.ping.out+1 > c.srv.getOpts().MaxPingsOut {
c.Debugf("Stale Client Connection - Closing")
c.sendProto([]byte(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")), true)
c.clearConnection(StaleConnection)
return
}
// Send PING
c.sendPing()
}
// Reset to fire again.
c.setPingTimer()
}
// Lock should be held
func (c *client) setPingTimer() {
if c.srv == nil {
return
}
d := c.srv.getOpts().PingInterval
c.ping.tmr = time.AfterFunc(d, c.processPingTimer)
}
// Lock should be held
func (c *client) clearPingTimer() {
if c.ping.tmr == nil {
return
}
c.ping.tmr.Stop()
c.ping.tmr = nil
}
// Lock should be held
func (c *client) setAuthTimer(d time.Duration) {
c.atmr = time.AfterFunc(d, c.authTimeout)
}
// Lock should be held
func (c *client) clearAuthTimer() bool {
if c.atmr == nil {
return true
}
stopped := c.atmr.Stop()
c.atmr = nil
return stopped
}
// We may reuse atmr for expiring user jwts,
// so check connectReceived.
func (c *client) awaitingAuth() bool {
c.mu.Lock()
authSet := !c.flags.isSet(connectReceived) && c.atmr != nil
c.mu.Unlock()
return authSet
}
// This will set the atmr for the JWT expiration time.
// We will lock on entry.
func (c *client) setExpirationTimer(d time.Duration) {
c.mu.Lock()
c.atmr = time.AfterFunc(d, c.authExpired)
c.mu.Unlock()
}
// Lock should be held
func (c *client) clearConnection(reason ClosedState) {
if c.flags.isSet(clearConnection) {
return
}
c.flags.set(clearConnection)
nc := c.nc
if nc == nil || c.srv == nil {
return
}
// Flush any pending.
c.flushOutbound()
// Clear outbound here.
c.out.sg.Broadcast()
// With TLS, Close() is sending an alert (that is doing a write).
// Need to set a deadline otherwise the server could block there
// if the peer is not reading from socket.
if c.flags.isSet(handshakeComplete) {
nc.SetWriteDeadline(time.Now().Add(c.out.wdl))
}
nc.Close()
// Do this always to also kick out any IO writes.
nc.SetWriteDeadline(time.Time{})
// Save off the connection if its a client.
if c.kind == CLIENT && c.srv != nil {
go c.srv.saveClosedClient(c, nc, reason)
}
}
func (c *client) typeString() string {
switch c.kind {
case CLIENT:
return "Client"
case ROUTER:
return "Router"
case GATEWAY:
return "Gateway"
}
return "Unknown Type"
}
// processSubsOnConfigReload removes any subscriptions the client has that are no
// longer authorized, and check for imports (accounts) due to a config reload.
func (c *client) processSubsOnConfigReload(awcsti map[string]struct{}) {
c.mu.Lock()
var (
checkPerms = c.perms != nil
checkAcc = c.acc != nil
acc = c.acc
)
if !checkPerms && !checkAcc {
c.mu.Unlock()
return
}
var (
_subs [32]*subscription
subs = _subs[:0]
_removed [32]*subscription
removed = _removed[:0]
srv = c.srv
userInfo = c.opts.Nkey
)
if userInfo == "" {
userInfo = c.opts.Username
if userInfo == "" {
userInfo = fmt.Sprintf("%v", c.cid)
}
}
if checkAcc {
// We actually only want to check if stream imports have changed.
if _, ok := awcsti[acc.Name]; !ok {
checkAcc = false
}
}
// We will clear any mperms we have here. It will rebuild on the fly with canSubscribe,
// so we do that here as we collect them. We will check result down below.
c.mperms = nil
// Collect client's subs under the lock
for _, sub := range c.subs {
// Just checking to rebuild mperms under the lock, will collect removed though here.
// Only collect under subs array of canSubscribe and checkAcc true.
if !c.canSubscribe(string(sub.subject)) {
removed = append(removed, sub)
} else if checkAcc {
subs = append(subs, sub)
}
}
c.mu.Unlock()
// This list is all subs who are allowed and we need to check accounts.
for _, sub := range subs {
c.mu.Lock()
oldShadows := sub.shadow
sub.shadow = nil
c.mu.Unlock()
c.addShadowSubscriptions(acc, sub)
for _, nsub := range oldShadows {
nsub.im.acc.sl.Remove(nsub)
}
}
// Unsubscribe all that need to be removed and report back to client and logs.
for _, sub := range removed {
c.unsubscribe(acc, sub, true)
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %q)",
sub.subject, sub.sid))
srv.Noticef("Removed sub %q (sid %q) for user %q - not authorized",
sub.subject, sub.sid, userInfo)
}
}
// Allows us to count up all the queue subscribers during close.
type qsub struct {
sub *subscription
n int32
}
func (c *client) closeConnection(reason ClosedState) {
c.mu.Lock()
if c.nc == nil {
c.mu.Unlock()
return
}
// Be consistent with the creation: for routes and gateways,
// we use Noticef on create, so use that too for delete.
if c.kind == ROUTER || c.kind == GATEWAY {
c.Noticef("%s connection closed", c.typeString())
} else {
c.Debugf("%s connection closed", c.typeString())
}
c.clearAuthTimer()
c.clearPingTimer()
c.clearConnection(reason)
c.nc = nil
var (
retryImplicit bool
connectURLs []string
gwName string
gwIsOutbound bool
gwCfg *gatewayCfg
kind = c.kind
srv = c.srv
noReconnect = c.flags.isSet(noReconnect)
acc = c.acc
)
// Snapshot for use if we are a client connection.
// FIXME(dlc) - we can just stub in a new one for client
// and reference existing one.
var subs []*subscription
if kind == CLIENT {
subs = make([]*subscription, 0, len(c.subs))
for _, sub := range c.subs {
// Auto-unsubscribe subscriptions must be unsubscribed forcibly.
sub.max = 0
subs = append(subs, sub)
}
}
if c.route != nil {
if !noReconnect {
retryImplicit = c.route.retry
}
connectURLs = c.route.connectURLs
}
if kind == GATEWAY {
gwName = c.gw.name
gwIsOutbound = c.gw.outbound
gwCfg = c.gw.cfg
}
c.mu.Unlock()
// Remove clients subscriptions.
if kind == CLIENT {
acc.sl.RemoveBatch(subs)
} else {
go c.removeRemoteSubs()
}
if srv != nil {
// This is a route that disconnected, but we are not in lame duck mode...
if len(connectURLs) > 0 && !srv.isLameDuckMode() {
// Unless disabled, possibly update the server's INFO protocol
// and send to clients that know how to handle async INFOs.
if !srv.getOpts().Cluster.NoAdvertise {
srv.removeClientConnectURLsAndSendINFOToClients(connectURLs)
}
}
// Unregister
srv.removeClient(c)
// Update remote subscriptions.
if acc != nil && kind == CLIENT {
qsubs := map[string]*qsub{}
for _, sub := range subs {
if sub.queue == nil {
srv.updateRouteSubscriptionMap(acc, sub, -1)
} else {
// We handle queue subscribers special in case we
// have a bunch we can just send one update to the
// connected routes.
key := string(sub.subject) + " " + string(sub.queue)
if esub, ok := qsubs[key]; ok {
esub.n++
} else {
qsubs[key] = &qsub{sub, 1}
}
}
}
// Process any qsubs here.
for _, esub := range qsubs {
srv.updateRouteSubscriptionMap(acc, esub.sub, -(esub.n))
}
if prev := c.acc.removeClient(c); prev == 1 && c.srv != nil {
c.srv.mu.Lock()
c.srv.activeAccounts--
c.srv.mu.Unlock()
}
}
}
// Don't reconnect connections that have been marked with
// the no reconnect flag.
if noReconnect {
return
}
// Check for a solicited route. If it was, start up a reconnect unless
// we are already connected to the other end.
if c.isSolicitedRoute() || retryImplicit {
// Capture these under lock
c.mu.Lock()
rid := c.route.remoteID
rtype := c.route.routeType
rurl := c.route.url
c.mu.Unlock()
srv.mu.Lock()
defer srv.mu.Unlock()
// It is possible that the server is being shutdown.
// If so, don't try to reconnect
if !srv.running {
return
}
if rid != "" && srv.remotes[rid] != nil {
c.srv.Debugf("Not attempting reconnect for solicited route, already connected to \"%s\"", rid)
return
} else if rid == srv.info.ID {
c.srv.Debugf("Detected route to self, ignoring \"%s\"", rurl)
return
} else if rtype != Implicit || retryImplicit {
c.srv.Debugf("Attempting reconnect for solicited route \"%s\"", rurl)
// Keep track of this go-routine so we can wait for it on
// server shutdown.
srv.startGoRoutine(func() { srv.reConnectToRoute(rurl, rtype) })
}
} else if srv != nil && kind == GATEWAY && gwIsOutbound {
if gwCfg != nil {
srv.Debugf("Attempting reconnect for gateway %q", gwName)
// Run this as a go routine since we may be called within
// the solicitGateway itself if there was an error during
// the creation of the gateway connection.
srv.startGoRoutine(func() { srv.reconnectGateway(gwCfg) })
} else {
srv.Debugf("Gateway %q not in configuration, not attempting reconnect", gwName)
}
}
}
// Set the noReconnect flag. This is used before a call to closeConnection()
// to prevent the connection to reconnect (routes, gateways).
func (c *client) setNoReconnect() {
c.mu.Lock()
c.flags.set(noReconnect)
c.mu.Unlock()
}
// Returns the client's RTT value with the protection of the client's lock.
func (c *client) getRTTValue() time.Duration {
c.mu.Lock()
rtt := c.rtt
c.mu.Unlock()
return rtt
}
// This function is used by ROUTER and GATEWAY connections to
// look for a subject on a given account (since these type of
// connections are not bound to a specific account).
// If the c.pa.subject is found in the cache, the cached result
// is returned, otherwse, we match the account's sublist and update
// the cache. The cache is pruned if reaching a certain size.
func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) {
var (
acc *Account
pac *perAccountCache
r *SublistResult
ok bool
)
// Check our cache.
if pac, ok = c.in.pacache[string(c.pa.pacache)]; ok {
// Check the genid to see if it's still valid.
if genid := atomic.LoadUint64(&pac.acc.sl.genid); genid != pac.genid {
ok = false
delete(c.in.pacache, string(c.pa.pacache))
} else {
acc = pac.acc
r = pac.results
}
}
if !ok {
// Match correct account and sublist.
acc = c.srv.LookupAccount(string(c.pa.account))
if acc == nil {
return nil, nil
}
// Match against the account sublist.
r = acc.sl.Match(string(c.pa.subject))
// Store in our cache
c.in.pacache[string(c.pa.pacache)] = &perAccountCache{acc, r, atomic.LoadUint64(&acc.sl.genid)}
// Check if we need to prune.
if len(c.in.pacache) > maxPerAccountCacheSize {
c.prunePerAccountCache()
}
}
return acc, r
}
// prunePerAccountCache will prune off a random number of cache entries.
func (c *client) prunePerAccountCache() {
n := 0
for cacheKey := range c.in.pacache {
delete(c.in.pacache, cacheKey)
if n++; n > prunePerAccountCacheSize {
break
}
}
}
// Logging functionality scoped to a client or route.
func (c *client) Errorf(format string, v ...interface{}) {
format = fmt.Sprintf("%s - %s", c, format)
c.srv.Errorf(format, v...)
}
func (c *client) Debugf(format string, v ...interface{}) {
format = fmt.Sprintf("%s - %s", c, format)
c.srv.Debugf(format, v...)
}
func (c *client) Noticef(format string, v ...interface{}) {
format = fmt.Sprintf("%s - %s", c, format)
c.srv.Noticef(format, v...)
}
func (c *client) Tracef(format string, v ...interface{}) {
format = fmt.Sprintf("%s - %s", c, format)
c.srv.Tracef(format, v...)
}