mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Allow servers to send system events.
Specifically this is to support distributed tracking of number of account connections across clusters. Gateways may not work yet based on attempts to only generate payloads when we know there is outside interest. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -40,18 +40,21 @@ type rme struct {
|
||||
// Account are subject namespace definitions. By default no messages are shared between accounts.
|
||||
// You can share via exports and imports of streams and services.
|
||||
type Account struct {
|
||||
Name string
|
||||
Nkey string
|
||||
Issuer string
|
||||
claimJWT string
|
||||
updated time.Time
|
||||
mu sync.RWMutex
|
||||
sl *Sublist
|
||||
etmr *time.Timer
|
||||
clients map[*client]*client
|
||||
rm map[string]*rme
|
||||
imports importMap
|
||||
exports exportMap
|
||||
Name string
|
||||
Nkey string
|
||||
Issuer string
|
||||
claimJWT string
|
||||
updated time.Time
|
||||
mu sync.RWMutex
|
||||
sl *Sublist
|
||||
etmr *time.Timer
|
||||
ctmr *time.Timer
|
||||
strack map[string]int
|
||||
nrclients int
|
||||
clients map[*client]*client
|
||||
rm map[string]*rme
|
||||
imports importMap
|
||||
exports exportMap
|
||||
limits
|
||||
nae int
|
||||
pruning bool
|
||||
@@ -106,19 +109,28 @@ type exportMap struct {
|
||||
services map[string]*exportAuth
|
||||
}
|
||||
|
||||
// NumClients returns active number of clients for this account.
|
||||
// NumClients returns active number of clients for this account for
|
||||
// all known servers.
|
||||
func (a *Account) NumClients() int {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
return len(a.clients) + a.nrclients
|
||||
}
|
||||
|
||||
// NumLocalClients returns active number of clients for this account
|
||||
// on this server.
|
||||
func (a *Account) NumLocalClients() int {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
return len(a.clients)
|
||||
}
|
||||
|
||||
// MaxClientsReached returns if we have reached our limit for number of connections.
|
||||
func (a *Account) MaxClientsReached() bool {
|
||||
func (a *Account) MaxTotalClientsReached() bool {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
if a.mconns != 0 {
|
||||
return len(a.clients) >= a.mconns
|
||||
return len(a.clients)+a.nrclients >= a.mconns
|
||||
}
|
||||
return false
|
||||
}
|
||||
@@ -138,25 +150,39 @@ func (a *Account) TotalSubs() int {
|
||||
return int(a.sl.Count())
|
||||
}
|
||||
|
||||
// addClient keeps our accounting of active clients updated.
|
||||
// addClient keeps our accounting of local active clients updated.
|
||||
// Returns previous total.
|
||||
func (a *Account) addClient(c *client) int {
|
||||
a.mu.Lock()
|
||||
n := len(a.clients)
|
||||
a.clients[c] = c
|
||||
a.mu.Unlock()
|
||||
if c != nil && c.srv != nil && a != c.srv.gacc {
|
||||
c.srv.accConnsUpdate(a)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// removeClient keeps our accounting of active clients updated.
|
||||
// removeClient keeps our accounting of local active clients updated.
|
||||
func (a *Account) removeClient(c *client) int {
|
||||
a.mu.Lock()
|
||||
n := len(a.clients)
|
||||
delete(a.clients, c)
|
||||
a.mu.Unlock()
|
||||
if c != nil && c.srv != nil && a != c.srv.gacc {
|
||||
c.srv.accConnsUpdate(a)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (a *Account) randomClient() *client {
|
||||
var c *client
|
||||
for _, c = range a.clients {
|
||||
break
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
// AddServiceExport will configure the account with the defined export.
|
||||
func (a *Account) AddServiceExport(subject string, accounts []*Account) error {
|
||||
a.mu.Lock()
|
||||
|
||||
@@ -406,14 +406,14 @@ func (c *client) registerWithAccount(acc *Account) error {
|
||||
if acc == nil || acc.sl == nil {
|
||||
return ErrBadAccount
|
||||
}
|
||||
// If we were previously register, usually to $G, do accounting here to remove.
|
||||
// If we were previously registered, usually to $G, do accounting here to remove.
|
||||
if c.acc != nil {
|
||||
if prev := c.acc.removeClient(c); prev == 1 && c.srv != nil {
|
||||
c.srv.decActiveAccounts()
|
||||
}
|
||||
}
|
||||
// Check if we have a max connections violation
|
||||
if acc.MaxClientsReached() {
|
||||
if acc.MaxTotalClientsReached() {
|
||||
return ErrTooManyAccountConnections
|
||||
}
|
||||
|
||||
@@ -490,7 +490,6 @@ func (c *client) RegisterUser(user *User) {
|
||||
c.mperms = nil
|
||||
return
|
||||
}
|
||||
|
||||
c.setPermissions(user.Permissions)
|
||||
}
|
||||
|
||||
@@ -1514,7 +1513,7 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
c.Errorf(err.Error())
|
||||
}
|
||||
// If we are routing and this is a local sub, add to the route map for the associated account.
|
||||
if kind == CLIENT {
|
||||
if kind == CLIENT || kind == SYSTEM {
|
||||
c.srv.updateRouteSubscriptionMap(acc, sub, 1)
|
||||
}
|
||||
}
|
||||
@@ -1704,7 +1703,7 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force bool) {
|
||||
for _, nsub := range sub.shadow {
|
||||
if err := nsub.im.acc.sl.Remove(nsub); err != nil {
|
||||
c.Debugf("Could not remove shadow import subscription for account %q", nsub.im.acc.Name)
|
||||
} else if c.kind == CLIENT && c.srv != nil {
|
||||
} else if c.kind == CLIENT || c.kind == SYSTEM && c.srv != nil {
|
||||
c.srv.updateRouteSubscriptionMap(nsub.im.acc, nsub, -1)
|
||||
}
|
||||
}
|
||||
@@ -1758,7 +1757,7 @@ func (c *client) processUnsub(arg []byte) error {
|
||||
|
||||
if unsub {
|
||||
c.unsubscribe(acc, sub, false)
|
||||
if acc != nil && kind == CLIENT {
|
||||
if acc != nil && kind == CLIENT || kind == SYSTEM {
|
||||
c.srv.updateRouteSubscriptionMap(acc, sub, -1)
|
||||
}
|
||||
}
|
||||
@@ -1833,7 +1832,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
|
||||
defer client.removeReplySub(sub)
|
||||
} else {
|
||||
// For routing..
|
||||
shouldForward := client.kind == CLIENT && client.srv != nil
|
||||
shouldForward := client.kind == CLIENT || client.kind == SYSTEM && client.srv != nil
|
||||
// If we are at the exact number, unsubscribe but
|
||||
// still process the message in hand, otherwise
|
||||
// unsubscribe and drop message on the floor.
|
||||
|
||||
464
server/events.go
464
server/events.go
@@ -17,14 +17,36 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
connectEventSubj = "$SYS.%s.CLIENT.CONNECT"
|
||||
disconnectEventSubj = "$SYS.%s.CLIENT.DISCONNECT"
|
||||
connectEventSubj = "$SYS.ACCOUNT.%s.CONNECT"
|
||||
disconnectEventSubj = "$SYS.ACCOUNT.%s.DISCONNECT"
|
||||
accConnsEventSubj = "$SYS.SERVER.ACCOUNT.%s.CONNS"
|
||||
accConnsReqSubj = "$SYS.REQ.ACCOUNT.%s.CONNS"
|
||||
connsRespSubj = "$SYS._INBOX_.%s"
|
||||
shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN"
|
||||
shutdownEventTokens = 4
|
||||
serverSubjectIndex = 2
|
||||
)
|
||||
|
||||
// Used to send and receive messages from inside the server.
|
||||
type internal struct {
|
||||
account *Account
|
||||
client *client
|
||||
seq uint64
|
||||
sid uint64
|
||||
servers map[string]*serverUpdate
|
||||
sweeper *time.Timer
|
||||
subs map[string]msgHandler
|
||||
sendq chan *pubMsg
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// ConnectEventMsg is sent when a new connection is made that is part of an account.
|
||||
type ConnectEventMsg struct {
|
||||
Server ServerInfo `json:"server"`
|
||||
@@ -41,6 +63,22 @@ type DisconnectEventMsg struct {
|
||||
Reason string `json:"reason"`
|
||||
}
|
||||
|
||||
// accNumConns is an event that will be sent from a server that is tracking
|
||||
// a given account when the number of connections changes. It will also HB
|
||||
// updates in the absence of any changes.
|
||||
type accNumConns struct {
|
||||
Server ServerInfo `json:"server"`
|
||||
Account string `json:"acc"`
|
||||
Conns int `json:"conns"`
|
||||
}
|
||||
|
||||
// accNumConnsReq is sent when we are starting to track an account for the first
|
||||
// time. We will request others send info to us about their local state.
|
||||
type accNumConnsReq struct {
|
||||
Server ServerInfo `json:"server"`
|
||||
Account string `json:"acc"`
|
||||
}
|
||||
|
||||
type ServerInfo struct {
|
||||
Host string `json:"host"`
|
||||
ID string `json:"id"`
|
||||
@@ -68,30 +106,52 @@ type DataStats struct {
|
||||
|
||||
// Used for internally queueing up messages that the server wants to send.
|
||||
type pubMsg struct {
|
||||
r *SublistResult
|
||||
sub string
|
||||
si *ServerInfo
|
||||
msg interface{}
|
||||
r *SublistResult
|
||||
sub string
|
||||
rply string
|
||||
si *ServerInfo
|
||||
msg interface{}
|
||||
last bool
|
||||
}
|
||||
|
||||
func (s *Server) internalSendLoop() {
|
||||
defer s.grWG.Done()
|
||||
// Used to track server updates.
|
||||
type serverUpdate struct {
|
||||
seq uint64
|
||||
ltime time.Time
|
||||
}
|
||||
|
||||
// internalSendLoop will be responsible for serializing all messages that
|
||||
// a server wants to send.
|
||||
func (s *Server) internalSendLoop(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
||||
s.mu.Lock()
|
||||
if s.sys == nil {
|
||||
if s.sys == nil || s.sys.sendq == nil {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
c := s.sys.client
|
||||
acc := s.sys.account
|
||||
sendq := s.sys.sendq
|
||||
id := s.info.ID
|
||||
host := s.info.Host
|
||||
seqp := &s.sys.seq
|
||||
s.mu.Unlock()
|
||||
|
||||
for s.isRunning() {
|
||||
for s.eventsRunning() {
|
||||
// Setup information for next message
|
||||
seq := atomic.AddUint64(seqp, 1)
|
||||
select {
|
||||
case pm := <-sendq:
|
||||
s.stampServerInfo(pm.si)
|
||||
b, _ := json.MarshalIndent(pm.msg, "", " ")
|
||||
|
||||
if pm.si != nil {
|
||||
pm.si.Host = host
|
||||
pm.si.ID = id
|
||||
pm.si.Seq = seq
|
||||
}
|
||||
var b []byte
|
||||
if pm.msg != nil {
|
||||
b, _ = json.MarshalIndent(pm.msg, _EMPTY_, " ")
|
||||
}
|
||||
// Prep internal structures needed to send message.
|
||||
c.pa.subject = []byte(pm.sub)
|
||||
c.pa.size = len(b)
|
||||
@@ -102,32 +162,356 @@ func (s *Server) internalSendLoop() {
|
||||
if acc.imports.services != nil {
|
||||
c.checkForImportServices(acc, b)
|
||||
}
|
||||
c.processMsgResults(acc, pm.r, b, []byte(pm.sub), nil, nil)
|
||||
c.processMsgResults(acc, pm.r, b, c.pa.subject, []byte(pm.rply), nil)
|
||||
c.flushClients()
|
||||
// See if we are doing graceful shutdown.
|
||||
if pm.last {
|
||||
return
|
||||
}
|
||||
case <-s.quitCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This will queue up a message to be sent.
|
||||
func (s *Server) sendInternalMsg(r *SublistResult, sub string, si *ServerInfo, msg interface{}) {
|
||||
if s.sys == nil {
|
||||
// Will send a shutdown message.
|
||||
func (s *Server) sendShutdownEvent() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.sys == nil || s.sys.sendq == nil {
|
||||
return
|
||||
}
|
||||
s.sys.sendq <- &pubMsg{r, sub, si, msg}
|
||||
subj := fmt.Sprintf(shutdownEventSubj, s.info.ID)
|
||||
r := s.sys.account.sl.Match(subj)
|
||||
sendq := s.sys.sendq
|
||||
// Stop any more messages from queueing up.
|
||||
s.sys.sendq = nil
|
||||
// Unhook all msgHandlers. Normal client cleanup will deal with subs, etc.
|
||||
s.sys.subs = nil
|
||||
// Send to the internal queue and mark as last.
|
||||
sendq <- &pubMsg{r, subj, _EMPTY_, nil, nil, true}
|
||||
}
|
||||
|
||||
// accountConnectEvent will send an account client connect event if there is interest.
|
||||
func (s *Server) accountConnectEvent(c *client) {
|
||||
if s.sys == nil || s.sys.client == nil || s.sys.account == nil {
|
||||
// This will queue up a message to be sent.
|
||||
func (s *Server) sendInternalMsg(r *SublistResult, sub, rply string, si *ServerInfo, msg interface{}) {
|
||||
if s.sys == nil || s.sys.sendq == nil {
|
||||
return
|
||||
}
|
||||
s.sys.sendq <- &pubMsg{r, sub, rply, si, msg, false}
|
||||
}
|
||||
|
||||
// Locked version of checking if events system running. Also checks server.
|
||||
func (s *Server) eventsRunning() bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.running && s.eventsEnabled()
|
||||
}
|
||||
|
||||
func (s *Server) eventsEnabled() bool {
|
||||
return s.sys != nil && s.sys.client != nil && s.sys.account != nil
|
||||
}
|
||||
|
||||
// orphanServerDuration is how long we have to not hear from a remote server
|
||||
// top consider it orphaned. We will remove any accounting associated with it.
|
||||
var orphanServerDuration = 5 * connHBInterval
|
||||
var checkRemoteServerInterval = 3 * connHBInterval
|
||||
|
||||
// Check for orphan servers who may have gone away without notification.
|
||||
func (s *Server) checkRemoteServers() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if !s.eventsEnabled() {
|
||||
return
|
||||
}
|
||||
now := time.Now()
|
||||
for sid, su := range s.sys.servers {
|
||||
if now.Sub(su.ltime) > orphanServerDuration {
|
||||
s.Debugf("Detected orphan remote server: %q", sid)
|
||||
// Simulate it going away.
|
||||
s.processRemoteServerShutdown(sid)
|
||||
delete(s.sys.servers, sid)
|
||||
}
|
||||
}
|
||||
s.sys.sweeper.Reset(checkRemoteServerInterval)
|
||||
}
|
||||
|
||||
// Start a ticker that will fire periodically and check for orphaned servers.
|
||||
func (s *Server) startRemoteServerSweepTimer() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if !s.eventsEnabled() {
|
||||
return
|
||||
}
|
||||
s.sys.sweeper = time.AfterFunc(checkRemoteServerInterval, s.checkRemoteServers)
|
||||
}
|
||||
|
||||
// This will setup our system wide tracking subs.
|
||||
// For now we will setup one wildcard subscription to
|
||||
// monitor all accounts for changes in number of connections.
|
||||
// We can make this on a per account tracking basis if needed.
|
||||
// Tradeoff is subscription and interest graph events vs connect and
|
||||
// disconnect events, etc.
|
||||
func (s *Server) initEventTracking() {
|
||||
if !s.eventsEnabled() {
|
||||
return
|
||||
}
|
||||
subject := fmt.Sprintf(accConnsEventSubj, "*")
|
||||
if _, err := s.sysSubscribe(subject, s.remoteConnsUpdate); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
// This will be for responses for account info that we send out.
|
||||
subject = fmt.Sprintf(connsRespSubj, s.info.ID)
|
||||
if _, err := s.sysSubscribe(subject, s.remoteConnsUpdate); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
// Listen for broad requests to respond with account info.
|
||||
subject = fmt.Sprintf(accConnsReqSubj, "*")
|
||||
if _, err := s.sysSubscribe(subject, s.connsRequest); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
// Listen for all server shutdowns.
|
||||
subject = fmt.Sprintf(shutdownEventSubj, "*")
|
||||
if _, err := s.sysSubscribe(subject, s.remoteServerShutdown); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// processRemoteServerShutdown will update any affected accounts.
|
||||
// Will upidate the remote count for clients.
|
||||
// Lock assume held.
|
||||
func (s *Server) processRemoteServerShutdown(sid string) {
|
||||
for _, a := range s.accounts {
|
||||
a.mu.Lock()
|
||||
prev := a.strack[sid]
|
||||
delete(a.strack, sid)
|
||||
a.nrclients -= prev
|
||||
a.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// serverShutdownEvent is called when we get an event from another server shutting down.
|
||||
func (s *Server) remoteServerShutdown(sub *subscription, subject, reply string, msg []byte) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if !s.eventsEnabled() {
|
||||
return
|
||||
}
|
||||
toks := strings.Split(subject, tsep)
|
||||
if len(toks) != shutdownEventTokens {
|
||||
s.Debugf("Received remote server shutdown on bad subject %q", subject)
|
||||
}
|
||||
sid := toks[serverSubjectIndex]
|
||||
su := s.sys.servers[sid]
|
||||
if su != nil {
|
||||
s.processRemoteServerShutdown(sid)
|
||||
}
|
||||
}
|
||||
|
||||
// updateRemoteServer is called when we have an update from a remote server.
|
||||
// This allows us to track remote servers, respond to shutdown messages properly,
|
||||
// make sure that messages are ordered, and allow us to prune dead servers.
|
||||
// Lock should be held upon entry.
|
||||
func (s *Server) updateRemoteServer(ms *ServerInfo) {
|
||||
su := s.sys.servers[ms.ID]
|
||||
if su == nil {
|
||||
s.sys.servers[ms.ID] = &serverUpdate{ms.Seq, time.Now()}
|
||||
} else {
|
||||
if ms.Seq <= su.seq {
|
||||
s.Errorf("Received out of order remote server update from: %q", ms.ID)
|
||||
return
|
||||
}
|
||||
if ms.Seq != su.seq+1 {
|
||||
s.Errorf("Missed [%d] remote server updates from: %q", ms.Seq-su.seq+1, ms.ID)
|
||||
}
|
||||
su.seq = ms.Seq
|
||||
su.ltime = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
// shutdownEventing will clean up all eventing state.
|
||||
// Lock is held upon entry.
|
||||
func (s *Server) shutdownEventing() {
|
||||
if !s.eventsRunning() {
|
||||
return
|
||||
}
|
||||
|
||||
if s.sys.sweeper != nil {
|
||||
s.sys.sweeper.Stop()
|
||||
s.sys.sweeper = nil
|
||||
}
|
||||
|
||||
// We will queue up a shutdown event and wait for the
|
||||
// internal send loop to exit.
|
||||
s.sendShutdownEvent()
|
||||
s.sys.wg.Wait()
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Whip through all accounts.
|
||||
for _, a := range s.accounts {
|
||||
a.mu.Lock()
|
||||
a.nrclients = 0
|
||||
// Now clear state
|
||||
if a.etmr != nil {
|
||||
a.etmr.Stop()
|
||||
a.etmr = nil
|
||||
}
|
||||
if a.ctmr != nil {
|
||||
a.ctmr.Stop()
|
||||
a.ctmr = nil
|
||||
}
|
||||
a.clients = nil
|
||||
a.strack = nil
|
||||
a.mu.Unlock()
|
||||
}
|
||||
// Turn everything off here.
|
||||
s.sys = nil
|
||||
}
|
||||
|
||||
func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []byte) {
|
||||
if !s.eventsRunning() {
|
||||
return
|
||||
}
|
||||
|
||||
m := accNumConnsReq{}
|
||||
if err := json.Unmarshal(msg, &m); err != nil {
|
||||
s.sys.client.Errorf("Error unmarshalling account connections request message: %v", err)
|
||||
return
|
||||
}
|
||||
acc := s.LookupAccount(m.Account)
|
||||
if acc == nil {
|
||||
return
|
||||
}
|
||||
if nlc := acc.NumLocalClients(); nlc > 0 {
|
||||
s.sendAccConnsUpdate(acc, reply)
|
||||
}
|
||||
}
|
||||
|
||||
// remoteConnsUpdate gets called when we receive a remote update from another server.
|
||||
func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg []byte) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if !s.eventsEnabled() {
|
||||
return
|
||||
}
|
||||
|
||||
m := accNumConns{}
|
||||
if err := json.Unmarshal(msg, &m); err != nil {
|
||||
s.sys.client.Errorf("Error unmarshalling account connection event message: %v", err)
|
||||
return
|
||||
}
|
||||
// Double check that this is not us, should never happen, so error if it does.
|
||||
if m.Server.ID == s.info.ID {
|
||||
s.sys.client.Errorf("Processing our own account connection event message: ignored")
|
||||
return
|
||||
}
|
||||
// See if we have the account registered, if not drop it.
|
||||
acc := s.lookupAccount(m.Account)
|
||||
if acc == nil {
|
||||
s.sys.client.Debugf("Received account connection event for unknown account: %s", m.Account)
|
||||
return
|
||||
}
|
||||
// If we are here we have interest in tracking this account. Update our accounting.
|
||||
acc.mu.Lock()
|
||||
if acc.strack == nil {
|
||||
acc.strack = make(map[string]int)
|
||||
}
|
||||
// This does not depend on receiving all updates since each one is idempotent.
|
||||
prev := acc.strack[m.Server.ID]
|
||||
acc.strack[m.Server.ID] = m.Conns
|
||||
acc.nrclients += (m.Conns - prev)
|
||||
acc.mu.Unlock()
|
||||
|
||||
s.updateRemoteServer(&m.Server)
|
||||
}
|
||||
|
||||
// Setup tracking for this account. This allows us to track globally
|
||||
// account activity.
|
||||
func (s *Server) enableAccountTracking(a *Account) {
|
||||
if a == nil || !s.eventsEnabled() || a == s.sys.account {
|
||||
return
|
||||
}
|
||||
acc := s.sys.account
|
||||
sc := s.sys.client
|
||||
|
||||
subj := fmt.Sprintf(accConnsReqSubj, a.Name)
|
||||
r := acc.sl.Match(subj)
|
||||
if noOutSideInterest(sc, r) {
|
||||
return
|
||||
}
|
||||
reply := fmt.Sprintf(connsRespSubj, s.info.ID)
|
||||
m := accNumConnsReq{Account: a.Name}
|
||||
s.sendInternalMsg(r, subj, reply, &m.Server, &m)
|
||||
}
|
||||
|
||||
// FIXME(dlc) - make configurable.
|
||||
const connHBInterval = 30 * time.Second
|
||||
|
||||
// sendAccConnsUpdate is called to send out our information on the
|
||||
// account's local connections.
|
||||
func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
|
||||
if !s.eventsEnabled() || a == nil || a == s.sys.account || a == s.gacc {
|
||||
return
|
||||
}
|
||||
acc := s.sys.account
|
||||
sc := s.sys.client
|
||||
|
||||
r := acc.sl.Match(subj)
|
||||
if noOutSideInterest(sc, r) {
|
||||
return
|
||||
}
|
||||
|
||||
a.mu.Lock()
|
||||
// If no limits set, don't update, no need to.
|
||||
if a.mconns == 0 {
|
||||
a.mu.Unlock()
|
||||
return
|
||||
}
|
||||
// Build event with account name and number of local clients.
|
||||
m := accNumConns{
|
||||
Account: a.Name,
|
||||
Conns: len(a.clients),
|
||||
}
|
||||
// Check to see if we have an HB running and update.
|
||||
if a.ctmr == nil {
|
||||
a.etmr = time.AfterFunc(connHBInterval, func() { s.accConnsUpdate(a) })
|
||||
} else {
|
||||
a.etmr.Reset(connHBInterval)
|
||||
}
|
||||
a.mu.Unlock()
|
||||
|
||||
s.sendInternalMsg(r, subj, "", &m.Server, &m)
|
||||
}
|
||||
|
||||
// accConnsUpdate is called whenever there is a change to the account's
|
||||
// number of active connections, or during a heartbeat.
|
||||
func (s *Server) accConnsUpdate(a *Account) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if !s.eventsEnabled() || a == nil || a == s.sys.account {
|
||||
return
|
||||
}
|
||||
subj := fmt.Sprintf(accConnsEventSubj, a.Name)
|
||||
s.sendAccConnsUpdate(a, subj)
|
||||
}
|
||||
|
||||
// accountConnectEvent will send an account client connect event if there is interest.
|
||||
// This is a billing event.
|
||||
func (s *Server) accountConnectEvent(c *client) {
|
||||
s.mu.Lock()
|
||||
if !s.eventsEnabled() {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
acc := s.sys.account
|
||||
sc := s.sys.client
|
||||
s.mu.Unlock()
|
||||
|
||||
subj := fmt.Sprintf(connectEventSubj, c.acc.Name)
|
||||
r := acc.sl.Match(subj)
|
||||
if s.noOutSideInterest(r) {
|
||||
if noOutSideInterest(sc, r) {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -145,20 +529,24 @@ func (s *Server) accountConnectEvent(c *client) {
|
||||
},
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
s.sendInternalMsg(r, subj, &m.Server, &m)
|
||||
s.sendInternalMsg(r, subj, "", &m.Server, &m)
|
||||
}
|
||||
|
||||
// accountDisconnectEvent will send an account client disconnect event if there is interest.
|
||||
// This is a billing event.
|
||||
func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string) {
|
||||
if s.sys == nil || s.sys.client == nil || s.sys.account == nil {
|
||||
s.mu.Lock()
|
||||
if !s.eventsEnabled() {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
acc := s.sys.account
|
||||
sc := s.sys.client
|
||||
s.mu.Unlock()
|
||||
|
||||
subj := fmt.Sprintf(disconnectEventSubj, c.acc.Name)
|
||||
r := acc.sl.Match(subj)
|
||||
if s.noOutSideInterest(r) {
|
||||
if noOutSideInterest(sc, r) {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -186,8 +574,7 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
|
||||
Reason: reason,
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
s.sendInternalMsg(r, subj, &m.Server, &m)
|
||||
s.sendInternalMsg(r, subj, "", &m.Server, &m)
|
||||
}
|
||||
|
||||
// Internal message callback. If the msg is needed past the callback it is
|
||||
@@ -195,7 +582,7 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
|
||||
type msgHandler func(sub *subscription, subject, reply string, msg []byte)
|
||||
|
||||
func (s *Server) deliverInternalMsg(sub *subscription, subject, reply, msg []byte) {
|
||||
if s.sys == nil {
|
||||
if !s.eventsEnabled() || s.sys.subs == nil {
|
||||
return
|
||||
}
|
||||
s.mu.Lock()
|
||||
@@ -208,7 +595,7 @@ func (s *Server) deliverInternalMsg(sub *subscription, subject, reply, msg []byt
|
||||
|
||||
// Create an internal subscription. No support for queue groups atm.
|
||||
func (s *Server) sysSubscribe(subject string, cb msgHandler) (*subscription, error) {
|
||||
if s.sys == nil {
|
||||
if !s.eventsEnabled() {
|
||||
return nil, ErrNoSysAccount
|
||||
}
|
||||
if cb == nil {
|
||||
@@ -232,7 +619,7 @@ func (s *Server) sysSubscribe(subject string, cb msgHandler) (*subscription, err
|
||||
}
|
||||
|
||||
func (s *Server) sysUnsubscribe(sub *subscription) {
|
||||
if sub == nil || s.sys == nil {
|
||||
if sub == nil || !s.eventsEnabled() {
|
||||
return
|
||||
}
|
||||
s.mu.Lock()
|
||||
@@ -243,8 +630,7 @@ func (s *Server) sysUnsubscribe(sub *subscription) {
|
||||
c.unsubscribe(acc, sub, true)
|
||||
}
|
||||
|
||||
func (s *Server) noOutSideInterest(r *SublistResult) bool {
|
||||
sc := s.sys.client
|
||||
func noOutSideInterest(sc *client, r *SublistResult) bool {
|
||||
if sc == nil || r == nil {
|
||||
return true
|
||||
}
|
||||
@@ -264,18 +650,6 @@ func (s *Server) noOutSideInterest(r *SublistResult) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *Server) stampServerInfo(si *ServerInfo) {
|
||||
if si == nil {
|
||||
return
|
||||
}
|
||||
s.mu.Lock()
|
||||
si.ID = s.info.ID
|
||||
si.Seq = s.sys.seq
|
||||
si.Host = s.info.Host
|
||||
s.sys.seq++
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *client) flushClients() {
|
||||
last := time.Now()
|
||||
for cp := range c.pcd {
|
||||
|
||||
@@ -36,27 +36,6 @@ func createAccount(s *Server) (*Account, nkeys.KeyPair) {
|
||||
return s.LookupAccount(pub), akp
|
||||
}
|
||||
|
||||
func TestSystemAccount(t *testing.T) {
|
||||
s := opTrustBasicSetup()
|
||||
defer s.Shutdown()
|
||||
buildMemAccResolver(s)
|
||||
|
||||
acc, _ := createAccount(s)
|
||||
s.setSystemAccount(acc)
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.sys == nil || s.sys.account == nil {
|
||||
t.Fatalf("Expected sys.account to be non-nil")
|
||||
}
|
||||
if s.sys.client == nil {
|
||||
t.Fatalf("Expected sys.client to be non-nil")
|
||||
}
|
||||
if s.sys.client.echo {
|
||||
t.Fatalf("Internal clients should always have echo false")
|
||||
}
|
||||
}
|
||||
|
||||
func createUserCreds(t *testing.T, s *Server, akp nkeys.KeyPair) nats.Option {
|
||||
t.Helper()
|
||||
kp, _ := nkeys.CreateUser()
|
||||
@@ -82,11 +61,68 @@ func runTrustedServer(t *testing.T) (*Server, *Options) {
|
||||
kp, _ := nkeys.FromSeed(oSeed)
|
||||
pub, _ := kp.PublicKey()
|
||||
opts.TrustedNkeys = []string{pub}
|
||||
opts.accResolver = &MemAccResolver{}
|
||||
s := RunServer(opts)
|
||||
buildMemAccResolver(s)
|
||||
return s, opts
|
||||
}
|
||||
|
||||
func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options) {
|
||||
t.Helper()
|
||||
|
||||
kp, _ := nkeys.FromSeed(oSeed)
|
||||
pub, _ := kp.PublicKey()
|
||||
|
||||
mr := &MemAccResolver{}
|
||||
|
||||
// Now create a system account.
|
||||
// NOTE: This can NOT be shared directly between servers.
|
||||
// Set via server options.
|
||||
okp, _ := nkeys.FromSeed(oSeed)
|
||||
akp, _ := nkeys.CreateAccount()
|
||||
apub, _ := akp.PublicKey()
|
||||
nac := jwt.NewAccountClaims(apub)
|
||||
jwt, _ := nac.Encode(okp)
|
||||
|
||||
mr.Store(apub, jwt)
|
||||
|
||||
optsA := DefaultOptions()
|
||||
optsA.Cluster.Host = "127.0.0.1"
|
||||
optsA.TrustedNkeys = []string{pub}
|
||||
optsA.accResolver = mr
|
||||
optsA.SystemAccount = apub
|
||||
|
||||
sa := RunServer(optsA)
|
||||
|
||||
optsB := nextServerOpts(optsA)
|
||||
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port))
|
||||
sb := RunServer(optsB)
|
||||
|
||||
checkClusterFormed(t, sa, sb)
|
||||
|
||||
return sa, optsA, sb, optsB
|
||||
}
|
||||
|
||||
func TestSystemAccount(t *testing.T) {
|
||||
s, _ := runTrustedServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
acc, _ := createAccount(s)
|
||||
s.setSystemAccount(acc)
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.sys == nil || s.sys.account == nil {
|
||||
t.Fatalf("Expected sys.account to be non-nil")
|
||||
}
|
||||
if s.sys.client == nil {
|
||||
t.Fatalf("Expected sys.client to be non-nil")
|
||||
}
|
||||
if s.sys.client.echo {
|
||||
t.Fatalf("Internal clients should always have echo false")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSystemAccountNewConnection(t *testing.T) {
|
||||
s, opts := runTrustedServer(t)
|
||||
defer s.Shutdown()
|
||||
@@ -102,7 +138,7 @@ func TestSystemAccountNewConnection(t *testing.T) {
|
||||
}
|
||||
defer ncs.Close()
|
||||
|
||||
sub, _ := ncs.SubscribeSync(">")
|
||||
sub, _ := ncs.SubscribeSync("$SYS.ACCOUNT.>")
|
||||
defer sub.Unsubscribe()
|
||||
ncs.Flush()
|
||||
|
||||
@@ -121,14 +157,14 @@ func TestSystemAccountNewConnection(t *testing.T) {
|
||||
t.Fatalf("Error receiving msg: %v", err)
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.%s.CLIENT.CONNECT", acc2.Name)) {
|
||||
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.<ACCOUNT>.CLIENT.CONNECT", msg.Subject)
|
||||
if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.CONNECT", acc2.Name)) {
|
||||
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.ACCOUNT.<account>.CONNECT", msg.Subject)
|
||||
}
|
||||
tokens := strings.Split(msg.Subject, ".")
|
||||
if len(tokens) < 4 {
|
||||
t.Fatalf("Expected 4 tokens, got %d", len(tokens))
|
||||
}
|
||||
account := tokens[1]
|
||||
account := tokens[2]
|
||||
if account != acc2.Name {
|
||||
t.Fatalf("Expected %q for account, got %q", acc2.Name, account)
|
||||
}
|
||||
@@ -168,14 +204,14 @@ func TestSystemAccountNewConnection(t *testing.T) {
|
||||
t.Fatalf("Error receiving msg: %v", err)
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.%s.CLIENT.DISCONNECT", acc2.Name)) {
|
||||
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.<ACCOUNT>.CLIENT.DISCONNECT", msg.Subject)
|
||||
if !strings.HasPrefix(msg.Subject, fmt.Sprintf("$SYS.ACCOUNT.%s.DISCONNECT", acc2.Name)) {
|
||||
t.Fatalf("Expected subject to start with %q, got %q", "$SYS.ACCOUNT.<account>.DISCONNECT", msg.Subject)
|
||||
}
|
||||
tokens = strings.Split(msg.Subject, ".")
|
||||
if len(tokens) < 4 {
|
||||
t.Fatalf("Expected 4 tokens, got %d", len(tokens))
|
||||
}
|
||||
account = tokens[1]
|
||||
account = tokens[2]
|
||||
if account != acc2.Name {
|
||||
t.Fatalf("Expected %q for account, got %q", acc2.Name, account)
|
||||
}
|
||||
@@ -216,7 +252,7 @@ func TestSystemAccountNewConnection(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSystemInternalSubscriptions(t *testing.T) {
|
||||
func TestSystemAccountInternalSubscriptions(t *testing.T) {
|
||||
s, opts := runTrustedServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
@@ -289,7 +325,7 @@ func TestSystemInternalSubscriptions(t *testing.T) {
|
||||
// Now make sure we do not hear ourselves. We optimize this for internally
|
||||
// generated messages.
|
||||
r := SublistResult{psubs: []*subscription{sub}}
|
||||
s.sendInternalMsg(&r, "foo", nil, msg.Data)
|
||||
s.sendInternalMsg(&r, "foo", "", nil, msg.Data)
|
||||
|
||||
select {
|
||||
case <-received:
|
||||
@@ -298,3 +334,201 @@ func TestSystemInternalSubscriptions(t *testing.T) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
func TestSystemAccountConnectionLimits(t *testing.T) {
|
||||
sa, optsA, sb, optsB := runTrustedCluster(t)
|
||||
|
||||
// We want to test that we are limited to a certain number of active connections
|
||||
// across multiple servers.
|
||||
|
||||
// Let's create a user account.
|
||||
okp, _ := nkeys.FromSeed(oSeed)
|
||||
akp, _ := nkeys.CreateAccount()
|
||||
pub, _ := akp.PublicKey()
|
||||
nac := jwt.NewAccountClaims(pub)
|
||||
nac.Limits.Conn = 4 // Limit to 4 connections.
|
||||
jwt, _ := nac.Encode(okp)
|
||||
|
||||
addAccountToMemResolver(sa, pub, jwt)
|
||||
|
||||
urlA := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
|
||||
urlB := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
|
||||
|
||||
// Create a user on each server. Break on first failure.
|
||||
for {
|
||||
nca1, err := nats.Connect(urlA, createUserCreds(t, sa, akp))
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
defer nca1.Close()
|
||||
ncb1, err := nats.Connect(urlB, createUserCreds(t, sb, akp))
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
defer ncb1.Close()
|
||||
}
|
||||
|
||||
total := sa.NumClients() + sb.NumClients()
|
||||
if total > int(nac.Limits.Conn) {
|
||||
t.Fatalf("Expected only %d connections, was allowed to connect %d", nac.Limits.Conn, total)
|
||||
}
|
||||
}
|
||||
|
||||
// Test that the remote accounting works when a server is started some time later.
|
||||
func TestSystemAccountConnectionLimitsServersStaggered(t *testing.T) {
|
||||
sa, optsA, sb, optsB := runTrustedCluster(t)
|
||||
defer sa.Shutdown()
|
||||
sb.Shutdown()
|
||||
|
||||
// Let's create a user account.
|
||||
okp, _ := nkeys.FromSeed(oSeed)
|
||||
akp, _ := nkeys.CreateAccount()
|
||||
pub, _ := akp.PublicKey()
|
||||
nac := jwt.NewAccountClaims(pub)
|
||||
nac.Limits.Conn = 4 // Limit to 4 connections.
|
||||
jwt, _ := nac.Encode(okp)
|
||||
|
||||
addAccountToMemResolver(sa, pub, jwt)
|
||||
|
||||
urlA := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
|
||||
// Create max connections on sa.
|
||||
for i := 0; i < int(nac.Limits.Conn); i++ {
|
||||
nc, err := nats.Connect(urlA, createUserCreds(t, sa, akp))
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error on #%d try: %v", i+1, err)
|
||||
}
|
||||
defer nc.Close()
|
||||
}
|
||||
|
||||
// Restart server B.
|
||||
optsB.accResolver = sa.accResolver
|
||||
optsB.SystemAccount = sa.systemAccount().Name
|
||||
sb = RunServer(optsB)
|
||||
defer sb.Shutdown()
|
||||
checkClusterFormed(t, sa, sb)
|
||||
|
||||
// Trigger a load of the user account on the new server
|
||||
// NOTE: If we do not load the user can be the first to request this
|
||||
// account, hence the connection will succeed.
|
||||
sb.LookupAccount(pub)
|
||||
|
||||
// Expect this to fail.
|
||||
urlB := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
|
||||
if _, err := nats.Connect(urlB, createUserCreds(t, sb, akp)); err == nil {
|
||||
t.Fatalf("Expected connection to fail due to max limit")
|
||||
}
|
||||
}
|
||||
|
||||
// Test that the remote accounting works when a server is shutdown.
|
||||
func TestSystemAccountConnectionLimitsServerShutdownGraceful(t *testing.T) {
|
||||
sa, optsA, sb, optsB := runTrustedCluster(t)
|
||||
defer sa.Shutdown()
|
||||
defer sb.Shutdown()
|
||||
|
||||
// Let's create a user account.
|
||||
okp, _ := nkeys.FromSeed(oSeed)
|
||||
akp, _ := nkeys.CreateAccount()
|
||||
pub, _ := akp.PublicKey()
|
||||
nac := jwt.NewAccountClaims(pub)
|
||||
nac.Limits.Conn = 10 // Limit to 10 connections.
|
||||
jwt, _ := nac.Encode(okp)
|
||||
|
||||
addAccountToMemResolver(sa, pub, jwt)
|
||||
addAccountToMemResolver(sb, pub, jwt)
|
||||
|
||||
urlA := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
|
||||
urlB := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
_, err := nats.Connect(urlA, nats.NoReconnect(), createUserCreds(t, sa, akp))
|
||||
if err != nil {
|
||||
t.Fatalf("Expected to connect, got %v", err)
|
||||
}
|
||||
_, err = nats.Connect(urlB, nats.NoReconnect(), createUserCreds(t, sb, akp))
|
||||
if err != nil {
|
||||
t.Fatalf("Expected to connect, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// We are at capacity so both of these should fail.
|
||||
if _, err := nats.Connect(urlA, createUserCreds(t, sa, akp)); err == nil {
|
||||
t.Fatalf("Expected connection to fail due to max limit")
|
||||
}
|
||||
if _, err := nats.Connect(urlB, createUserCreds(t, sb, akp)); err == nil {
|
||||
t.Fatalf("Expected connection to fail due to max limit")
|
||||
}
|
||||
|
||||
// Now shutdown Server B.
|
||||
sb.Shutdown()
|
||||
|
||||
// Now we should be able to create more on A now.
|
||||
for i := 0; i < 5; i++ {
|
||||
_, err := nats.Connect(urlA, createUserCreds(t, sa, akp))
|
||||
if err != nil {
|
||||
t.Fatalf("Expected to connect on %d, got %v", i, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test that the remote accounting works when a server goes away.
|
||||
func TestSystemAccountConnectionLimitsServerShutdownForced(t *testing.T) {
|
||||
sa, optsA, sb, optsB := runTrustedCluster(t)
|
||||
|
||||
// Let's create a user account.
|
||||
okp, _ := nkeys.FromSeed(oSeed)
|
||||
akp, _ := nkeys.CreateAccount()
|
||||
pub, _ := akp.PublicKey()
|
||||
nac := jwt.NewAccountClaims(pub)
|
||||
nac.Limits.Conn = 20 // Limit to 20 connections.
|
||||
jwt, _ := nac.Encode(okp)
|
||||
|
||||
addAccountToMemResolver(sa, pub, jwt)
|
||||
addAccountToMemResolver(sb, pub, jwt)
|
||||
|
||||
urlA := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
|
||||
urlB := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
_, err := nats.Connect(urlA, nats.NoReconnect(), createUserCreds(t, sa, akp))
|
||||
if err != nil {
|
||||
t.Fatalf("Expected to connect, got %v", err)
|
||||
}
|
||||
_, err = nats.Connect(urlB, nats.NoReconnect(), createUserCreds(t, sb, akp))
|
||||
if err != nil {
|
||||
t.Fatalf("Expected to connect, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// We are at capacity so both of these should fail.
|
||||
if _, err := nats.Connect(urlA, createUserCreds(t, sa, akp)); err == nil {
|
||||
t.Fatalf("Expected connection to fail due to max limit")
|
||||
}
|
||||
if _, err := nats.Connect(urlB, createUserCreds(t, sb, akp)); err == nil {
|
||||
t.Fatalf("Expected connection to fail due to max limit")
|
||||
}
|
||||
|
||||
// Now shutdown Server B. Do so such that now communications goo out.
|
||||
sb.mu.Lock()
|
||||
sb.sys = nil
|
||||
sb.mu.Unlock()
|
||||
sb.Shutdown()
|
||||
|
||||
if _, err := nats.Connect(urlA, createUserCreds(t, sa, akp)); err == nil {
|
||||
t.Fatalf("Expected connection to fail due to max limit")
|
||||
}
|
||||
|
||||
// Let's speed up the checking process.
|
||||
checkRemoteServerInterval = 10 * time.Millisecond
|
||||
orphanServerDuration = 30 * time.Millisecond
|
||||
sa.mu.Lock()
|
||||
sa.sys.sweeper.Reset(checkRemoteServerInterval)
|
||||
sa.mu.Unlock()
|
||||
|
||||
// We should eventually be able to connect.
|
||||
checkFor(t, 5*time.Second, 50*time.Millisecond, func() error {
|
||||
if _, err := nats.Connect(urlA, createUserCreds(t, sa, akp)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -94,6 +94,7 @@ type Options struct {
|
||||
Nkeys []*NkeyUser `json:"-"`
|
||||
Users []*User `json:"-"`
|
||||
Accounts []*Account `json:"-"`
|
||||
SystemAccount string `json:"-"`
|
||||
AllowNewAccounts bool `json:"-"`
|
||||
Username string `json:"-"`
|
||||
Password string `json:"-"`
|
||||
@@ -138,6 +139,9 @@ type Options struct {
|
||||
|
||||
// private fields, used for testing
|
||||
gatewaysSolicitDelay time.Duration
|
||||
|
||||
// Used to spin up a memory account resolver for testing.
|
||||
accResolver AccountResolver
|
||||
}
|
||||
|
||||
type netResolver interface {
|
||||
@@ -2058,7 +2062,7 @@ func getInterfaceIPs() ([]net.IP, error) {
|
||||
return localIPs, nil
|
||||
}
|
||||
|
||||
func processOptions(opts *Options) {
|
||||
func setBaselineOptions(opts *Options) {
|
||||
// Setup non-standard Go defaults
|
||||
if opts.Host == "" {
|
||||
opts.Host = DEFAULT_HOST
|
||||
|
||||
@@ -47,7 +47,7 @@ func TestDefaultOptions(t *testing.T) {
|
||||
}
|
||||
|
||||
opts := &Options{}
|
||||
processOptions(opts)
|
||||
setBaselineOptions(opts)
|
||||
|
||||
if !reflect.DeepEqual(golden, opts) {
|
||||
t.Fatalf("Default Options are incorrect.\nexpected: %+v\ngot: %+v",
|
||||
@@ -57,7 +57,7 @@ func TestDefaultOptions(t *testing.T) {
|
||||
|
||||
func TestOptions_RandomPort(t *testing.T) {
|
||||
opts := &Options{Port: RANDOM_PORT}
|
||||
processOptions(opts)
|
||||
setBaselineOptions(opts)
|
||||
|
||||
if opts.Port != 0 {
|
||||
t.Fatalf("Process of options should have resolved random port to "+
|
||||
@@ -450,7 +450,7 @@ func TestListenConfig(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Received an error reading config file: %v\n", err)
|
||||
}
|
||||
processOptions(opts)
|
||||
setBaselineOptions(opts)
|
||||
|
||||
// Normal clients
|
||||
host := "10.0.1.22"
|
||||
@@ -500,7 +500,7 @@ func TestListenPortOnlyConfig(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Received an error reading config file: %v\n", err)
|
||||
}
|
||||
processOptions(opts)
|
||||
setBaselineOptions(opts)
|
||||
|
||||
port := 8922
|
||||
|
||||
@@ -520,7 +520,7 @@ func TestListenPortWithColonConfig(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Received an error reading config file: %v\n", err)
|
||||
}
|
||||
processOptions(opts)
|
||||
setBaselineOptions(opts)
|
||||
|
||||
port := 8922
|
||||
|
||||
@@ -539,7 +539,7 @@ func TestListenMonitoringDefault(t *testing.T) {
|
||||
opts := &Options{
|
||||
Host: "10.0.1.22",
|
||||
}
|
||||
processOptions(opts)
|
||||
setBaselineOptions(opts)
|
||||
|
||||
host := "10.0.1.22"
|
||||
if opts.Host != host {
|
||||
@@ -558,7 +558,7 @@ func TestMultipleUsersConfig(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Received an error reading config file: %v\n", err)
|
||||
}
|
||||
processOptions(opts)
|
||||
setBaselineOptions(opts)
|
||||
}
|
||||
|
||||
// Test highly depends on contents of the config file listed below. Any changes to that file
|
||||
@@ -568,7 +568,7 @@ func TestAuthorizationConfig(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Received an error reading config file: %v\n", err)
|
||||
}
|
||||
processOptions(opts)
|
||||
setBaselineOptions(opts)
|
||||
lu := len(opts.Users)
|
||||
if lu != 3 {
|
||||
t.Fatalf("Expected 3 users, got %d\n", lu)
|
||||
@@ -655,7 +655,7 @@ func TestNewStyleAuthorizationConfig(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Received an error reading config file: %v\n", err)
|
||||
}
|
||||
processOptions(opts)
|
||||
setBaselineOptions(opts)
|
||||
|
||||
lu := len(opts.Users)
|
||||
if lu != 2 {
|
||||
|
||||
@@ -503,7 +503,7 @@ func (s *Server) Reload() error {
|
||||
|
||||
// Apply flags over config file settings.
|
||||
newOpts = MergeOptions(newOpts, FlagSnapshot)
|
||||
processOptions(newOpts)
|
||||
setBaselineOptions(newOpts)
|
||||
|
||||
// processOptions sets Port to 0 if set to -1 (RANDOM port)
|
||||
// If that's the case, set it to the saved value when the accept loop was
|
||||
|
||||
@@ -149,7 +149,7 @@ func TestConfigReloadUnsupported(t *testing.T) {
|
||||
},
|
||||
NoSigs: true,
|
||||
}
|
||||
processOptions(golden)
|
||||
setBaselineOptions(golden)
|
||||
|
||||
if !reflect.DeepEqual(golden, server.getOpts()) {
|
||||
t.Fatalf("Options are incorrect.\nexpected: %+v\ngot: %+v",
|
||||
@@ -228,7 +228,7 @@ func TestConfigReloadInvalidConfig(t *testing.T) {
|
||||
},
|
||||
NoSigs: true,
|
||||
}
|
||||
processOptions(golden)
|
||||
setBaselineOptions(golden)
|
||||
|
||||
if !reflect.DeepEqual(golden, server.getOpts()) {
|
||||
t.Fatalf("Options are incorrect.\nexpected: %+v\ngot: %+v",
|
||||
@@ -299,7 +299,7 @@ func TestConfigReload(t *testing.T) {
|
||||
},
|
||||
NoSigs: true,
|
||||
}
|
||||
processOptions(golden)
|
||||
setBaselineOptions(golden)
|
||||
|
||||
if !reflect.DeepEqual(golden, opts) {
|
||||
t.Fatalf("Options are incorrect.\nexpected: %+v\ngot: %+v",
|
||||
|
||||
@@ -871,14 +871,19 @@ func (s *Server) sendSubsToRoute(route *client) {
|
||||
a.mu.RLock()
|
||||
for key, rme := range a.rm {
|
||||
// FIXME(dlc) - Just pass rme around.
|
||||
// Construct a sub on the fly
|
||||
// Construct a sub on the fly. We need to place
|
||||
// a client (or im) to properly set the account.
|
||||
var qn []byte
|
||||
subEnd := len(key)
|
||||
if qi := rme.qi; qi > 0 {
|
||||
subEnd = int(qi) - 1
|
||||
qn = []byte(key[qi:])
|
||||
}
|
||||
sub := &subscription{subject: []byte(key[:subEnd]), queue: qn, qw: rme.n}
|
||||
c := a.randomClient()
|
||||
if c == nil {
|
||||
continue
|
||||
}
|
||||
sub := &subscription{client: c, subject: []byte(key[:subEnd]), queue: qn, qw: rme.n}
|
||||
subs = append(subs, sub)
|
||||
|
||||
}
|
||||
@@ -917,7 +922,7 @@ func (c *client) sendRouteUnSubProtos(subs []*subscription, trace bool, filter f
|
||||
return c.sendRouteSubOrUnSubProtos(subs, false, trace, filter)
|
||||
}
|
||||
|
||||
// Low-level function that sends SUBs or UNSUBs protcols for the given subscriptions.
|
||||
// Low-level function that sends RS+ or RS- protocols for the given subscriptions.
|
||||
// Use sendRouteSubProtos or sendRouteUnSubProtos instead for clarity.
|
||||
// Lock is held on entry.
|
||||
func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, trace bool, filter func(sub *subscription) bool) bool {
|
||||
@@ -945,6 +950,7 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra
|
||||
} else if sub.client != nil && sub.client.acc != nil {
|
||||
accName = sub.client.acc.Name
|
||||
} else {
|
||||
c.Debugf("Falling back to default account for sending subs")
|
||||
accName = globalAccountName
|
||||
}
|
||||
|
||||
@@ -1232,7 +1238,7 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del
|
||||
}
|
||||
|
||||
// We only store state on local subs for transmission across routes.
|
||||
if sub.client == nil || sub.client.kind != CLIENT {
|
||||
if sub.client == nil || (sub.client.kind != CLIENT && sub.client.kind != SYSTEM) {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
134
server/server.go
134
server/server.go
@@ -77,16 +77,6 @@ type Info struct {
|
||||
GatewayCmd byte `json:"gateway_cmd,omitempty"` // Command code for the receiving server to know what to do
|
||||
}
|
||||
|
||||
// Used to send and receive messages from inside the server.
|
||||
type internal struct {
|
||||
account *Account
|
||||
client *client
|
||||
seq uint64
|
||||
sid uint64
|
||||
subs map[string]msgHandler
|
||||
sendq chan *pubMsg
|
||||
}
|
||||
|
||||
// Server is our main struct.
|
||||
type Server struct {
|
||||
gcid uint64
|
||||
@@ -185,11 +175,13 @@ type stats struct {
|
||||
|
||||
// New will setup a new server struct after parsing the options.
|
||||
func New(opts *Options) *Server {
|
||||
processOptions(opts)
|
||||
setBaselineOptions(opts)
|
||||
|
||||
// Process TLS options, including whether we require client certificates.
|
||||
tlsReq := opts.TLSConfig != nil
|
||||
verify := (tlsReq && opts.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert)
|
||||
|
||||
// Created server nkey identity.
|
||||
kp, _ := nkeys.CreateServer()
|
||||
pub, _ := kp.PublicKey()
|
||||
|
||||
@@ -229,6 +221,7 @@ func New(opts *Options) *Server {
|
||||
configTime: now,
|
||||
}
|
||||
|
||||
// Trusted root keys.
|
||||
if !s.processTrustedNkeys() {
|
||||
return nil
|
||||
}
|
||||
@@ -256,13 +249,6 @@ func New(opts *Options) *Server {
|
||||
}
|
||||
s.gateway = gws
|
||||
|
||||
// For tracking accounts
|
||||
s.accounts = make(map[string]*Account)
|
||||
|
||||
// Create global account.
|
||||
s.gacc = &Account{Name: globalAccountName}
|
||||
s.registerAccount(s.gacc)
|
||||
|
||||
// For tracking clients
|
||||
s.clients = make(map[uint64]*client)
|
||||
|
||||
@@ -281,8 +267,10 @@ func New(opts *Options) *Server {
|
||||
// to shutdown.
|
||||
s.quitCh = make(chan struct{})
|
||||
|
||||
// Used to setup Accounts.
|
||||
s.configureAccounts()
|
||||
// For tracking accounts
|
||||
if err := s.configureAccounts(); err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Used to setup Authorization.
|
||||
s.configureAuthorization()
|
||||
@@ -312,11 +300,35 @@ func (s *Server) setOpts(opts *Options) {
|
||||
s.optsMu.Unlock()
|
||||
}
|
||||
|
||||
func (s *Server) configureAccounts() {
|
||||
func (s *Server) configureAccounts() error {
|
||||
// Used to setup Accounts.
|
||||
if s.accounts == nil {
|
||||
s.accounts = make(map[string]*Account)
|
||||
}
|
||||
// Create global account.
|
||||
if s.gacc == nil {
|
||||
s.gacc = &Account{Name: globalAccountName}
|
||||
s.registerAccount(s.gacc)
|
||||
}
|
||||
|
||||
opts := s.opts
|
||||
|
||||
// Check opts and walk through them. Making sure to create SLs.
|
||||
for _, acc := range s.opts.Accounts {
|
||||
s.registerAccount(acc)
|
||||
}
|
||||
// Check for configured account resolvers.
|
||||
if opts.accResolver != nil {
|
||||
s.accResolver = opts.accResolver
|
||||
}
|
||||
// Check that if we have a SystemAccount it can
|
||||
// be properly resolved.
|
||||
if opts.SystemAccount != _EMPTY_ {
|
||||
if acc := s.lookupAccount(opts.SystemAccount); acc == nil {
|
||||
return ErrMissingAccount
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) generateRouteInfoJSON() {
|
||||
@@ -501,6 +513,9 @@ func (s *Server) RegisterAccount(name string) (*Account, error) {
|
||||
// This sets up a server to send and receive messages from inside
|
||||
// the server itself.
|
||||
func (s *Server) setSystemAccount(acc *Account) error {
|
||||
if acc == nil {
|
||||
return fmt.Errorf("system account is nil")
|
||||
}
|
||||
if !s.isTrustedIssuer(acc.Issuer) {
|
||||
return fmt.Errorf("system account not a trusted account")
|
||||
}
|
||||
@@ -509,29 +524,48 @@ func (s *Server) setSystemAccount(acc *Account) error {
|
||||
s.mu.Unlock()
|
||||
return fmt.Errorf("system account already exists")
|
||||
}
|
||||
|
||||
s.sys = &internal{
|
||||
account: acc,
|
||||
client: &client{srv: s, kind: SYSTEM, opts: defaultOpts, start: time.Now(), last: time.Now()},
|
||||
seq: 1,
|
||||
sid: 1,
|
||||
subs: make(map[string]msgHandler, 8),
|
||||
servers: make(map[string]*serverUpdate),
|
||||
subs: make(map[string]msgHandler),
|
||||
sendq: make(chan *pubMsg, 128),
|
||||
}
|
||||
s.sys.client.initClient()
|
||||
s.sys.client.echo = false
|
||||
s.sys.wg.Add(1)
|
||||
s.mu.Unlock()
|
||||
|
||||
// Start our internal loop to serialize outbound messages.
|
||||
// We do our own wg here since we will stop first during shutdown.
|
||||
go s.internalSendLoop(&s.sys.wg)
|
||||
|
||||
// Register with the account.
|
||||
s.sys.client.registerWithAccount(acc)
|
||||
|
||||
// Start our internal loop to serialize outbound messages.
|
||||
s.startGoRoutine(func() {
|
||||
s.internalSendLoop()
|
||||
})
|
||||
// Start up our general subscriptions
|
||||
s.initEventTracking()
|
||||
|
||||
// Track for dead remote servers.
|
||||
s.startRemoteServerSweepTimer()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) systemAccount() *Account {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.sys == nil {
|
||||
return nil
|
||||
}
|
||||
return s.sys.account
|
||||
}
|
||||
|
||||
// Place common account setup here.
|
||||
// Lock should be held on entry.
|
||||
func (s *Server) registerAccount(acc *Account) {
|
||||
if acc.sl == nil {
|
||||
acc.sl = NewSublist()
|
||||
@@ -561,14 +595,13 @@ func (s *Server) registerAccount(acc *Account) {
|
||||
// we had sent an A- because account did not exist at that time.
|
||||
s.endAccountNoInterestForGateways(acc.Name)
|
||||
}
|
||||
s.enableAccountTracking(acc)
|
||||
}
|
||||
|
||||
// LookupAccount is a public function to return the account structure
|
||||
// associated with name.
|
||||
func (s *Server) LookupAccount(name string) *Account {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// lookupAccount is a function to return the account structure
|
||||
// associated with an account name.
|
||||
// Lock shiould be held on entry.
|
||||
func (s *Server) lookupAccount(name string) *Account {
|
||||
acc := s.accounts[name]
|
||||
if acc != nil {
|
||||
// If we are expired and we have a resolver, then
|
||||
@@ -582,6 +615,14 @@ func (s *Server) LookupAccount(name string) *Account {
|
||||
return s.fetchAccount(name)
|
||||
}
|
||||
|
||||
// LookupAccount is a public function to return the account structure
|
||||
// associated with name.
|
||||
func (s *Server) LookupAccount(name string) *Account {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.lookupAccount(name)
|
||||
}
|
||||
|
||||
// This will fetch new claims and if found update the account with new claims.
|
||||
// Lock should be held upon entry.
|
||||
func (s *Server) updateAccount(acc *Account) bool {
|
||||
@@ -599,8 +640,9 @@ func (s *Server) updateAccount(acc *Account) bool {
|
||||
s.Debugf("Requested account update for [%s], same claims detected", acc.Name)
|
||||
return false
|
||||
}
|
||||
accClaims, err := s.verifyAccountClaims(claimJWT)
|
||||
accClaims, _, err := s.verifyAccountClaims(claimJWT)
|
||||
if err == nil && accClaims != nil {
|
||||
acc.claimJWT = claimJWT
|
||||
s.updateAccountClaims(acc, accClaims)
|
||||
return true
|
||||
}
|
||||
@@ -625,33 +667,34 @@ func (s *Server) fetchRawAccountClaims(name string) (string, error) {
|
||||
}
|
||||
|
||||
// fetchAccountClaims will attempt to fetch new claims if a resolver is present.
|
||||
func (s *Server) fetchAccountClaims(name string) (*jwt.AccountClaims, error) {
|
||||
func (s *Server) fetchAccountClaims(name string) (*jwt.AccountClaims, string, error) {
|
||||
claimJWT, err := s.fetchRawAccountClaims(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, _EMPTY_, err
|
||||
}
|
||||
return s.verifyAccountClaims(claimJWT)
|
||||
}
|
||||
|
||||
// verifyAccountClaims will decode and validate any account claims.
|
||||
func (s *Server) verifyAccountClaims(claimJWT string) (*jwt.AccountClaims, error) {
|
||||
func (s *Server) verifyAccountClaims(claimJWT string) (*jwt.AccountClaims, string, error) {
|
||||
if accClaims, err := jwt.DecodeAccountClaims(claimJWT); err != nil {
|
||||
return nil, err
|
||||
return nil, _EMPTY_, err
|
||||
} else {
|
||||
vr := jwt.CreateValidationResults()
|
||||
accClaims.Validate(vr)
|
||||
if vr.IsBlocking(true) {
|
||||
return nil, ErrAccountValidation
|
||||
return nil, _EMPTY_, ErrAccountValidation
|
||||
}
|
||||
return accClaims, nil
|
||||
return accClaims, claimJWT, nil
|
||||
}
|
||||
}
|
||||
|
||||
// This will fetch an account from a resolver if defined.
|
||||
// Lock should be held upon entry.
|
||||
func (s *Server) fetchAccount(name string) *Account {
|
||||
if accClaims, _ := s.fetchAccountClaims(name); accClaims != nil {
|
||||
if accClaims, claimJWT, _ := s.fetchAccountClaims(name); accClaims != nil {
|
||||
if acc := s.buildInternalAccount(accClaims); acc != nil {
|
||||
acc.claimJWT = claimJWT
|
||||
s.registerAccount(acc)
|
||||
return acc
|
||||
}
|
||||
@@ -698,6 +741,11 @@ func (s *Server) Start() {
|
||||
return
|
||||
}
|
||||
|
||||
// Setup system account which will start eventing stack.
|
||||
if sa := opts.SystemAccount; sa != _EMPTY_ {
|
||||
s.setSystemAccount(s.lookupAccount(sa))
|
||||
}
|
||||
|
||||
// Start up gateway if needed. Do this before starting the routes, because
|
||||
// we want to resolve the gateway host:port so that this information can
|
||||
// be sent to other routes.
|
||||
@@ -732,6 +780,12 @@ func (s *Server) Start() {
|
||||
// Shutdown will shutdown the server instance by kicking out the AcceptLoop
|
||||
// and closing all associated clients.
|
||||
func (s *Server) Shutdown() {
|
||||
// Shutdown the eventing system as needed.
|
||||
// This is done first to send out any messages for
|
||||
// account status. We will also clean up any
|
||||
// eventing items associated with accounts.
|
||||
s.shutdownEventing()
|
||||
|
||||
s.mu.Lock()
|
||||
// Prevent issues with multiple calls.
|
||||
if s.shutdown {
|
||||
|
||||
@@ -78,8 +78,6 @@ func TestNewRouteConnectSubs(t *testing.T) {
|
||||
|
||||
info := checkInfoMsg(t, rc)
|
||||
|
||||
// Send our info back with a larger number of accounts, should trigger to send the
|
||||
// subscriptions for their accounts.
|
||||
info.ID = routeID
|
||||
b, err := json.Marshal(info)
|
||||
if err != nil {
|
||||
@@ -124,6 +122,80 @@ func TestNewRouteConnectSubs(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewRouteConnectSubsWithAccount(t *testing.T) {
|
||||
s, opts := runNewRouteServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
accName := "$FOO"
|
||||
s.RegisterAccount(accName)
|
||||
|
||||
c := createClientConn(t, opts.Host, opts.Port)
|
||||
defer c.Close()
|
||||
|
||||
send, expect := setupConnWithAccount(t, c, accName)
|
||||
|
||||
// Create 10 normal subs and 10 queue subscribers.
|
||||
for i := 0; i < 10; i++ {
|
||||
send(fmt.Sprintf("SUB foo %d\r\n", i))
|
||||
send(fmt.Sprintf("SUB foo bar %d\r\n", 100+i))
|
||||
}
|
||||
send("PING\r\n")
|
||||
expect(pongRe)
|
||||
|
||||
// This client should not be considered active since no subscriptions or
|
||||
// messages have been published.
|
||||
rc := createRouteConn(t, opts.Cluster.Host, opts.Cluster.Port)
|
||||
defer rc.Close()
|
||||
|
||||
routeID := "RTEST_NEW:22"
|
||||
routeSend, routeExpect := setupRouteEx(t, rc, opts, routeID)
|
||||
|
||||
info := checkInfoMsg(t, rc)
|
||||
|
||||
info.ID = routeID
|
||||
b, err := json.Marshal(info)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not marshal test route info: %v", err)
|
||||
}
|
||||
routeSend(fmt.Sprintf("INFO %s\r\n", b))
|
||||
|
||||
buf := routeExpect(rsubRe)
|
||||
|
||||
matches := rsubRe.FindAllSubmatch(buf, -1)
|
||||
if len(matches) != 2 {
|
||||
t.Fatalf("Expected 2 results, got %d", len(matches))
|
||||
}
|
||||
for _, m := range matches {
|
||||
if string(m[1]) != accName {
|
||||
t.Fatalf("Expected global account name of %q, got %q", accName, m[1])
|
||||
}
|
||||
if string(m[2]) != "foo" {
|
||||
t.Fatalf("Expected subject of 'foo', got %q", m[2])
|
||||
}
|
||||
if m[3] != nil {
|
||||
if string(m[3]) != "bar" {
|
||||
t.Fatalf("Expected group of 'bar', got %q", m[3])
|
||||
}
|
||||
// Expect the SID to be the total weighted count for the queue group
|
||||
if len(m) != 5 {
|
||||
t.Fatalf("Expected a SID for the queue group")
|
||||
}
|
||||
if m[4] == nil || string(m[4]) != "10" {
|
||||
t.Fatalf("Expected SID of '10', got %q", m[4])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close the client connection, check the results.
|
||||
c.Close()
|
||||
|
||||
// Expect 2
|
||||
for numUnSubs := 0; numUnSubs != 2; {
|
||||
buf := routeExpect(runsubRe)
|
||||
numUnSubs += len(runsubRe.FindAllSubmatch(buf, -1))
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewRouteRSubs(t *testing.T) {
|
||||
s, opts := runNewRouteServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
Reference in New Issue
Block a user