Merge pull request #928 from nats-io/leafnodes

Basic Leafnode support.
This commit is contained in:
Derek Collison
2019-03-26 10:18:31 -07:00
committed by GitHub
21 changed files with 3195 additions and 207 deletions

View File

@@ -27,19 +27,12 @@ import (
"github.com/nats-io/jwt"
)
// For backwards compatibility, users who are not explicitly defined into an
// For backwards compatibility with NATS < 2.0, users who are not explicitly defined into an
// account will be grouped in the default global account.
const globalAccountName = "$G"
// Route Map Entry - used for efficient interest graph propagation.
// TODO(dlc) - squeeze even more?
type rme struct {
qi int32 // used to index into key from map for optional queue name
n int32 // number of subscriptions directly matching, local subs only.
}
// Account are subject namespace definitions. By default no messages are shared between accounts.
// You can share via exports and imports of streams and services.
// You can share via Exports and Imports of Streams and Services.
type Account struct {
Name string
Nkey string
@@ -54,7 +47,7 @@ type Account struct {
nrclients int32
sysclients int32
clients map[*client]*client
rm map[string]*rme
rm map[string]int32
imports importMap
exports exportMap
limits
@@ -459,7 +452,7 @@ func (a *Account) AddStreamImport(account *Account, from, prefix string) error {
return a.AddStreamImportWithClaim(account, from, prefix, nil)
}
// IsPublicExport is a placeholder to denote public export.
// IsPublicExport is a placeholder to denote a public export.
var IsPublicExport = []*Account(nil)
// AddStreamExport will add an export to the account. If accounts is nil
@@ -824,6 +817,11 @@ func (s *Server) AccountResolver() AccountResolver {
return s.accResolver
}
// UpdateAccountClaims is the exported wrapper around updateAccountClaims.
// It updates an existing account with new claims, replacing any exports or
// imports previously defined on the account.
func (s *Server) UpdateAccountClaims(a *Account, ac *jwt.AccountClaims) {
s.updateAccountClaims(a, ac)
}
// updateAccountClaims will update an existing account with new claims.
// This will replace any exports or imports previously defined.
func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {

View File

@@ -235,6 +235,8 @@ func (s *Server) checkAuthentication(c *client) bool {
return s.isRouterAuthorized(c)
case GATEWAY:
return s.isGatewayAuthorized(c)
case LEAF:
return s.isLeafNodeAuthorized(c)
default:
return false
}
@@ -527,6 +529,108 @@ func (s *Server) isGatewayAuthorized(c *client) bool {
return comparePasswords(opts.Gateway.Password, c.opts.Password)
}
// isLeafNodeAuthorized will check for auth for an inbound leaf node connection.
// Two schemes are supported: operator-mode user JWT + nkey signature (required
// when the server has trustedKeys configured), and plain username/password from
// the LeafNode options. If neither trustedKeys nor LeafNode.Username is set,
// the connection is accepted as-is.
func (s *Server) isLeafNodeAuthorized(c *client) bool {
// FIXME(dlc) - This is duplicated from client auth, should be able to combine
// and not fail so bad on DRY.
// Grab under lock but process after.
var (
juc *jwt.UserClaims
acc *Account
err error
)
s.mu.Lock()
// Check if we have trustedKeys defined in the server. If so we require a user jwt.
if s.trustedKeys != nil {
if c.opts.JWT == "" {
s.mu.Unlock()
c.Debugf("Authentication requires a user JWT")
return false
}
// So we have a valid user jwt here.
juc, err = jwt.DecodeUserClaims(c.opts.JWT)
if err != nil {
s.mu.Unlock()
c.Debugf("User JWT not valid: %v", err)
return false
}
vr := jwt.CreateValidationResults()
juc.Validate(vr)
if vr.IsBlocking(true) {
s.mu.Unlock()
c.Debugf("User JWT no longer valid: %+v", vr)
return false
}
}
s.mu.Unlock()
// If we have a jwt and a userClaim, make sure we have the Account, etc associated.
// We need to look up the account. This will use an account resolver if one is present.
if juc != nil {
if acc, _ = s.LookupAccount(juc.Issuer); acc == nil {
c.Debugf("Account JWT can not be found")
return false
}
// FIXME(dlc) - Add in check for account allowed to do leaf nodes?
// Bool or active count like client?
if !s.isTrustedIssuer(acc.Issuer) {
c.Debugf("Account JWT not signed by trusted operator")
return false
}
if acc.IsExpired() {
c.Debugf("Account JWT has expired")
return false
}
// Verify the signature against the nonce.
if c.opts.Sig == "" {
c.Debugf("Signature missing")
return false
}
// Signatures are expected in URL-safe base64, but accept standard base64 too.
sig, err := base64.RawURLEncoding.DecodeString(c.opts.Sig)
if err != nil {
// Allow fallback to normal base64.
sig, err = base64.StdEncoding.DecodeString(c.opts.Sig)
if err != nil {
c.Debugf("Signature not valid base64")
return false
}
}
pub, err := nkeys.FromPublicKey(juc.Subject)
if err != nil {
c.Debugf("User nkey not valid: %v", err)
return false
}
if err := pub.Verify(c.nonce, sig); err != nil {
c.Debugf("Signature not verified")
return false
}
// Bind the connection to an internal nkey user built from the claims.
nkey := buildInternalNkeyUser(juc, acc)
c.RegisterNkeyUser(nkey)
// Generate a connect event if we have a system account.
// FIXME(dlc) - Make one for leafnodes if we track active connections.
// Check if we need to set an auth timer if the user jwt expires.
c.checkExpiration(juc.Claims())
return true
}
// Not JWT-based: fall back to username/password from the LeafNode options.
// Snapshot server options.
opts := s.getOpts()
// No username configured means authentication is not required.
if opts.LeafNode.Username == "" {
return true
}
if opts.LeafNode.Username != c.opts.Username {
return false
}
return comparePasswords(opts.LeafNode.Password, c.opts.Password)
}
// Support for bcrypt stored passwords and tokens.
const bcryptPrefix = "$2a$"

View File

@@ -40,6 +40,8 @@ const (
GATEWAY
// SYSTEM is an internal system client.
SYSTEM
// LEAF is for leaf node connections.
LEAF
)
const (
@@ -176,8 +178,8 @@ type client struct {
rttStart time.Time
route *route
gw *gateway
gw *gateway
leaf *leaf
debug bool
trace bool
@@ -392,6 +394,8 @@ func (c *client) initClient() {
c.ncs = fmt.Sprintf("%s - rid:%d", conn, c.cid)
case GATEWAY:
c.ncs = fmt.Sprintf("%s - gid:%d", conn, c.cid)
case LEAF:
c.ncs = fmt.Sprintf("%s - lid:%d", conn, c.cid)
case SYSTEM:
c.ncs = "SYSTEM"
}
@@ -450,11 +454,6 @@ func (c *client) registerWithAccount(acc *Account) error {
return nil
}
// Helper to determine if we have exceeded max subs.
func (c *client) subsExceeded() bool {
return c.msubs != jwt.NoLimit && len(c.subs) > int(c.msubs)
}
// Helper to determine if we have met or exceeded max subs.
func (c *client) subsAtLimit() bool {
return c.msubs != jwt.NoLimit && len(c.subs) >= int(c.msubs)
@@ -490,7 +489,7 @@ func (c *client) applyAccountLimits() {
c.msubs = int32(opts.MaxSubs)
}
if c.subsExceeded() {
if c.subsAtLimit() {
go func() {
c.maxSubsExceeded()
time.Sleep(20 * time.Millisecond)
@@ -1016,6 +1015,8 @@ func (c *client) processInfo(arg []byte) error {
c.processRouteInfo(&info)
case GATEWAY:
c.processGatewayInfo(&info)
case LEAF:
c.processLeafnodeInfo(&info)
}
return nil
}
@@ -1179,6 +1180,9 @@ func (c *client) processConnect(arg []byte) error {
case GATEWAY:
// Delegate the rest of processing to the gateway
return c.processGatewayConnect(arg)
case LEAF:
// Delegate the rest of processing to the leaf node
return c.processLeafNodeConnect(srv, arg, lang)
}
return nil
}
@@ -1258,7 +1262,7 @@ func (c *client) maxPayloadViolation(sz int, max int32) {
c.closeConnection(MaxPayloadExceeded)
}
// queueOutbound queues data for client/route connections.
// queueOutbound queues data for a client connection.
// Return if the data is referenced or not. If referenced, the caller
// should not reuse the `data` array.
// Lock should be held.
@@ -1581,8 +1585,12 @@ func (c *client) processSub(argo []byte) (err error) {
c.mu.Lock()
// Grab connection type.
// Grab connection type, account and server info.
kind := c.kind
acc := c.acc
srv := c.srv
sid := string(sub.sid)
if c.nc == nil && kind != SYSTEM {
c.mu.Unlock()
@@ -1590,12 +1598,7 @@ func (c *client) processSub(argo []byte) (err error) {
}
// Check permissions if applicable.
if kind == ROUTER {
if !c.canExport(string(sub.subject)) {
c.mu.Unlock()
return nil
}
} else if kind == CLIENT && !c.canSubscribe(string(sub.subject)) {
if kind == CLIENT && !c.canSubscribe(string(sub.subject)) {
c.mu.Unlock()
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
c.Errorf("Subscription Violation - %s, Subject %q, SID %s",
@@ -1610,9 +1613,6 @@ func (c *client) processSub(argo []byte) (err error) {
return nil
}
sid := string(sub.sid)
acc := c.acc
updateGWs := false
// Subscribe here.
if c.subs[sid] == nil {
@@ -1626,6 +1626,7 @@ func (c *client) processSub(argo []byte) (err error) {
}
}
}
// Unlocked from here onward
c.mu.Unlock()
if err != nil {
@@ -1635,19 +1636,23 @@ func (c *client) processSub(argo []byte) (err error) {
c.sendOK()
}
if acc != nil {
if err := c.addShadowSubscriptions(acc, sub); err != nil {
c.Errorf(err.Error())
}
// If we are routing and this is a local sub, add to the route map for the associated account.
if kind == CLIENT || kind == SYSTEM {
c.srv.updateRouteSubscriptionMap(acc, sub, 1)
if updateGWs {
c.srv.gatewayUpdateSubInterest(acc.Name, sub, 1)
}
}
// No account just return.
if acc == nil {
return nil
}
if err := c.addShadowSubscriptions(acc, sub); err != nil {
c.Errorf(err.Error())
}
// If we are routing and this is a local sub, add to the route map for the associated account.
if kind == CLIENT || kind == SYSTEM {
srv.updateRouteSubscriptionMap(acc, sub, 1)
if updateGWs {
srv.gatewayUpdateSubInterest(acc.Name, sub, 1)
}
}
// Now check on leafnode updates.
srv.updateLeafNodes(acc, sub, 1)
return nil
}
@@ -1834,7 +1839,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force bool) {
shadowSubs := sub.shadow
sub.shadow = nil
if len(shadowSubs) > 0 {
updateRoute = (c.kind == CLIENT || c.kind == SYSTEM) && c.srv != nil
updateRoute = (c.kind == CLIENT || c.kind == SYSTEM || c.kind == LEAF) && c.srv != nil
}
c.mu.Unlock()
@@ -1844,6 +1849,8 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force bool) {
} else if updateRoute {
c.srv.updateRouteSubscriptionMap(nsub.im.acc, nsub, -1)
}
// Now check on leafnode updates.
c.srv.updateLeafNodes(nsub.im.acc, nsub, -1)
}
}
@@ -1874,6 +1881,7 @@ func (c *client) processUnsub(arg []byte) error {
// Grab connection type.
kind := c.kind
srv := c.srv
var acc *Account
updateGWs := false
@@ -1886,7 +1894,7 @@ func (c *client) processUnsub(arg []byte) error {
sub.max = 0
unsub = true
}
updateGWs = c.srv.gateway.enabled
updateGWs = srv.gateway.enabled
}
c.mu.Unlock()
@@ -1897,11 +1905,13 @@ func (c *client) processUnsub(arg []byte) error {
if unsub {
c.unsubscribe(acc, sub, false)
if acc != nil && kind == CLIENT || kind == SYSTEM {
c.srv.updateRouteSubscriptionMap(acc, sub, -1)
srv.updateRouteSubscriptionMap(acc, sub, -1)
if updateGWs {
c.srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
}
}
// Now check on leafnode updates.
srv.updateLeafNodes(acc, sub, -1)
}
return nil
@@ -1956,6 +1966,8 @@ func (c *client) stalledWait(producer *client) {
// Used to treat maps as efficient set
var needFlush = struct{}{}
// deliverMsg will deliver a message to a matching subscription and its underlying client.
// We process all connection/client types. mh is the part that will be protocol/client specific.
func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
if sub.client == nil {
return false
@@ -2174,6 +2186,8 @@ func (c *client) processInboundMsg(msg []byte) {
c.processInboundRoutedMsg(msg)
case GATEWAY:
c.processInboundGatewayMsg(msg)
case LEAF:
c.processInboundLeafMsg(msg)
}
}
@@ -2303,6 +2317,7 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
if (c.kind == ROUTER || c.kind == GATEWAY) && c.pa.queues == nil && len(rr.qsubs) > 0 {
c.makeQFilter(rr.qsubs)
}
sendToGWs := c.srv.gateway.enabled && (c.kind == CLIENT || c.kind == SYSTEM)
queues := c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, sendToGWs)
// If this is not a gateway connection but gateway is enabled,
@@ -2367,15 +2382,21 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
for _, sub := range r.psubs {
// Check if this is a send to a ROUTER. We now process
// these after everything else.
if sub.client.kind == ROUTER {
switch sub.client.kind {
case ROUTER:
if c.kind == ROUTER {
continue
}
c.addSubToRouteTargets(sub)
continue
} else if sub.client.kind == GATEWAY {
case GATEWAY:
// Never send to gateway from here.
continue
case LEAF:
// We handle similarly to routes and use the same data structures.
// Leaf node delivery audience is different however.
c.addSubToRouteTargets(sub)
continue
}
// Check for stream import mapped subs. These apply to local subs only.
if sub.im != nil && sub.im.prefix != "" {
@@ -2391,20 +2412,27 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
c.deliverMsg(sub, mh, msg)
}
// If we are sourced from a route we need to have direct filtered queues.
if c.kind == ROUTER && c.pa.queues == nil {
return queues
}
// Set these up to optionally filter based on the queue lists.
// This is for messages received from routes which will have directed
// guidance on which queue groups we should deliver to.
qf := c.pa.queues
// For route connections, we still want to send messages to
// leaf nodes even if there are no queue filters since we collect
// them above and do not process inline like normal clients.
if c.kind == ROUTER && qf == nil {
goto sendToRoutesOrLeafs
}
// If we are sourced from a route or leaf node we need to have direct filtered queues.
if (c.kind == ROUTER || c.kind == LEAF) && qf == nil {
return queues
}
// For gateway connections, we still want to send messages to routes
// even if there are no queue filters.
// and leaf nodes even if there are no queue filters.
if c.kind == GATEWAY && qf == nil {
goto sendToRoutes
goto sendToRoutesOrLeafs
}
// Check to see if we have our own rand yet. Global rand
@@ -2414,7 +2442,6 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
}
// Process queue subs
for i := 0; i < len(r.qsubs); i++ {
qsubs := r.qsubs[i]
// If we have a filter check that here. We could make this a map or something more
@@ -2432,8 +2459,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
selectQSub:
// We will hold onto remote qsubs when we are coming from a route
// just in case we can no longer do local delivery.
// We will hold onto remote or leaf qsubs when we are coming from
// a route or a leaf node just in case we can no longer do local delivery.
var rsub *subscription
// Find a subscription that is able to deliver this message
@@ -2445,10 +2472,11 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
if sub == nil {
continue
}
// Potentially sending to a remote sub across a route.
if sub.client.kind == ROUTER {
if c.kind == ROUTER {
// We just came from a route, so skip and prefer local subs.
kind := sub.client.kind
// Potentially sending to a remote sub across a route or leaf node.
if kind == ROUTER || kind == LEAF {
if c.kind == ROUTER || c.kind == LEAF || (c.kind == CLIENT && kind == LEAF) {
// We just came from a route/leaf, so skip and prefer local subs.
// Keep our first rsub in case all else fails.
if rsub == nil {
rsub = sub
@@ -2462,6 +2490,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
}
break
}
// Check for mapped subs
if sub.im != nil && sub.im.prefix != "" {
// Redo the subject here on the fly.
@@ -2485,7 +2514,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
if rsub != nil {
// If we are here we tried to deliver to a local qsub
// but failed. So we will send it to a remote.
// but failed. So we will send it to a remote or leaf node.
c.addSubToRouteTargets(rsub)
if collect {
queues = append(queues, rsub.queue)
@@ -2493,7 +2522,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
}
}
sendToRoutes:
sendToRoutesOrLeafs:
// If no messages for routes return here.
if len(c.in.rts) == 0 {
@@ -2504,10 +2533,19 @@ sendToRoutes:
// We have inline structs for memory layout and cache coherency.
for i := range c.in.rts {
rt := &c.in.rts[i]
kind := rt.sub.client.kind
mh := c.msgb[:msgHeadProtoLen]
mh = append(mh, acc.Name...)
mh = append(mh, ' ')
if kind == ROUTER {
mh = append(mh, acc.Name...)
mh = append(mh, ' ')
} else {
// Leaf nodes are LMSG
mh[0] = 'L'
// Remap subject if its a shadow subscription, treat like a normal client.
if rt.sub.im != nil && rt.sub.im.prefix != "" {
mh = append(mh, rt.sub.im.prefix...)
}
}
mh = append(mh, subject...)
mh = append(mh, ' ')
@@ -2671,6 +2709,8 @@ func (c *client) typeString() string {
return "Router"
case GATEWAY:
return "Gateway"
case LEAF:
return "LeafNode"
}
return "Unknown Type"
}
@@ -2755,7 +2795,7 @@ func (c *client) closeConnection(reason ClosedState) {
// we use Noticef on create, so use that too for delete.
if c.kind == ROUTER || c.kind == GATEWAY {
c.Noticef("%s connection closed", c.typeString())
} else {
} else { // Client and Leaf Node connections.
c.Debugf("%s connection closed", c.typeString())
}
@@ -2780,7 +2820,7 @@ func (c *client) closeConnection(reason ClosedState) {
// FIXME(dlc) - we can just stub in a new one for client
// and reference existing one.
var subs []*subscription
if kind == CLIENT {
if kind == CLIENT || kind == LEAF {
subs = make([]*subscription, 0, len(c.subs))
for _, sub := range c.subs {
// Auto-unsubscribe subscriptions must be unsubscribed forcibly.
@@ -2803,8 +2843,8 @@ func (c *client) closeConnection(reason ClosedState) {
c.mu.Unlock()
// Remove clients subscriptions.
if kind == CLIENT {
// Remove client's or leaf node subscriptions.
if kind == CLIENT || kind == LEAF && acc != nil {
acc.sl.RemoveBatch(subs)
} else if kind == ROUTER {
go c.removeRemoteSubs()
@@ -2824,7 +2864,7 @@ func (c *client) closeConnection(reason ClosedState) {
srv.removeClient(c)
// Update remote subscriptions.
if acc != nil && kind == CLIENT {
if acc != nil && (kind == CLIENT || kind == LEAF) {
qsubs := map[string]*qsub{}
for _, sub := range subs {
if sub.queue == nil {
@@ -2843,15 +2883,18 @@ func (c *client) closeConnection(reason ClosedState) {
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
}
// Now check on leafnode updates.
srv.updateLeafNodes(acc, sub, -1)
}
// Process any qsubs here.
for _, esub := range qsubs {
srv.updateRouteSubscriptionMap(acc, esub.sub, -(esub.n))
srv.updateLeafNodes(acc, esub.sub, -(esub.n))
}
if prev := c.acc.removeClient(c); prev == 1 && c.srv != nil {
c.srv.mu.Lock()
c.srv.activeAccounts--
c.srv.mu.Unlock()
if prev := acc.removeClient(c); prev == 1 && srv != nil {
srv.mu.Lock()
srv.activeAccounts--
srv.mu.Unlock()
}
}
}
@@ -2882,13 +2925,13 @@ func (c *client) closeConnection(reason ClosedState) {
}
if rid != "" && srv.remotes[rid] != nil {
c.srv.Debugf("Not attempting reconnect for solicited route, already connected to \"%s\"", rid)
srv.Debugf("Not attempting reconnect for solicited route, already connected to \"%s\"", rid)
return
} else if rid == srv.info.ID {
c.srv.Debugf("Detected route to self, ignoring \"%s\"", rurl)
srv.Debugf("Detected route to self, ignoring \"%s\"", rurl)
return
} else if rtype != Implicit || retryImplicit {
c.srv.Debugf("Attempting reconnect for solicited route \"%s\"", rurl)
srv.Debugf("Attempting reconnect for solicited route \"%s\"", rurl)
// Keep track of this go-routine so we can wait for it on
// server shutdown.
srv.startGoRoutine(func() { srv.reConnectToRoute(rurl, rtype) })
@@ -2903,6 +2946,10 @@ func (c *client) closeConnection(reason ClosedState) {
} else {
srv.Debugf("Gateway %q not in configuration, not attempting reconnect", gwName)
}
} else if c.isSolicitedLeafNode() {
// Check if this is a solicited leaf node. Start up a reconnect.
// FIXME(dlc) - use the connectedURLs info like a normal client.
srv.startGoRoutine(func() { srv.reConnectToRemoteLeafNode(c.leaf.remote) })
}
}
@@ -2967,6 +3014,16 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) {
return acc, r
}
// Account returns the account currently bound to this client.
// A nil receiver is tolerated and yields nil.
func (c *client) Account() *Account {
	if c == nil {
		return nil
	}
	c.mu.Lock()
	acc := c.acc
	c.mu.Unlock()
	return acc
}
// prunePerAccountCache will prune off a random number of cache entries.
func (c *client) prunePerAccountCache() {
n := 0

View File

@@ -121,9 +121,6 @@ const (
// MAX_PUB_ARGS Maximum possible number of arguments from PUB proto.
MAX_PUB_ARGS = 3
// DEFAULT_REMOTE_QSUBS_SWEEPER is how often we sweep for orphans. Deprecated
DEFAULT_REMOTE_QSUBS_SWEEPER = 30 * time.Second
// DEFAULT_MAX_CLOSED_CLIENTS is the maximum number of closed connections we hold onto.
DEFAULT_MAX_CLOSED_CLIENTS = 10000
@@ -136,4 +133,7 @@ const (
// DEFAULT_LAME_DUCK_DURATION is the time in which the server spreads
// the closing of clients when signaled to go in lame duck mode.
DEFAULT_LAME_DUCK_DURATION = 2 * time.Minute
// DEFAULT_LEAFNODE_INFO_WAIT is the leaf node dial timeout.
DEFAULT_LEAFNODE_INFO_WAIT = 1 * time.Second
)

View File

@@ -1,4 +1,4 @@
// Copyright 2012-2018 The NATS Authors
// Copyright 2012-2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -59,6 +59,10 @@ var (
// attempted to connect to the route listen port.
ErrClientConnectedToRoutePort = errors.New("attempted to connect to route port")
// ErrClientConnectedToLeafNodePort represents an error condition when a client
// attempted to connect to the leaf node listen port.
ErrClientConnectedToLeafNodePort = errors.New("attempted to connect to leaf node port")
// ErrAccountExists is returned when an account is attempted to be registered
// but already exists.
ErrAccountExists = errors.New("account exists")

View File

@@ -26,17 +26,18 @@ import (
)
const (
connectEventSubj = "$SYS.ACCOUNT.%s.CONNECT"
disconnectEventSubj = "$SYS.ACCOUNT.%s.DISCONNECT"
accConnsReqSubj = "$SYS.REQ.ACCOUNT.%s.CONNS"
accUpdateEventSubj = "$SYS.ACCOUNT.%s.CLAIMS.UPDATE"
connsRespSubj = "$SYS._INBOX_.%s"
accConnsEventSubj = "$SYS.SERVER.ACCOUNT.%s.CONNS"
shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN"
authErrorEventSubj = "$SYS.SERVER.%s.CLIENT.AUTH.ERR"
serverStatsSubj = "$SYS.SERVER.%s.STATSZ"
serverStatsReqSubj = "$SYS.REQ.SERVER.%s.STATSZ"
serverStatsPingReqSubj = "$SYS.REQ.SERVER.PING"
connectEventSubj = "$SYS.ACCOUNT.%s.CONNECT"
disconnectEventSubj = "$SYS.ACCOUNT.%s.DISCONNECT"
accConnsReqSubj = "$SYS.REQ.ACCOUNT.%s.CONNS"
accUpdateEventSubj = "$SYS.ACCOUNT.%s.CLAIMS.UPDATE"
connsRespSubj = "$SYS._INBOX_.%s"
accConnsEventSubj = "$SYS.SERVER.ACCOUNT.%s.CONNS"
shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN"
authErrorEventSubj = "$SYS.SERVER.%s.CLIENT.AUTH.ERR"
serverStatsSubj = "$SYS.SERVER.%s.STATSZ"
serverStatsReqSubj = "$SYS.REQ.SERVER.%s.STATSZ"
serverStatsPingReqSubj = "$SYS.REQ.SERVER.PING"
leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT"
shutdownEventTokens = 4
serverSubjectIndex = 2
@@ -437,7 +438,6 @@ func (s *Server) initEventTracking() {
if _, err := s.sysSubscribe(subject, s.accountClaimUpdate); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}
// Listen for requests for our statsz.
subject = fmt.Sprintf(serverStatsReqSubj, s.info.ID)
if _, err := s.sysSubscribe(subject, s.statszReq); err != nil {
@@ -447,6 +447,12 @@ func (s *Server) initEventTracking() {
if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.statszReq); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}
// Listen for updates when leaf nodes connect for a given account. This will
// force any gateway connections to move to `modeInterestOnly`
subject = fmt.Sprintf(leafNodeConnectEventSubj, "*")
if _, err := s.sysSubscribe(subject, s.leafNodeConnected); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}
}
// accountClaimUpdate will receive claim updates for accounts.
@@ -466,7 +472,7 @@ func (s *Server) accountClaimUpdate(sub *subscription, subject, reply string, ms
}
// processRemoteServerShutdown will update any affected accounts.
// Will upidate the remote count for clients.
// Will update the remote count for clients.
// Lock assume held.
func (s *Server) processRemoteServerShutdown(sid string) {
for _, a := range s.accounts {
@@ -571,6 +577,29 @@ func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []by
}
}
// leafNodeConnected is an event we will receive when a leaf node for a given account
// connects. The message payload is a JSON-encoded accNumConnsReq naming the
// account; if gateways are enabled, the account is switched to interest-only mode.
func (s *Server) leafNodeConnected(sub *subscription, subject, reply string, msg []byte) {
m := accNumConnsReq{}
if err := json.Unmarshal(msg, &m); err != nil {
s.sys.client.Errorf("Error unmarshalling account connections request message: %v", err)
return
}
// No-op unless we have an account name, events enabled, and gateways enabled.
s.mu.Lock()
gateway := s.gateway
if m.Account == "" || !s.eventsEnabled() || !gateway.enabled {
s.mu.Unlock()
return
}
acc, _ := s.lookupAccount(m.Account)
// Release the server lock first: switchAccountToInterestMode must be
// called without it held.
s.mu.Unlock()
if acc != nil {
s.switchAccountToInterestMode(acc.Name)
}
}
// statszReq is a request for us to respond with current statz.
func (s *Server) statszReq(sub *subscription, subject, reply string, msg []byte) {
s.mu.Lock()
@@ -636,6 +665,24 @@ func (s *Server) enableAccountTracking(a *Account) {
s.sendInternalMsg(subj, reply, &m.Server, &m)
}
// Event on leaf node connect. Publishes the per-account leaf node connect
// event (leafNodeConnectEventSubj) and switches the account over to
// interest-only mode on the gateways.
// Lock should NOT be held on entry.
func (s *Server) sendLeafNodeConnect(a *Account) {
s.mu.Lock()
// If we do not have any gateways defined this should also be a no-op.
// FIXME(dlc) - if we do accounting for operator limits might have to send regardless.
if a == nil || !s.eventsEnabled() || !s.gateway.enabled {
s.mu.Unlock()
return
}
subj := fmt.Sprintf(leafNodeConnectEventSubj, a.Name)
m := accNumConnsReq{Account: a.Name}
s.sendInternalMsg(subj, "", &m.Server, &m)
// Release the lock before the switch: switchAccountToInterestMode
// expects the server lock NOT to be held.
s.mu.Unlock()
s.switchAccountToInterestMode(a.Name)
}
// FIXME(dlc) - make configurable.
const eventsHBInterval = 30 * time.Second

View File

@@ -1008,7 +1008,7 @@ func TestSystemAccountWithGateways(t *testing.T) {
nca.Flush()
// If this test fails with a wrong number after 10 seconds we may have
// added a new initial subscription for the eventing system.
checkExpectedSubs(t, 8, sa)
checkExpectedSubs(t, 9, sa)
// Create a client on B and see if we receive the event
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)

View File

@@ -1194,6 +1194,11 @@ func (s *Server) processImplicitGateway(info *Info) {
})
}
// NumOutboundGateways is public here mostly for testing.
// It reports the current number of outbound gateway connections.
func (s *Server) NumOutboundGateways() int {
return s.numOutboundGateways()
}
// Returns the number of outbound gateway connections
func (s *Server) numOutboundGateways() int {
s.gateway.RLock()
@@ -1540,8 +1545,22 @@ func (c *client) processGatewayRUnsub(arg []byte) error {
return fmt.Errorf("processGatewaySubjectUnsub %s", err.Error())
}
var e *outsie
var useSl, newe bool
var (
e *outsie
useSl bool
newe bool
callUpdate bool
srv *Server
sub *subscription
)
// Possibly execute this on exit after all locks have been released.
// If callUpdate is true, srv and sub will be not nil.
defer func() {
if callUpdate {
srv.updateInterestForAccountOnGateway(accName, sub, -1)
}
}()
c.mu.Lock()
defer c.mu.Unlock()
@@ -1566,9 +1585,10 @@ func (c *client) processGatewayRUnsub(arg []byte) error {
// This is when a sub or queue sub is supposed to be in
// the sublist. Look for it and remove.
if useSl {
var ok bool
key := arg
// m[string()] does not cause mem allocation
sub, ok := c.subs[string(key)]
sub, ok = c.subs[string(key)]
// if RS- for a sub that we don't have, just ignore.
if !ok {
return nil
@@ -1586,6 +1606,9 @@ func (c *client) processGatewayRUnsub(arg []byte) error {
c.gw.outsim.Delete(accName)
}
}
// We are going to call updateInterestForAccountOnGateway on exit.
srv = c.srv
callUpdate = true
} else {
e.ni[string(subject)] = struct{}{}
if newe {
@@ -1624,8 +1647,22 @@ func (c *client) processGatewayRSub(arg []byte) error {
accName := args[0]
subject := args[1]
var e *outsie
var useSl, newe bool
var (
e *outsie
useSl bool
newe bool
callUpdate bool
srv *Server
sub *subscription
)
// Possibly execute this on exit after all locks have been released.
// If callUpdate is true, srv and sub will be not nil.
defer func() {
if callUpdate {
srv.updateInterestForAccountOnGateway(string(accName), sub, 1)
}
}()
c.mu.Lock()
defer c.mu.Unlock()
@@ -1676,7 +1713,7 @@ func (c *client) processGatewayRSub(arg []byte) error {
csubject = make([]byte, len(subject))
copy(csubject, subject)
}
sub := &subscription{client: c, subject: csubject, queue: cqueue, qw: qw}
sub = &subscription{client: c, subject: csubject, queue: cqueue, qw: qw}
// If no error inserting in sublist...
if e.sl.Insert(sub) == nil {
c.subs[string(key)] = sub
@@ -1688,6 +1725,9 @@ func (c *client) processGatewayRSub(arg []byte) error {
c.gw.outsim.Store(string(accName), e)
}
}
// We are going to call updateInterestForAccountOnGateway on exit.
srv = c.srv
callUpdate = true
} else {
subj := string(subject)
// If this is an RS+ for a wc subject, then
@@ -1748,6 +1788,27 @@ func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) {
return psi, r
}
// switchAccountToInterestMode will switch an account over to interestMode.
// For each inbound gateway connection, an insim entry is created for the
// account if one does not already exist, and the connection is told to send
// all subs for that account.
// Lock should NOT be held.
func (s *Server) switchAccountToInterestMode(accName string) {
// Stack-backed slice avoids a heap allocation for the common case.
gwsa := [16]*client{}
gws := gwsa[:0]
s.getInboundGatewayConnections(&gws)
for _, gin := range gws {
var e *insie
var ok bool
gin.mu.Lock()
// Create the per-account inbound entry on demand.
if e, ok = gin.gw.insim[accName]; !ok {
e = &insie{}
gin.gw.insim[accName] = e
}
gin.gatewaySwitchAccountToSendAllSubs(e, []byte(accName))
gin.mu.Unlock()
}
}
// This is invoked when registering (or unregistering) the first
// (or last) subscription on a given account/subject. For each
// GWs inbound connections, we will check if we need to send an RS+ or A+
@@ -1960,8 +2021,8 @@ func (s *Server) gatewayUpdateSubInterest(accName string, sub *subscription, cha
}
// May send a message to all outbound gateways. It is possible
// that message is not sent to a given gateway if for instance
// it is known that this gateway has no interest in account or
// that the message is not sent to a given gateway if for instance
// it is known that this gateway has no interest in the account or
// subject, etc..
// <Invoked from any client connection's readLoop>
func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte) {
@@ -2249,6 +2310,12 @@ func (c *client) gatewayAllSubsReceiveStart(info *Info) {
e.Lock()
e.mode = modeTransitioning
e.Unlock()
} else {
e := &outsie{sl: NewSublist()}
e.mode = modeTransitioning
c.mu.Lock()
c.gw.outsim.Store(account, e)
c.mu.Unlock()
}
}
@@ -2315,6 +2382,7 @@ func (c *client) gatewaySwitchAccountToSendAllSubs(e *insie, accName []byte) {
GatewayCmd: cmd,
GatewayCmdPayload: []byte(account),
}
b, _ := json.Marshal(&info)
infoJSON := []byte(fmt.Sprintf(InfoProto, b))
if useLock {

1076
server/leafnode.go Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -80,6 +80,31 @@ type RemoteGatewayOpts struct {
URLs []*url.URL `json:"urls,omitempty"`
}
// LeafNodeOpts are options for a given server to accept leaf node connections and/or connect to a remote cluster.
type LeafNodeOpts struct {
Host string `json:"addr,omitempty"`
Port int `json:"port,omitempty"`
Username string `json:"-"`
Password string `json:"-"`
AuthTimeout float64 `json:"auth_timeout,omitempty"`
TLSConfig *tls.Config `json:"-"`
TLSTimeout float64 `json:"tls_timeout,omitempty"`
TLSMap bool `json:"-"`
Remotes []*RemoteLeafOpts `json:"remotes,omitempty"`
Advertise string `json:"-"`
NoAdvertise bool `json:"-"`
}
// RemoteLeafOpts are options for connecting to a remote server as a leaf node.
type RemoteLeafOpts struct {
LocalAccount string `json:"local_account,omitempty"`
URL *url.URL `json:"url,omitempty"`
Credentials string `json:"-"`
TLS bool `json:"-"`
TLSConfig *tls.Config `json:"-"`
TLSTimeout float64 `json:"tls_timeout,omitempty"`
}
// Options block for gnatsd server.
type Options struct {
ConfigFile string `json:"-"`
@@ -112,6 +137,7 @@ type Options struct {
MaxPending int64 `json:"max_pending"`
Cluster ClusterOpts `json:"cluster,omitempty"`
Gateway GatewayOpts `json:"gateway,omitempty"`
LeafNode LeafNodeOpts `json:"leaf_node,omitempty"`
ProfPort int `json:"-"`
PidFile string `json:"-"`
PortsFileDir string `json:"-"`
@@ -129,7 +155,6 @@ type Options struct {
TLSCaCert string `json:"-"`
TLSConfig *tls.Config `json:"-"`
WriteDeadline time.Duration `json:"-"`
RQSubsSweep time.Duration `json:"-"` // Deprecated
MaxClosedClients int `json:"-"`
LameDuckDuration time.Duration `json:"-"`
@@ -431,6 +456,12 @@ func (o *Options) ProcessConfigFile(configFile string) error {
errors = append(errors, err)
continue
}
case "leaf", "leafnodes":
err := parseLeafNodes(tk, o, &errors, &warnings)
if err != nil {
errors = append(errors, err)
continue
}
case "logfile", "log_file":
o.LogFile = v.(string)
case "syslog":
@@ -894,6 +925,145 @@ func parseGateway(v interface{}, o *Options, errors *[]error, warnings *[]error)
return nil
}
// parseLeafNodes will parse the leaf node config and populate opts.LeafNode.
// Recoverable problems are appended to errors/warnings; only a malformed
// top-level value produces a returned error.
func parseLeafNodes(v interface{}, opts *Options, errors *[]error, warnings *[]error) error {
	tk, v := unwrapValue(v)
	cm, ok := v.(map[string]interface{})
	if !ok {
		return &configErr{tk, fmt.Sprintf("Expected map to define a leafnode, got %T", v)}
	}
	for mk, mv := range cm {
		// Again, unwrap token value if line check is required.
		tk, mv = unwrapValue(mv)
		switch strings.ToLower(mk) {
		case "listen":
			hp, err := parseListen(mv)
			if err != nil {
				err := &configErr{tk, err.Error()}
				*errors = append(*errors, err)
				continue
			}
			opts.LeafNode.Host = hp.host
			opts.LeafNode.Port = hp.port
		case "port":
			opts.LeafNode.Port = int(mv.(int64))
		case "host", "net":
			opts.LeafNode.Host = mv.(string)
		case "authorization":
			auth, err := parseAuthorization(tk, opts, errors, warnings)
			if err != nil {
				*errors = append(*errors, err)
				continue
			}
			// Leafnode authorization supports a single user/pass only.
			if auth.users != nil {
				// Constant message: no Sprintf needed (keeps `go vet` happy).
				err := &configErr{tk, "Leafnode authorization does not allow multiple users"}
				*errors = append(*errors, err)
				continue
			}
			opts.LeafNode.Username = auth.user
			opts.LeafNode.Password = auth.pass
			opts.LeafNode.AuthTimeout = auth.timeout
		case "remotes":
			// Parse the remote options here.
			remotes, err := parseRemoteLeafNodes(mv, errors, warnings)
			if err != nil {
				// Record the error instead of silently dropping it,
				// consistent with the other cases above.
				*errors = append(*errors, err)
				continue
			}
			opts.LeafNode.Remotes = remotes
		case "tls":
			tc, err := parseTLS(tk)
			if err != nil {
				*errors = append(*errors, err)
				continue
			}
			if opts.LeafNode.TLSConfig, err = GenTLSConfig(tc); err != nil {
				err := &configErr{tk, err.Error()}
				*errors = append(*errors, err)
				continue
			}
			opts.LeafNode.TLSTimeout = tc.Timeout
		case "leafnode_advertise", "advertise":
			opts.LeafNode.Advertise = mv.(string)
		case "no_advertise":
			opts.LeafNode.NoAdvertise = mv.(bool)
			trackExplicitVal(opts, &opts.inConfig, "LeafNode.NoAdvertise", opts.LeafNode.NoAdvertise)
		default:
			if !tk.IsUsedVariable() {
				err := &unknownConfigFieldErr{
					field: mk,
					configErr: configErr{
						token: tk,
					},
				}
				*errors = append(*errors, err)
				continue
			}
		}
	}
	return nil
}
// parseRemoteLeafNodes parses the "remotes" array of a leafnodes{} block
// and returns the resulting list of RemoteLeafOpts. Per-entry problems are
// appended to errors; only a malformed top-level value returns an error.
func parseRemoteLeafNodes(v interface{}, errors *[]error, warnings *[]error) ([]*RemoteLeafOpts, error) {
	tk, v := unwrapValue(v)
	ra, ok := v.([]interface{})
	if !ok {
		return nil, &configErr{tk, fmt.Sprintf("Expected remotes field to be an array, got %T", v)}
	}
	remotes := make([]*RemoteLeafOpts, 0, len(ra))
	for _, r := range ra {
		tk, r = unwrapValue(r)
		// Check it's a map/struct.
		rm, ok := r.(map[string]interface{})
		if !ok {
			*errors = append(*errors, &configErr{tk, fmt.Sprintf("Expected remote leafnode entry to be a map/struct, got %v", r)})
			continue
		}
		remote := &RemoteLeafOpts{}
		for k, v := range rm {
			tk, v = unwrapValue(v)
			switch strings.ToLower(k) {
			case "url":
				// Named "u" so we do not shadow the net/url package.
				u, err := parseURL(v.(string), "leafnode")
				if err != nil {
					*errors = append(*errors, &configErr{tk, err.Error()})
					continue
				}
				remote.URL = u
			case "account", "local":
				remote.LocalAccount = v.(string)
			case "creds", "credentials":
				remote.Credentials = v.(string)
			case "tls":
				tc, err := parseTLS(tk)
				if err != nil {
					*errors = append(*errors, err)
					continue
				}
				if remote.TLSConfig, err = GenTLSConfig(tc); err != nil {
					*errors = append(*errors, &configErr{tk, err.Error()})
					continue
				}
				remote.TLSTimeout = tc.Timeout
			default:
				if !tk.IsUsedVariable() {
					err := &unknownConfigFieldErr{
						field: k,
						configErr: configErr{
							token: tk,
						},
					}
					*errors = append(*errors, err)
					continue
				}
			}
		}
		remotes = append(remotes, remote)
	}
	return remotes, nil
}
// Parse TLS and returns a TLSConfig and TLSTimeout.
// Used by cluster and gateway parsing.
func getTLSConfig(tk token) (*tls.Config, *TLSConfigOpts, error) {
@@ -2185,6 +2355,17 @@ func setBaselineOptions(opts *Options) {
opts.Cluster.AuthTimeout = float64(AUTH_TIMEOUT) / float64(time.Second)
}
}
if opts.LeafNode.Port != 0 {
if opts.LeafNode.Host == "" {
opts.LeafNode.Host = DEFAULT_HOST
}
if opts.LeafNode.TLSTimeout == 0 {
opts.LeafNode.TLSTimeout = float64(TLS_TIMEOUT) / float64(time.Second)
}
if opts.LeafNode.AuthTimeout == 0 {
opts.LeafNode.AuthTimeout = float64(AUTH_TIMEOUT) / float64(time.Second)
}
}
if opts.MaxControlLine == 0 {
opts.MaxControlLine = MAX_CONTROL_LINE_SIZE
}
@@ -2197,9 +2378,6 @@ func setBaselineOptions(opts *Options) {
if opts.WriteDeadline == time.Duration(0) {
opts.WriteDeadline = DEFAULT_FLUSH_DEADLINE
}
if opts.RQSubsSweep == time.Duration(0) {
opts.RQSubsSweep = DEFAULT_REMOTE_QSUBS_SWEEPER
}
if opts.MaxClosedClients == 0 {
opts.MaxClosedClients = DEFAULT_MAX_CLOSED_CLIENTS
}

View File

@@ -54,7 +54,6 @@ func TestDefaultOptions(t *testing.T) {
MaxPayload: MAX_PAYLOAD_SIZE,
MaxPending: MAX_PENDING_SIZE,
WriteDeadline: DEFAULT_FLUSH_DEADLINE,
RQSubsSweep: DEFAULT_REMOTE_QSUBS_SWEEPER,
MaxClosedClients: DEFAULT_MAX_CLOSED_CLIENTS,
LameDuckDuration: DEFAULT_LAME_DUCK_DURATION,
}
@@ -1709,6 +1708,81 @@ func TestParsingGatewaysErrors(t *testing.T) {
}
}
// TestParsingLeafNodesListener verifies that a leafnodes{} listener block
// (listen/host/port/advertise/authorization/tls) is parsed into the
// expected LeafNodeOpts.
func TestParsingLeafNodesListener(t *testing.T) {
	confContent := `
	leafnodes {
		listen: "127.0.0.1:3333"
		host: "127.0.0.1"
		port: 3333
		advertise: "me:22"
		authorization {
			user: "derek"
			password: "s3cr3t!"
			timeout: 2.2
		}
		tls {
			cert_file: "./configs/certs/server.pem"
			key_file: "./configs/certs/key.pem"
			timeout: 3.3
		}
	}
	`
	confFile := createConfFile(t, []byte(confContent))
	defer os.Remove(confFile)

	o, err := ProcessConfigFile(confFile)
	if err != nil {
		t.Fatalf("Error processing file: %v", err)
	}
	if o.LeafNode.TLSConfig == nil {
		t.Fatalf("Expected TLSConfig, got none")
	}
	// Clear the TLS config (validated above) so DeepEqual can compare the rest.
	o.LeafNode.TLSConfig = nil

	want := &LeafNodeOpts{
		Host:        "127.0.0.1",
		Port:        3333,
		Username:    "derek",
		Password:    "s3cr3t!",
		AuthTimeout: 2.2,
		Advertise:   "me:22",
		TLSTimeout:  3.3,
	}
	if !reflect.DeepEqual(&o.LeafNode, want) {
		t.Fatalf("Expected %v, got %v", want, o.LeafNode)
	}
}
// TestParsingLeafNodeRemotes verifies parsing of the remotes list in a
// leafnodes{} block, including url, local account binding, and credentials.
func TestParsingLeafNodeRemotes(t *testing.T) {
	confContent := `
	leafnodes {
		remotes = [
			{
				url: nats-leaf://127.0.0.1:2222
				account: foobar // Local Account to bind to..
				credentials: "./my.creds"
			}
		]
	}
	`
	confFile := createConfFile(t, []byte(confContent))
	defer os.Remove(confFile)

	o, err := ProcessConfigFile(confFile)
	if err != nil {
		t.Fatalf("Error processing file: %v", err)
	}
	if n := len(o.LeafNode.Remotes); n != 1 {
		t.Fatalf("Expected 1 remote, got %d", n)
	}

	want := &RemoteLeafOpts{
		LocalAccount: "foobar",
		Credentials:  "./my.creds",
	}
	want.URL, _ = url.Parse("nats-leaf://127.0.0.1:2222")
	if !reflect.DeepEqual(o.LeafNode.Remotes[0], want) {
		t.Fatalf("Expected %v, got %v", want, o.LeafNode.Remotes[0])
	}
}
func TestLargeMaxControlLine(t *testing.T) {
confFileName := "big_mcl.conf"
defer os.Remove(confFileName)

View File

@@ -85,6 +85,8 @@ const (
OP_AUSUB
OP_AUSUB_SPC
AUSUB_ARG
OP_L
OP_LS
OP_R
OP_RS
OP_U
@@ -141,6 +143,12 @@ func (c *client) parse(buf []byte) error {
} else {
c.state = OP_R
}
case 'L', 'l':
if c.kind != LEAF {
goto parseErr
} else {
c.state = OP_L
}
case 'A', 'a':
if c.kind == CLIENT {
goto parseErr
@@ -395,6 +403,8 @@ func (c *client) parse(buf []byte) error {
err = c.processRemoteSub(arg)
case GATEWAY:
err = c.processGatewayRSub(arg)
case LEAF:
err = c.processLeafSub(arg)
}
if err != nil {
return err
@@ -405,6 +415,24 @@ func (c *client) parse(buf []byte) error {
c.argBuf = append(c.argBuf, b)
}
}
case OP_L:
switch b {
case 'S', 's':
c.state = OP_LS
case 'M', 'm':
c.state = OP_M
default:
goto parseErr
}
case OP_LS:
switch b {
case '+':
c.state = OP_SUB
case '-':
c.state = OP_UNSUB
default:
goto parseErr
}
case OP_R:
switch b {
case 'S', 's':
@@ -487,6 +515,8 @@ func (c *client) parse(buf []byte) error {
err = c.processRemoteUnsub(arg)
case GATEWAY:
err = c.processGatewayRUnsub(arg)
case LEAF:
err = c.processLeafUnsub(arg)
}
if err != nil {
return err
@@ -653,7 +683,13 @@ func (c *client) parse(buf []byte) error {
} else {
arg = buf[c.as : i-c.drop]
}
if err := c.processRoutedMsgArgs(c.trace, arg); err != nil {
var err error
if c.kind == ROUTER || c.kind == GATEWAY {
err = c.processRoutedMsgArgs(c.trace, arg)
} else if c.kind == LEAF {
err = c.processLeafMsgArgs(c.trace, arg)
}
if err != nil {
return err
}
c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD

View File

@@ -525,6 +525,7 @@ func (s *Server) Reload() error {
clientOrgPort := curOpts.Port
clusterOrgPort := curOpts.Cluster.Port
gatewayOrgPort := curOpts.Gateway.Port
leafnodesOrgPort := curOpts.LeafNode.Port
s.mu.Unlock()
@@ -551,6 +552,9 @@ func (s *Server) Reload() error {
if newOpts.Gateway.Port == -1 {
newOpts.Gateway.Port = gatewayOrgPort
}
if newOpts.LeafNode.Port == -1 {
newOpts.LeafNode.Port = leafnodesOrgPort
}
if err := s.reloadOptions(curOpts, newOpts); err != nil {
return err

View File

@@ -3649,6 +3649,7 @@ func TestConfigReloadIgnoreCustomAuth(t *testing.T) {
conf := createConfFile(t, []byte(`
port: -1
`))
defer os.Remove(conf)
opts := LoadConfig(conf)
ca := &testCustomAuth{}
@@ -3666,3 +3667,31 @@ func TestConfigReloadIgnoreCustomAuth(t *testing.T) {
t.Fatalf("Custom auth missing")
}
}
// TestConfigReloadLeafNodeRandomPort checks that a server started with an
// ephemeral leafnode port (-1) keeps the same resolved port across a reload.
func TestConfigReloadLeafNodeRandomPort(t *testing.T) {
	conf := createConfFile(t, []byte(`
	port: -1
	leafnodes {
		port: -1
	}
	`))
	defer os.Remove(conf)
	s, _ := RunServerWithConfig(conf)
	defer s.Shutdown()

	// Read the currently bound leafnode port under the server lock.
	leafPort := func() int {
		s.mu.Lock()
		defer s.mu.Unlock()
		return s.leafNodeListener.Addr().(*net.TCPAddr).Port
	}

	before := leafPort()
	if err := s.Reload(); err != nil {
		t.Fatalf("Error during reload: %v", err)
	}
	if after := leafPort(); before != after {
		t.Fatalf("Expected leafnodes listen port to be same, was %v is now %v", before, after)
	}
}

View File

@@ -131,7 +131,7 @@ func (c *client) removeReplySub(sub *subscription) {
// Lock should be held upon entering.
func (c *client) removeReplySubTimeout(sub *subscription) {
// Remove any reply sub timer if it exists.
if c.route.replySubs == nil {
if c.route == nil || c.route.replySubs == nil {
return
}
if t, ok := c.route.replySubs[sub]; ok {
@@ -162,7 +162,6 @@ func (c *client) processRoutedMsgArgs(trace bool, arg []byte) error {
if trace {
c.traceInOp("RMSG", arg)
}
// Unroll splitArgs to avoid runtime/heap issues
a := [MAX_MSG_ARGS][]byte{}
args := a[:0]
@@ -292,7 +291,6 @@ func (c *client) processInboundRoutedMsg(msg []byte) {
c.mu.Unlock()
}
}
c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false)
}
@@ -309,7 +307,7 @@ func (c *client) makeQFilter(qsubs [][]*subscription) {
}
// Lock should be held entering here.
func (c *client) sendConnect(tlsRequired bool) {
func (c *client) sendRouteConnect(tlsRequired bool) {
var user, pass string
if userInfo := c.route.url.User; userInfo != nil {
user = userInfo.Username()
@@ -761,6 +759,9 @@ func (c *client) processRemoteUnsub(arg []byte) (err error) {
srv.gatewayUpdateSubInterest(accountName, sub, -1)
}
// Now check on leafnode updates.
srv.updateLeafNodes(acc, sub, -1)
if c.opts.Verbose {
c.sendOK()
}
@@ -822,7 +823,7 @@ func (c *client) processRemoteSub(argo []byte) (err error) {
}
// Check if we have a maximum on the number of subscriptions.
if c.subsExceeded() {
if c.subsAtLimit() {
c.mu.Unlock()
c.maxSubsExceeded()
return nil
@@ -839,7 +840,7 @@ func (c *client) processRemoteSub(argo []byte) (err error) {
osub := c.subs[key]
updateGWs := false
if osub == nil {
c.subs[string(key)] = sub
c.subs[key] = sub
// Now place into the account sl.
if err = acc.sl.Insert(sub); err != nil {
delete(c.subs, key)
@@ -856,12 +857,17 @@ func (c *client) processRemoteSub(argo []byte) (err error) {
}
c.mu.Unlock()
if c.opts.Verbose {
c.sendOK()
}
if updateGWs {
srv.gatewayUpdateSubInterest(acc.Name, sub, 1)
}
// Now check on leafnode updates.
srv.updateLeafNodes(acc, sub, 1)
if c.opts.Verbose {
c.sendOK()
}
return nil
}
@@ -886,22 +892,26 @@ func (s *Server) sendSubsToRoute(route *client) {
route.mu.Lock()
for _, a := range accs {
subs := raw[:0]
a.mu.RLock()
for key, rme := range a.rm {
c := a.randomClient()
if c == nil {
a.mu.RUnlock()
continue
}
for key, n := range a.rm {
// FIXME(dlc) - Just pass rme around.
// Construct a sub on the fly. We need to place
// a client (or im) to properly set the account.
var qn []byte
subEnd := len(key)
if qi := rme.qi; qi > 0 {
subEnd = int(qi) - 1
qn = []byte(key[qi:])
var subj, qn []byte
s := strings.Split(key, " ")
subj = []byte(s[0])
if len(s) > 1 {
qn = []byte(s[1])
}
c := a.randomClient()
if c == nil {
continue
}
sub := &subscription{client: c, subject: []byte(key[:subEnd]), queue: qn, qw: rme.n}
// TODO(dlc) - This code needs to change, but even if left alone could be more
// efficient with these tmp subs.
sub := &subscription{client: c, subject: subj, queue: qn, qw: n}
subs = append(subs, sub)
}
@@ -1079,7 +1089,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
// Check for TLS
if tlsRequired {
// Copy off the config to add in ServerName if we
// Copy off the config to add in ServerName if we need to.
tlsConfig := opts.Cluster.TLSConfig.Clone()
// If we solicited, we will act like the client, otherwise the server.
@@ -1169,7 +1179,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
// Queue Connect proto if we solicited the connection.
if didSolicit {
c.Debugf("Route connect msg sent")
c.sendConnect(tlsRequired)
c.sendRouteConnect(tlsRequired)
}
// Send our info to the other side.
@@ -1262,56 +1272,50 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del
return
}
// We only store state on local subs for transmission across routes.
if sub.client == nil || (sub.client.kind != CLIENT && sub.client.kind != SYSTEM) {
// We only store state on local subs for transmission across all other routes.
if sub.client == nil || (sub.client.kind != CLIENT && sub.client.kind != SYSTEM && sub.client.kind != LEAF) {
return
}
// Create the fast key which will use the subject or 'subject<spc>queue' for queue subscribers.
var (
_rkey [1024]byte
key []byte
qi int32
_rkey [1024]byte
key []byte
update bool
)
if sub.queue != nil {
// Just make the key subject spc group, e.g. 'foo bar'
key = _rkey[:0]
key = append(key, sub.subject...)
key = append(key, byte(' '))
qi = int32(len(key))
key = append(key, sub.queue...)
// We always update for a queue subscriber since we need to send our relative weight.
update = true
} else {
key = sub.subject
}
// We always update for a queue subscriber since we need to send our relative weight.
var entry *rme
// Copy to hold outside acc lock.
var n int32
var ok bool
// Always update if a queue subscriber.
update := qi > 0
// Copy to hold outside acc lock.
var entryN int32
acc.mu.Lock()
if entry, ok = rm[string(key)]; ok {
entry.n += delta
if entry.n <= 0 {
if n, ok = rm[string(key)]; ok {
n += delta
if n <= 0 {
delete(rm, string(key))
update = true // Update for deleting,
update = true // Update for deleting (N->0)
} else {
rm[string(key)] = n
}
} else if delta > 0 {
entry = &rme{qi, delta}
rm[string(key)] = entry
update = true // Adding for normal sub means update.
}
if entry != nil {
entryN = entry.n
n = delta
rm[string(key)] = delta
update = true // Adding a new entry for normal sub means update (0->1)
}
acc.mu.Unlock()
if !update || entry == nil {
if !update {
return
}
// We need to send out this update.
@@ -1321,13 +1325,13 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del
sub.client.mu.Lock()
nsub := *sub
sub.client.mu.Unlock()
nsub.qw = entryN
nsub.qw = n
sub = &nsub
}
// Note that queue unsubs where entry.n > 0 are still
// subscribes with a smaller weight.
if entryN > 0 {
if n > 0 {
s.broadcastSubscribe(sub)
} else {
s.broadcastUnSubscribe(sub)

View File

@@ -47,8 +47,8 @@ const lameDuckModeDefaultInitialDelay = int64(10 * time.Second)
// Make this a variable so that we can change during tests
var lameDuckModeInitialDelay = int64(lameDuckModeDefaultInitialDelay)
// Info is the information sent to clients to help them understand information
// about this server.
// Info is the information sent to clients, routes, gateways, and leaf nodes,
// to help them understand information about this server.
type Info struct {
ID string `json:"server_id"`
Version string `json:"version"`
@@ -83,38 +83,43 @@ type Info struct {
type Server struct {
gcid uint64
stats
mu sync.Mutex
kp nkeys.KeyPair
prand *rand.Rand
info Info
configFile string
optsMu sync.RWMutex
opts *Options
running bool
shutdown bool
listener net.Listener
gacc *Account
sys *internal
accounts map[string]*Account
activeAccounts int
accResolver AccountResolver
clients map[uint64]*client
routes map[uint64]*client
remotes map[string]*client
users map[string]*User
nkeys map[string]*NkeyUser
totalClients uint64
closed *closedRingBuffer
done chan bool
start time.Time
http net.Listener
httpHandler http.Handler
profiler net.Listener
httpReqStats map[string]uint64
routeListener net.Listener
routeInfo Info
routeInfoJSON []byte
quitCh chan struct{}
mu sync.Mutex
kp nkeys.KeyPair
prand *rand.Rand
info Info
configFile string
optsMu sync.RWMutex
opts *Options
running bool
shutdown bool
listener net.Listener
gacc *Account
sys *internal
accounts map[string]*Account
activeAccounts int
accResolver AccountResolver
clients map[uint64]*client
routes map[uint64]*client
remotes map[string]*client
leafs map[uint64]*client
users map[string]*User
nkeys map[string]*NkeyUser
totalClients uint64
closed *closedRingBuffer
done chan bool
start time.Time
http net.Listener
httpHandler http.Handler
profiler net.Listener
httpReqStats map[string]uint64
routeListener net.Listener
routeInfo Info
routeInfoJSON []byte
leafNodeListener net.Listener
leafNodeInfo Info
leafNodeInfoJSON []byte
quitCh chan struct{}
// Tracking Go routines
grMu sync.Mutex
@@ -181,7 +186,7 @@ func NewServer(opts *Options) (*Server, error) {
tlsReq := opts.TLSConfig != nil
verify := (tlsReq && opts.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert)
// Created server nkey identity.
// Created server's nkey identity.
kp, _ := nkeys.CreateServer()
pub, _ := kp.PublicKey()
@@ -263,6 +268,9 @@ func NewServer(opts *Options) (*Server, error) {
s.routes = make(map[uint64]*client)
s.remotes = make(map[string]*client)
// For tracking leaf nodes.
s.leafs = make(map[uint64]*client)
// Used to kick out all go routines possibly waiting on server
// to shutdown.
s.quitCh = make(chan struct{})
@@ -291,6 +299,11 @@ func validateOptions(o *Options) error {
if err := validateTrustedOperators(o); err != nil {
return err
}
// Check on leaf nodes which will require a system
// account when gateways are also configured.
if err := validateLeafNode(o); err != nil {
return err
}
// Check that gateway is properly configured. Returns no error
// if there is no gateway defined.
return validateGatewayOptions(o)
@@ -401,10 +414,14 @@ func (s *Server) generateRouteInfoJSON() {
}
// isTrustedIssuer will check that the issuer is a trusted public key.
// This is used to make sure and account was signed by a trusted operator.
// This is used to make sure an account was signed by a trusted operator.
func (s *Server) isTrustedIssuer(issuer string) bool {
s.mu.Lock()
defer s.mu.Unlock()
// If we are not running in trusted mode and there is no issuer, that is ok.
if len(s.trustedKeys) == 0 && issuer == "" {
return true
}
for _, tk := range s.trustedKeys {
if tk == issuer {
return true
@@ -413,8 +430,8 @@ func (s *Server) isTrustedIssuer(issuer string) bool {
return false
}
// processTrustedKeys will process stamped and option based
// trusted nkeys. Returns success.
// processTrustedKeys will process binary stamped and
// options-based trusted nkeys. Returns success.
func (s *Server) processTrustedKeys() bool {
if trustedKeys != "" && !s.initStampedTrustedKeys() {
return false
@@ -596,7 +613,7 @@ func (s *Server) SystemAccount() *Account {
return nil
}
// Assign an system account. Should only be called once.
// Assign a system account. Should only be called once.
// This sets up a server to send and receive messages from inside
// the server itself.
func (s *Server) setSystemAccount(acc *Account) error {
@@ -607,10 +624,14 @@ func (s *Server) setSystemAccount(acc *Account) error {
if acc.IsExpired() {
return ErrAccountExpired
}
// If we are running with trusted keys for an operator
// make sure we check the account is legit.
if !s.isTrustedIssuer(acc.Issuer) {
return ErrAccountValidation
}
s.mu.Lock()
if s.sys != nil {
s.mu.Unlock()
return ErrAccountExists
@@ -661,6 +682,13 @@ func (s *Server) systemAccount() *Account {
return s.sys.account
}
// shouldTrackSubscriptions determines if accounts should track
// subscriptions for efficient propagation.
// Lock should be held on entry.
func (s *Server) shouldTrackSubscriptions() bool {
	// Tracking is only needed when this server participates in a
	// cluster or gateway configuration.
	opts := s.opts
	if opts.Cluster.Port != 0 {
		return true
	}
	return opts.Gateway.Port != 0
}
// Place common account setup here.
// Lock should be held on entry.
func (s *Server) registerAccount(acc *Account) {
@@ -682,8 +710,9 @@ func (s *Server) registerAccount(acc *Account) {
// already created (global account), so use locking and
// make sure we create only if needed.
acc.mu.Lock()
if acc.rm == nil && s.opts != nil && (s.opts.Cluster.Port != 0 || s.opts.Gateway.Port != 0) {
acc.rm = make(map[string]*rme, 256)
// TODO(dlc)- Double check that we need this for GWs.
if acc.rm == nil && s.opts != nil && s.shouldTrackSubscriptions() {
acc.rm = make(map[string]int32)
}
acc.srv = s
acc.mu.Unlock()
@@ -898,6 +927,19 @@ func (s *Server) Start() {
s.startGateways()
}
// Start up listen if we want to accept leaf node connections.
if opts.LeafNode.Port != 0 {
// Spin up the accept loop if needed
ch := make(chan struct{})
go s.leafNodeAcceptLoop(ch)
<-ch
}
// Solicit remote servers for leaf node connections.
if len(opts.LeafNode.Remotes) > 0 {
s.solicitLeafNodeRemotes(opts.LeafNode.Remotes)
}
// The Routing routine needs to wait for the client listen
// port to be opened and potential ephemeral port selected.
clientListenReady := make(chan struct{})
@@ -967,6 +1009,11 @@ func (s *Server) Shutdown() {
// Copy off the gateways
s.getAllGatewayConnections(conns)
// Copy off the leaf nodes
for i, c := range s.leafs {
conns[i] = c
}
// Number of done channel responses we expect.
doneExpected := 0
@@ -977,6 +1024,13 @@ func (s *Server) Shutdown() {
s.listener = nil
}
// Kick leafnodes AcceptLoop()
if s.leafNodeListener != nil {
doneExpected++
s.leafNodeListener.Close()
s.leafNodeListener = nil
}
// Kick route AcceptLoop()
if s.routeListener != nil {
doneExpected++
@@ -1628,6 +1682,8 @@ func (s *Server) removeClient(c *client) {
s.removeRoute(c)
case GATEWAY:
s.removeRemoteGatewayConnection(c)
case LEAF:
s.removeLeafNodeConnection(c)
}
}
@@ -1667,6 +1723,13 @@ func (s *Server) NumRemotes() int {
return len(s.remotes)
}
// NumLeafNodes will report number of leaf node connections.
func (s *Server) NumLeafNodes() int {
	s.mu.Lock()
	n := len(s.leafs)
	s.mu.Unlock()
	return n
}
// NumClients will report the number of registered clients.
func (s *Server) NumClients() int {
s.mu.Lock()
@@ -1674,6 +1737,11 @@ func (s *Server) NumClients() int {
return len(s.clients)
}
// GetClient will return the client associated with cid.
// It is the exported wrapper around getClient.
func (s *Server) GetClient(cid uint64) *client {
	return s.getClient(cid)
}
// getClient will return the client associated with cid.
func (s *Server) getClient(cid uint64) *client {
s.mu.Lock()
@@ -1681,6 +1749,12 @@ func (s *Server) getClient(cid uint64) *client {
return s.clients[cid]
}
// GetLeafNode returns the leafnode client associated with cid,
// or nil if no such connection is registered.
func (s *Server) GetLeafNode(cid uint64) *client {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.leafs[cid]
}
// NumSubscriptions will report how many subscriptions are active.
func (s *Server) NumSubscriptions() uint32 {
s.mu.Lock()
@@ -1752,7 +1826,7 @@ func (s *Server) ProfilerAddr() *net.TCPAddr {
return s.profiler.Addr().(*net.TCPAddr)
}
// ReadyForConnections returns `true` if the server is ready to accept client
// ReadyForConnections returns `true` if the server is ready to accept clients
// and, if routing is enabled, route connections. If after the duration
// `dur` the server is still not ready, returns `false`.
func (s *Server) ReadyForConnections(dur time.Duration) bool {

View File

@@ -1,4 +1,4 @@
// Copyright 2016-2018 The NATS Authors
// Copyright 2016-2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -967,3 +967,42 @@ func (s *Sublist) localSubs(subs *[]*subscription) {
s.collectLocalSubs(s.root, subs)
s.RUnlock()
}
// All is used to collect all subscriptions in the sublist, both normal
// and queue subscriptions, appending them to subs.
func (s *Sublist) All(subs *[]*subscription) {
	s.RLock()
	s.collectAllSubs(s.root, subs)
	s.RUnlock()
}
// addAllNodeToSubs appends every subscription stored on a single trie
// node, both normal and queue subscriptions, to subs.
func (s *Sublist) addAllNodeToSubs(n *node, subs *[]*subscription) {
	// Normal subscriptions: use the cached plist when available,
	// otherwise walk psubs individually.
	if n.plist != nil {
		*subs = append(*subs, n.plist...)
	} else {
		for _, sub := range n.psubs {
			*subs = append(*subs, sub)
		}
	}
	// Queue subscriptions: flatten every queue group.
	for _, qr := range n.qsubs {
		for _, sub := range qr {
			*subs = append(*subs, sub)
		}
	}
}
// collectAllSubs recursively walks the trie level and gathers every
// subscription (normal and queue) into subs.
func (s *Sublist) collectAllSubs(l *level, subs *[]*subscription) {
	// Guard the recursion: leaf nodes may not have a next level.
	if l == nil {
		return
	}
	for _, n := range l.nodes {
		s.addAllNodeToSubs(n, subs)
		// Recurse with collectAllSubs, not collectLocalSubs, so that
		// non-local subscriptions at deeper levels are also included.
		s.collectAllSubs(n.next, subs)
	}
	if l.pwc != nil {
		s.addAllNodeToSubs(l.pwc, subs)
		s.collectAllSubs(l.pwc.next, subs)
	}
	if l.fwc != nil {
		s.addAllNodeToSubs(l.fwc, subs)
		s.collectAllSubs(l.fwc.next, subs)
	}
}

View File

@@ -0,0 +1,25 @@
# Cluster Server A
listen: 127.0.0.1:5222
leafnodes {
listen: 127.0.0.1:5223
}
cluster {
listen: 127.0.0.1:5244
authorization {
user: ruser
password: top_secret
timeout: 0.5
}
# Routes are actively solicited and connected to from this server.
# Other servers can connect to us if they supply the correct credentials
# in their routes definitions from above.
routes = [
nats-route://ruser:top_secret@127.0.0.1:5246
]
}

1143
test/leafnode_test.go Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -103,12 +103,12 @@ func TestNewRouteConnectSubs(t *testing.T) {
if string(m[3]) != "bar" {
t.Fatalf("Expected group of 'bar', got %q", m[3])
}
// Expect the SID to be the total weighted count for the queue group
// Expect a weighted count for the queue group
if len(m) != 5 {
t.Fatalf("Expected a SID for the queue group")
t.Fatalf("Expected a weight for the queue group")
}
if m[4] == nil || string(m[4]) != "10" {
t.Fatalf("Expected SID of '10', got %q", m[4])
t.Fatalf("Expected Weight of '10', got %q", m[4])
}
}
}
@@ -179,10 +179,10 @@ func TestNewRouteConnectSubsWithAccount(t *testing.T) {
}
// Expect the SID to be the total weighted count for the queue group
if len(m) != 5 {
t.Fatalf("Expected a SID for the queue group")
t.Fatalf("Expected a weight for the queue group")
}
if m[4] == nil || string(m[4]) != "10" {
t.Fatalf("Expected SID of '10', got %q", m[4])
t.Fatalf("Expected Weight of '10', got %q", m[4])
}
}
}

View File

@@ -23,6 +23,7 @@ import (
"regexp"
"runtime"
"strings"
"testing"
"time"
"github.com/nats-io/gnatsd/server"
@@ -279,6 +280,7 @@ var (
pingRe = regexp.MustCompile(`^PING\r\n`)
pongRe = regexp.MustCompile(`^PONG\r\n`)
msgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`)
rawMsgRe = regexp.MustCompile(`(?:(?:MSG\s+([^\s]+)\s+([^\s]+)\s+(([^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n(.*?)))`)
okRe = regexp.MustCompile(`\A\+OK\r\n`)
errRe = regexp.MustCompile(`\A\-ERR\s+([^\r\n]+)\r\n`)
connectRe = regexp.MustCompile(`CONNECT\s+([^\r\n]+)\r\n`)
@@ -287,6 +289,9 @@ var (
rmsgRe = regexp.MustCompile(`(?:(?:RMSG\s+([^\s]+)\s+([^\s]+)\s+(?:([|+]\s+([\w\s]+)|[^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`)
asubRe = regexp.MustCompile(`A\+\s+([^\r\n]+)\r\n`)
aunsubRe = regexp.MustCompile(`A\-\s+([^\r\n]+)\r\n`)
lsubRe = regexp.MustCompile(`LS\+\s+([^\s]+)\s*([^\s]+)?\s*(\d+)?\r\n`)
lunsubRe = regexp.MustCompile(`LS\-\s+([^\s]+)\s*([^\s]+)?\r\n`)
lmsgRe = regexp.MustCompile(`(?:(?:LMSG\s+([^\s]+)\s+(?:([|+]\s+([\w\s]+)|[^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`)
)
const (
@@ -333,6 +338,17 @@ func peek(c net.Conn) []byte {
return expBuf
}
func expectDisconnect(t *testing.T, c net.Conn) {
t.Helper()
var b [8]byte
c.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
_, err := c.Read(b[:])
c.SetReadDeadline(time.Time{})
if err != io.EOF {
t.Fatalf("Expected a disconnect")
}
}
func expectNothing(t tLogger, c net.Conn) {
expBuf := make([]byte, 32)
c.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
@@ -377,6 +393,18 @@ func checkRmsg(t tLogger, m [][]byte, account, subject, replyAndQueues, len, msg
}
}
// checkLmsg verifies the parts of a captured LMSG protocol line: subject,
// reply/queue list, and payload length. The length parameter was renamed
// from "len" so it no longer shadows the builtin; callers are positional,
// so this is interface-compatible.
// NOTE(review): the indexes are offset by -1 relative to the RMSG checks,
// presumably because LMSG has no account field — confirm against lmsgRe.
// NOTE(review): msg is currently unchecked, matching the original behavior.
func checkLmsg(t tLogger, m [][]byte, subject, replyAndQueues, length, msg string) {
	if string(m[rsubIndex-1]) != subject {
		stackFatalf(t, "Did not get correct subject: expected '%s' got '%s'\n", subject, m[rsubIndex-1])
	}
	if string(m[lenIndex-1]) != length {
		stackFatalf(t, "Did not get correct msg length: expected '%s' got '%s'\n", length, m[lenIndex-1])
	}
	if string(m[replyAndQueueIndex-1]) != replyAndQueues {
		stackFatalf(t, "Did not get correct reply/queues: expected '%s' got '%s'\n", replyAndQueues, m[replyAndQueueIndex-1])
	}
}
// Closure for expectMsgs
func expectRmsgsCommand(t tLogger, ef expectFun) func(int) [][][]byte {
return func(expected int) [][][]byte {