mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
System level services for debugging.
This is the first pass at introducing exported services to the system account for generally debugging of blackbox systems. The first service reports number of subscribers for a given subject. The payload of the request is the subject, and optional queue group, and can contain wildcards. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -189,6 +189,60 @@ func (a *Account) shallowCopy() *Account {
|
||||
return na
|
||||
}
|
||||
|
||||
// Called to track a remote server and connections and leafnodes it
|
||||
// has for this account.
|
||||
func (a *Account) updateRemoteServer(m *AccountNumConns) {
|
||||
a.mu.Lock()
|
||||
if a.strack == nil {
|
||||
a.strack = make(map[string]sconns)
|
||||
}
|
||||
// This does not depend on receiving all updates since each one is idempotent.
|
||||
// FIXME(dlc) - We should cleanup when these both go to zero.
|
||||
prev := a.strack[m.Server.ID]
|
||||
a.strack[m.Server.ID] = sconns{conns: int32(m.Conns), leafs: int32(m.LeafNodes)}
|
||||
a.nrclients += int32(m.Conns) - prev.conns
|
||||
a.nrleafs += int32(m.LeafNodes) - prev.leafs
|
||||
a.mu.Unlock()
|
||||
}
|
||||
|
||||
// Removes tracking for a remote server that has shutdown.
|
||||
func (a *Account) removeRemoteServer(sid string) {
|
||||
a.mu.Lock()
|
||||
if a.strack != nil {
|
||||
prev := a.strack[sid]
|
||||
delete(a.strack, sid)
|
||||
a.nrclients -= prev.conns
|
||||
a.nrleafs -= prev.leafs
|
||||
}
|
||||
a.mu.Unlock()
|
||||
}
|
||||
|
||||
// When querying for subject interest this is the number of
|
||||
// expected responses. We need to actually check that the entry
|
||||
// has active connections.
|
||||
func (a *Account) expectedRemoteResponses() (expected int32) {
|
||||
a.mu.RLock()
|
||||
for _, sc := range a.strack {
|
||||
if sc.conns > 0 || sc.leafs > 0 {
|
||||
expected++
|
||||
}
|
||||
}
|
||||
a.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
|
||||
// Clears eventing and tracking for this account.
|
||||
func (a *Account) clearEventing() {
|
||||
a.mu.Lock()
|
||||
a.nrclients = 0
|
||||
// Now clear state
|
||||
clearTimer(&a.etmr)
|
||||
clearTimer(&a.ctmr)
|
||||
a.clients = nil
|
||||
a.strack = nil
|
||||
a.mu.Unlock()
|
||||
}
|
||||
|
||||
// NumConnections returns active number of clients for this account for
|
||||
// all known servers.
|
||||
func (a *Account) NumConnections() int {
|
||||
@@ -198,6 +252,15 @@ func (a *Account) NumConnections() int {
|
||||
return nc
|
||||
}
|
||||
|
||||
// NumRemoteConnections returns the number of client or leaf connections that
|
||||
// are not on this server.
|
||||
func (a *Account) NumRemoteConnections() int {
|
||||
a.mu.RLock()
|
||||
nc := int(a.nrclients + a.nrleafs)
|
||||
a.mu.RUnlock()
|
||||
return nc
|
||||
}
|
||||
|
||||
// NumLocalConnections returns active number of clients for this account
|
||||
// on this server.
|
||||
func (a *Account) NumLocalConnections() int {
|
||||
@@ -212,6 +275,15 @@ func (a *Account) numLocalConnections() int {
|
||||
return len(a.clients) - int(a.sysclients) - int(a.nleafs)
|
||||
}
|
||||
|
||||
// This is for extended local interest.
|
||||
// Lock should not be held.
|
||||
func (a *Account) numLocalAndLeafConnections() int {
|
||||
a.mu.RLock()
|
||||
nlc := len(a.clients) - int(a.sysclients)
|
||||
a.mu.RUnlock()
|
||||
return nlc
|
||||
}
|
||||
|
||||
func (a *Account) numLocalLeafNodes() int {
|
||||
return int(a.nleafs)
|
||||
}
|
||||
@@ -613,8 +685,9 @@ func (a *Account) sendTrackingLatency(si *serviceImport, requestor, responder *c
|
||||
// numServiceRoutes returns the number of service routes on this account.
|
||||
func (a *Account) numServiceRoutes() int {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
return len(a.imports.services)
|
||||
num := len(a.imports.services)
|
||||
a.mu.RUnlock()
|
||||
return num
|
||||
}
|
||||
|
||||
// AddServiceImportWithClaim will add in the service import via the jwt claim.
|
||||
|
||||
@@ -460,6 +460,8 @@ func (s *Server) isClientAuthorized(c *client) bool {
|
||||
// for pub/sub authorizations.
|
||||
if ok {
|
||||
c.RegisterUser(user)
|
||||
// Generate an event if we have a system account and this is not the $G account.
|
||||
s.accountConnectEvent(c)
|
||||
}
|
||||
return ok
|
||||
}
|
||||
@@ -581,13 +583,14 @@ func (s *Server) isLeafNodeAuthorized(c *client) bool {
|
||||
|
||||
// Grab under lock but process after.
|
||||
var (
|
||||
juc *jwt.UserClaims
|
||||
acc *Account
|
||||
err error
|
||||
juc *jwt.UserClaims
|
||||
acc *Account
|
||||
user *User
|
||||
ok bool
|
||||
err error
|
||||
)
|
||||
|
||||
s.mu.Lock()
|
||||
|
||||
// Check if we have trustedKeys defined in the server. If so we require a user jwt.
|
||||
if s.trustedKeys != nil {
|
||||
if c.opts.JWT == "" {
|
||||
@@ -609,6 +612,14 @@ func (s *Server) isLeafNodeAuthorized(c *client) bool {
|
||||
c.Debugf("User JWT no longer valid: %+v", vr)
|
||||
return false
|
||||
}
|
||||
} else if s.users != nil {
|
||||
if c.opts.Username != "" {
|
||||
user, ok = s.users[c.opts.Username]
|
||||
if !ok {
|
||||
s.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
@@ -672,6 +683,18 @@ func (s *Server) isLeafNodeAuthorized(c *client) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
if user != nil {
|
||||
ok = comparePasswords(user.Password, c.opts.Password)
|
||||
// If we are authorized, register the user which will properly setup any permissions
|
||||
// for pub/sub authorizations.
|
||||
if ok {
|
||||
c.RegisterUser(user)
|
||||
// Generate an event if we have a system account and this is not the $G account.
|
||||
s.accountConnectEvent(c)
|
||||
}
|
||||
return ok
|
||||
}
|
||||
|
||||
// FIXME(dlc) - Add ability to support remote account bindings via
|
||||
// other auth like user or nkey and tlsMapping.
|
||||
|
||||
|
||||
@@ -1704,7 +1704,7 @@ func splitArg(arg []byte) [][]byte {
|
||||
return args
|
||||
}
|
||||
|
||||
func (c *client) processSub(argo []byte) (err error) {
|
||||
func (c *client) processSub(argo []byte, noForward bool) (*subscription, error) {
|
||||
c.traceInOp("SUB", argo)
|
||||
|
||||
// Indicate activity.
|
||||
@@ -1726,7 +1726,7 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
sub.queue = args[1]
|
||||
sub.sid = args[2]
|
||||
default:
|
||||
return fmt.Errorf("processSub Parse Error: '%s'", arg)
|
||||
return nil, fmt.Errorf("processSub Parse Error: '%s'", arg)
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
@@ -1740,7 +1740,7 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
|
||||
if c.nc == nil && kind != SYSTEM {
|
||||
c.mu.Unlock()
|
||||
return nil
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
// Check permissions if applicable.
|
||||
@@ -1749,17 +1749,18 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
|
||||
c.Errorf("Subscription Violation - %s, Subject %q, SID %s",
|
||||
c.getAuthUser(), sub.subject, sub.sid)
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Check if we have a maximum on the number of subscriptions.
|
||||
if c.subsAtLimit() {
|
||||
c.mu.Unlock()
|
||||
c.maxSubsExceeded()
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
updateGWs := false
|
||||
var updateGWs bool
|
||||
var err error
|
||||
|
||||
// Subscribe here.
|
||||
if c.subs[sid] == nil {
|
||||
@@ -1778,19 +1779,24 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
|
||||
if err != nil {
|
||||
c.sendErr("Invalid Subject")
|
||||
return nil
|
||||
return nil, nil
|
||||
} else if c.opts.Verbose && kind != SYSTEM {
|
||||
c.sendOK()
|
||||
}
|
||||
|
||||
// No account just return.
|
||||
if acc == nil {
|
||||
return nil
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
if err := c.addShadowSubscriptions(acc, sub); err != nil {
|
||||
c.Errorf(err.Error())
|
||||
}
|
||||
|
||||
if noForward {
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
// If we are routing and this is a local sub, add to the route map for the associated account.
|
||||
if kind == CLIENT || kind == SYSTEM {
|
||||
srv.updateRouteSubscriptionMap(acc, sub, 1)
|
||||
@@ -1800,7 +1806,7 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
}
|
||||
// Now check on leafnode updates.
|
||||
srv.updateLeafNodes(acc, sub, 1)
|
||||
return nil
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
// If the client's account has stream imports and there are matches for
|
||||
@@ -2204,7 +2210,7 @@ func (c *client) deliverMsg(sub *subscription, subject, mh, msg []byte) bool {
|
||||
if client.kind == SYSTEM {
|
||||
s := client.srv
|
||||
client.mu.Unlock()
|
||||
s.deliverInternalMsg(sub, subject, c.pa.reply, msg[:msgSize])
|
||||
s.deliverInternalMsg(sub, c, subject, c.pa.reply, msg[:msgSize])
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -2389,7 +2395,7 @@ func (c *client) pubAllowedFullCheck(subject string, fullCheck bool) bool {
|
||||
}
|
||||
|
||||
// If we are currently not allowed but we are tracking reply subjects
|
||||
// dynamically, check to see if we are allowed here Avoid pcache.
|
||||
// dynamically, check to see if we are allowed here but avoid pcache.
|
||||
// We need to acquire the lock though.
|
||||
if !allowed && fullCheck && c.perms.resp != nil {
|
||||
c.mu.Lock()
|
||||
@@ -2516,7 +2522,7 @@ func (c *client) processInboundClientMsg(msg []byte) {
|
||||
|
||||
// If we have an exported service and we are doing remote tracking, check this subject
|
||||
// to see if we need to report the latency.
|
||||
if c.acc.exports.services != nil && c.rrTracking != nil {
|
||||
if c.rrTracking != nil {
|
||||
c.mu.Lock()
|
||||
rl := c.rrTracking[string(c.pa.subject)]
|
||||
if rl != nil {
|
||||
|
||||
@@ -124,6 +124,9 @@ var (
|
||||
|
||||
// ErrRevocation is returned when a credential has been revoked.
|
||||
ErrRevocation = errors.New("credentials have been revoked")
|
||||
|
||||
// Used to signal an error that a server is not running.
|
||||
ErrServerNotRunning = errors.New("server is not running")
|
||||
)
|
||||
|
||||
// configErr is a configuration error.
|
||||
|
||||
382
server/events.go
382
server/events.go
@@ -18,13 +18,13 @@ import (
|
||||
"crypto/sha256"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/jwt"
|
||||
"github.com/nats-io/nats-server/v2/server/pse"
|
||||
)
|
||||
|
||||
@@ -42,33 +42,42 @@ const (
|
||||
serverStatsPingReqSubj = "$SYS.REQ.SERVER.PING"
|
||||
leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT"
|
||||
remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s"
|
||||
inboxRespSubj = "$SYS._INBOX.%s.%s"
|
||||
|
||||
// FIXME(dlc) - Should account scope, even with wc for now, but later on
|
||||
// we can then shard as needed.
|
||||
accNumSubsReqSubj = "$SYS.REQ.ACCOUNT.NSUBS"
|
||||
|
||||
// These are for exported debug services. These are local to this server only.
|
||||
accSubsSubj = "$SYS.DEBUG.SUBSCRIBERS"
|
||||
|
||||
shutdownEventTokens = 4
|
||||
serverSubjectIndex = 2
|
||||
accUpdateTokens = 5
|
||||
accUpdateAccIndex = 2
|
||||
defaultEventsHBItvl = 30 * time.Second
|
||||
)
|
||||
|
||||
// FIXME(dlc) - make configurable.
|
||||
var eventsHBInterval = defaultEventsHBItvl
|
||||
var eventsHBInterval = 30 * time.Second
|
||||
|
||||
// Used to send and receive messages from inside the server.
|
||||
type internal struct {
|
||||
account *Account
|
||||
client *client
|
||||
seq uint64
|
||||
sid uint64
|
||||
servers map[string]*serverUpdate
|
||||
sweeper *time.Timer
|
||||
stmr *time.Timer
|
||||
subs map[string]msgHandler
|
||||
sendq chan *pubMsg
|
||||
wg sync.WaitGroup
|
||||
orphMax time.Duration
|
||||
chkOrph time.Duration
|
||||
statsz time.Duration
|
||||
shash string
|
||||
account *Account
|
||||
client *client
|
||||
seq uint64
|
||||
sid uint64
|
||||
servers map[string]*serverUpdate
|
||||
sweeper *time.Timer
|
||||
stmr *time.Timer
|
||||
subs map[string]msgHandler
|
||||
replies map[string]msgHandler
|
||||
sendq chan *pubMsg
|
||||
wg sync.WaitGroup
|
||||
orphMax time.Duration
|
||||
chkOrph time.Duration
|
||||
statsz time.Duration
|
||||
shash string
|
||||
inboxPre string
|
||||
}
|
||||
|
||||
// ServerStatsMsg is sent periodically with stats updates.
|
||||
@@ -280,6 +289,7 @@ func (s *Server) sendShutdownEvent() {
|
||||
s.sys.sendq = nil
|
||||
// Unhook all msgHandlers. Normal client cleanup will deal with subs, etc.
|
||||
s.sys.subs = nil
|
||||
s.sys.replies = nil
|
||||
s.mu.Unlock()
|
||||
// Send to the internal queue and mark as last.
|
||||
sendq <- &pubMsg{nil, subj, _EMPTY_, nil, nil, true}
|
||||
@@ -332,8 +342,9 @@ func (s *Server) eventsRunning() bool {
|
||||
// a defined system account.
|
||||
func (s *Server) EventsEnabled() bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.eventsEnabled()
|
||||
ee := s.eventsEnabled()
|
||||
s.mu.Unlock()
|
||||
return ee
|
||||
}
|
||||
|
||||
// eventsEnabled will report if events are enabled.
|
||||
@@ -342,6 +353,18 @@ func (s *Server) eventsEnabled() bool {
|
||||
return s.sys != nil && s.sys.client != nil && s.sys.account != nil
|
||||
}
|
||||
|
||||
// TrackedRemoteServers returns how many remote servers we are tracking
|
||||
// from a system events perspective.
|
||||
func (s *Server) TrackedRemoteServers() int {
|
||||
s.mu.Lock()
|
||||
if !s.running || !s.eventsEnabled() {
|
||||
return -1
|
||||
}
|
||||
ns := len(s.sys.servers)
|
||||
s.mu.Unlock()
|
||||
return ns
|
||||
}
|
||||
|
||||
// Check for orphan servers who may have gone away without notification.
|
||||
// This should be wrapChk() to setup common locking.
|
||||
func (s *Server) checkRemoteServers() {
|
||||
@@ -479,7 +502,14 @@ func (s *Server) initEventTracking() {
|
||||
sha.Write([]byte(s.info.ID))
|
||||
s.sys.shash = fmt.Sprintf("%x", sha.Sum(nil))[:sysHashLen]
|
||||
|
||||
subject := fmt.Sprintf(accConnsEventSubj, "*")
|
||||
// This will be for all inbox responses.
|
||||
subject := fmt.Sprintf(inboxRespSubj, s.sys.shash, "*")
|
||||
if _, err := s.sysSubscribe(subject, s.inboxReply); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
s.sys.inboxPre = subject
|
||||
// This is for remote updates for connection accounting.
|
||||
subject = fmt.Sprintf(accConnsEventSubj, "*")
|
||||
if _, err := s.sysSubscribe(subject, s.remoteConnsUpdate); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
@@ -493,6 +523,10 @@ func (s *Server) initEventTracking() {
|
||||
if _, err := s.sysSubscribe(subject, s.connsRequest); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
// Listen for broad requests to respond with number of subscriptions for a given subject.
|
||||
if _, err := s.sysSubscribe(accNumSubsReqSubj, s.nsubsRequest); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
// Listen for all server shutdowns.
|
||||
subject = fmt.Sprintf(shutdownEventSubj, "*")
|
||||
if _, err := s.sysSubscribe(subject, s.remoteServerShutdown); err != nil {
|
||||
@@ -518,15 +552,26 @@ func (s *Server) initEventTracking() {
|
||||
if _, err := s.sysSubscribe(subject, s.leafNodeConnected); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
// For tracking remote lateny measurements.
|
||||
// For tracking remote latency measurements.
|
||||
subject = fmt.Sprintf(remoteLatencyEventSubj, s.sys.shash)
|
||||
if _, err := s.sysSubscribe(subject, s.remoteLatencyUpdate); err != nil {
|
||||
s.Errorf("Error setting up internal latency tracking: %v", err)
|
||||
}
|
||||
|
||||
// These are for system account exports for debugging from client applications.
|
||||
sacc := s.sys.account
|
||||
|
||||
// This is for simple debugging of number of subscribers that exist in the system.
|
||||
if _, err := s.sysSubscribeInternal(accSubsSubj, s.debugSubscribers); err != nil {
|
||||
s.Errorf("Error setting up internal debug service for subscribers: %v", err)
|
||||
}
|
||||
if err := sacc.AddServiceExport(accSubsSubj, nil); err != nil {
|
||||
s.Errorf("Error adding system service export for %q: %v", accSubsSubj, err)
|
||||
}
|
||||
}
|
||||
|
||||
// accountClaimUpdate will receive claim updates for accounts.
|
||||
func (s *Server) accountClaimUpdate(sub *subscription, subject, reply string, msg []byte) {
|
||||
func (s *Server) accountClaimUpdate(sub *subscription, _ *client, subject, reply string, msg []byte) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if !s.eventsEnabled() {
|
||||
@@ -547,19 +592,13 @@ func (s *Server) accountClaimUpdate(sub *subscription, subject, reply string, ms
|
||||
// Lock assume held.
|
||||
func (s *Server) processRemoteServerShutdown(sid string) {
|
||||
s.accounts.Range(func(k, v interface{}) bool {
|
||||
a := v.(*Account)
|
||||
a.mu.Lock()
|
||||
prev := a.strack[sid]
|
||||
delete(a.strack, sid)
|
||||
a.nrclients -= prev.conns
|
||||
a.nrleafs -= prev.leafs
|
||||
a.mu.Unlock()
|
||||
v.(*Account).removeRemoteServer(sid)
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
// remoteServerShutdownEvent is called when we get an event from another server shutting down.
|
||||
func (s *Server) remoteServerShutdown(sub *subscription, subject, reply string, msg []byte) {
|
||||
func (s *Server) remoteServerShutdown(sub *subscription, _ *client, subject, reply string, msg []byte) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if !s.eventsEnabled() {
|
||||
@@ -617,15 +656,7 @@ func (s *Server) shutdownEventing() {
|
||||
|
||||
// Whip through all accounts.
|
||||
s.accounts.Range(func(k, v interface{}) bool {
|
||||
a := v.(*Account)
|
||||
a.mu.Lock()
|
||||
a.nrclients = 0
|
||||
// Now clear state
|
||||
clearTimer(&a.etmr)
|
||||
clearTimer(&a.ctmr)
|
||||
a.clients = nil
|
||||
a.strack = nil
|
||||
a.mu.Unlock()
|
||||
v.(*Account).clearEventing()
|
||||
return true
|
||||
})
|
||||
// Turn everything off here.
|
||||
@@ -633,7 +664,7 @@ func (s *Server) shutdownEventing() {
|
||||
}
|
||||
|
||||
// Request for our local connection count.
|
||||
func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []byte) {
|
||||
func (s *Server) connsRequest(sub *subscription, _ *client, subject, reply string, msg []byte) {
|
||||
if !s.eventsRunning() {
|
||||
return
|
||||
}
|
||||
@@ -655,7 +686,7 @@ func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []by
|
||||
|
||||
// leafNodeConnected is an event we will receive when a leaf node for a given account
|
||||
// connects.
|
||||
func (s *Server) leafNodeConnected(sub *subscription, subject, reply string, msg []byte) {
|
||||
func (s *Server) leafNodeConnected(sub *subscription, _ *client, subject, reply string, msg []byte) {
|
||||
m := accNumConnsReq{}
|
||||
if err := json.Unmarshal(msg, &m); err != nil {
|
||||
s.sys.client.Errorf("Error unmarshalling account connections request message: %v", err)
|
||||
@@ -676,7 +707,7 @@ func (s *Server) leafNodeConnected(sub *subscription, subject, reply string, msg
|
||||
}
|
||||
|
||||
// statszReq is a request for us to respond with current statz.
|
||||
func (s *Server) statszReq(sub *subscription, subject, reply string, msg []byte) {
|
||||
func (s *Server) statszReq(sub *subscription, _ *client, subject, reply string, msg []byte) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if !s.eventsEnabled() || reply == _EMPTY_ {
|
||||
@@ -686,7 +717,7 @@ func (s *Server) statszReq(sub *subscription, subject, reply string, msg []byte)
|
||||
}
|
||||
|
||||
// remoteConnsUpdate gets called when we receive a remote update from another server.
|
||||
func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg []byte) {
|
||||
func (s *Server) remoteConnsUpdate(sub *subscription, _ *client, subject, reply string, msg []byte) {
|
||||
if !s.eventsRunning() {
|
||||
return
|
||||
}
|
||||
@@ -717,17 +748,7 @@ func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg
|
||||
return
|
||||
}
|
||||
// If we are here we have interest in tracking this account. Update our accounting.
|
||||
acc.mu.Lock()
|
||||
if acc.strack == nil {
|
||||
acc.strack = make(map[string]sconns)
|
||||
}
|
||||
// This does not depend on receiving all updates since each one is idempotent.
|
||||
prev := acc.strack[m.Server.ID]
|
||||
acc.strack[m.Server.ID] = sconns{conns: int32(m.Conns), leafs: int32(m.LeafNodes)}
|
||||
acc.nrclients += int32(m.Conns) - prev.conns
|
||||
acc.nrleafs += int32(m.LeafNodes) - prev.leafs
|
||||
acc.mu.Unlock()
|
||||
|
||||
acc.updateRemoteServer(&m)
|
||||
s.updateRemoteServer(&m.Server)
|
||||
}
|
||||
|
||||
@@ -775,12 +796,6 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
|
||||
}
|
||||
a.mu.RLock()
|
||||
|
||||
// If no limits set, don't update, no need to.
|
||||
if a.mconns == jwt.NoLimit && a.mleafs == jwt.NoLimit {
|
||||
a.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
|
||||
// Build event with account name and number of local clients and leafnodes.
|
||||
m := AccountNumConns{
|
||||
Account: a.Name,
|
||||
@@ -823,15 +838,20 @@ func (s *Server) accConnsUpdate(a *Account) {
|
||||
// This is a billing event.
|
||||
func (s *Server) accountConnectEvent(c *client) {
|
||||
s.mu.Lock()
|
||||
gacc := s.gacc
|
||||
if !s.eventsEnabled() {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
subj := fmt.Sprintf(connectEventSubj, c.acc.Name)
|
||||
|
||||
c.mu.Lock()
|
||||
// Ignore global account activity
|
||||
if c.acc == nil || c.acc == gacc {
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
m := ConnectEventMsg{
|
||||
Client: ClientInfo{
|
||||
Start: c.start,
|
||||
@@ -846,6 +866,7 @@ func (s *Server) accountConnectEvent(c *client) {
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
subj := fmt.Sprintf(connectEventSubj, c.acc.Name)
|
||||
s.sendInternalMsgLocked(subj, _EMPTY_, &m.Server, &m)
|
||||
}
|
||||
|
||||
@@ -894,7 +915,6 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
|
||||
c.mu.Unlock()
|
||||
|
||||
subj := fmt.Sprintf(disconnectEventSubj, c.acc.Name)
|
||||
|
||||
s.sendInternalMsgLocked(subj, _EMPTY_, &m.Server, &m)
|
||||
}
|
||||
|
||||
@@ -940,9 +960,9 @@ func (s *Server) sendAuthErrorEvent(c *client) {
|
||||
|
||||
// Internal message callback. If the msg is needed past the callback it is
|
||||
// required to be copied.
|
||||
type msgHandler func(sub *subscription, subject, reply string, msg []byte)
|
||||
type msgHandler func(sub *subscription, client *client, subject, reply string, msg []byte)
|
||||
|
||||
func (s *Server) deliverInternalMsg(sub *subscription, subject, reply, msg []byte) {
|
||||
func (s *Server) deliverInternalMsg(sub *subscription, c *client, subject, reply, msg []byte) {
|
||||
s.mu.Lock()
|
||||
if !s.eventsEnabled() || s.sys.subs == nil {
|
||||
s.mu.Unlock()
|
||||
@@ -951,12 +971,21 @@ func (s *Server) deliverInternalMsg(sub *subscription, subject, reply, msg []byt
|
||||
cb := s.sys.subs[string(sub.sid)]
|
||||
s.mu.Unlock()
|
||||
if cb != nil {
|
||||
cb(sub, string(subject), string(reply), msg)
|
||||
cb(sub, c, string(subject), string(reply), msg)
|
||||
}
|
||||
}
|
||||
|
||||
// Create an internal subscription. No support for queue groups atm.
|
||||
func (s *Server) sysSubscribe(subject string, cb msgHandler) (*subscription, error) {
|
||||
return s.systemSubscribe(subject, false, cb)
|
||||
}
|
||||
|
||||
// Create an internal subscription but do not forward interest.
|
||||
func (s *Server) sysSubscribeInternal(subject string, cb msgHandler) (*subscription, error) {
|
||||
return s.systemSubscribe(subject, true, cb)
|
||||
}
|
||||
|
||||
func (s *Server) systemSubscribe(subject string, internalOnly bool, cb msgHandler) (*subscription, error) {
|
||||
if !s.eventsEnabled() {
|
||||
return nil, ErrNoSysAccount
|
||||
}
|
||||
@@ -971,13 +1000,7 @@ func (s *Server) sysSubscribe(subject string, cb msgHandler) (*subscription, err
|
||||
s.mu.Unlock()
|
||||
|
||||
// Now create the subscription
|
||||
if err := c.processSub([]byte(subject + " " + sid)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.mu.Lock()
|
||||
sub := c.subs[sid]
|
||||
c.mu.Unlock()
|
||||
return sub, nil
|
||||
return c.processSub([]byte(subject+" "+sid), internalOnly)
|
||||
}
|
||||
|
||||
func (s *Server) sysUnsubscribe(sub *subscription) {
|
||||
@@ -1003,7 +1026,7 @@ func remoteLatencySubjectForResponse(subject []byte) string {
|
||||
}
|
||||
|
||||
// remoteLatencyUpdate is used to track remote latency measurements for tracking on exported services.
|
||||
func (s *Server) remoteLatencyUpdate(sub *subscription, subject, _ string, msg []byte) {
|
||||
func (s *Server) remoteLatencyUpdate(sub *subscription, _ *client, subject, _ string, msg []byte) {
|
||||
if !s.eventsRunning() {
|
||||
return
|
||||
}
|
||||
@@ -1060,6 +1083,217 @@ func (s *Server) remoteLatencyUpdate(sub *subscription, subject, _ string, msg [
|
||||
s.sendInternalAccountMsg(acc, lsub, &m1)
|
||||
}
|
||||
|
||||
// This is used for all inbox replies so that we do not send supercluster wide interest
|
||||
// updates for every request. Same trick used in modern NATS clients.
|
||||
func (s *Server) inboxReply(sub *subscription, c *client, subject, reply string, msg []byte) {
|
||||
s.mu.Lock()
|
||||
if !s.eventsEnabled() || s.sys.replies == nil {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
cb, ok := s.sys.replies[subject]
|
||||
s.mu.Unlock()
|
||||
|
||||
if ok && cb != nil {
|
||||
cb(sub, c, subject, reply, msg)
|
||||
}
|
||||
}
|
||||
|
||||
// Copied from go client.
|
||||
// We could use serviceReply here instead to save some code.
|
||||
// I prefer these semantics for the moment, when tracing you know
|
||||
// what this is.
|
||||
const (
|
||||
InboxPrefix = "$SYS._INBOX."
|
||||
inboxPrefixLen = len(InboxPrefix)
|
||||
respInboxPrefixLen = inboxPrefixLen + sysHashLen + 1
|
||||
replySuffixLen = 8 // Gives us 62^8
|
||||
)
|
||||
|
||||
// Creates an internal inbox used for replies that will be processed by the global wc handler.
|
||||
func (s *Server) newRespInbox() string {
|
||||
var b [respInboxPrefixLen + replySuffixLen]byte
|
||||
pres := b[:respInboxPrefixLen]
|
||||
copy(pres, s.sys.inboxPre)
|
||||
rn := rand.Int63()
|
||||
for i, l := respInboxPrefixLen, rn; i < len(b); i++ {
|
||||
b[i] = digits[l%base]
|
||||
l /= base
|
||||
}
|
||||
return string(b[:])
|
||||
}
|
||||
|
||||
// accNumSubsReq is sent when we need to gather remote info on subs.
|
||||
type accNumSubsReq struct {
|
||||
Account string `json:"acc"`
|
||||
Subject string `json:"subject"`
|
||||
Queue []byte `json:"queue,omitempty"`
|
||||
}
|
||||
|
||||
// helper function to total information from results to count subs.
|
||||
func totalSubs(rr *SublistResult, qg []byte) (nsubs int32) {
|
||||
if rr == nil {
|
||||
return
|
||||
}
|
||||
checkSub := func(sub *subscription) {
|
||||
// TODO(dlc) - This could be smarter.
|
||||
if qg != nil && !bytes.Equal(qg, sub.queue) {
|
||||
return
|
||||
}
|
||||
if sub.client.kind == CLIENT || sub.client.isUnsolicitedLeafNode() {
|
||||
nsubs++
|
||||
}
|
||||
}
|
||||
if qg == nil {
|
||||
for _, sub := range rr.psubs {
|
||||
checkSub(sub)
|
||||
}
|
||||
}
|
||||
for _, qsub := range rr.qsubs {
|
||||
for _, sub := range qsub {
|
||||
checkSub(sub)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Allows users of large systems to debug active subscribers for a given subject.
|
||||
// Payload should be the subject of interest.
|
||||
func (s *Server) debugSubscribers(sub *subscription, c *client, subject, reply string, msg []byte) {
|
||||
// Even though this is an internal only subscription, meaning interest was not forwarded, we could
|
||||
// get one here from a GW in optimistic mode. Ignore for now.
|
||||
// FIXME(dlc) - Should we send no interest here back to the GW?
|
||||
if c.kind != CLIENT {
|
||||
return
|
||||
}
|
||||
|
||||
var nsubs int32
|
||||
|
||||
// We could have a single subject or we could have a subject and a wildcard separated by whitespace.
|
||||
args := strings.Split(strings.TrimSpace(string(msg)), " ")
|
||||
if len(args) == 0 {
|
||||
s.sendInternalAccountMsg(c.acc, reply, 0)
|
||||
return
|
||||
}
|
||||
|
||||
tsubj := args[0]
|
||||
var qgroup []byte
|
||||
if len(args) > 1 {
|
||||
qgroup = []byte(args[1])
|
||||
}
|
||||
|
||||
if subjectIsLiteral(tsubj) {
|
||||
// We will look up subscribers locally first then determine if we need to solicit other servers.
|
||||
rr := c.acc.sl.Match(tsubj)
|
||||
nsubs = totalSubs(rr, qgroup)
|
||||
} else {
|
||||
// We have a wildcard, so this is a bit slower path.
|
||||
var _subs [32]*subscription
|
||||
subs := _subs[:0]
|
||||
c.acc.sl.All(&subs)
|
||||
for _, sub := range subs {
|
||||
if subjectIsSubsetMatch(string(sub.subject), tsubj) {
|
||||
if qgroup != nil && !bytes.Equal(qgroup, sub.queue) {
|
||||
continue
|
||||
}
|
||||
if sub.client.kind == CLIENT || sub.client.isUnsolicitedLeafNode() {
|
||||
nsubs++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We should have an idea of how many responses to expect from remote servers.
|
||||
var expected = c.acc.expectedRemoteResponses()
|
||||
|
||||
// If we are only local, go ahead and return.
|
||||
if expected == 0 {
|
||||
s.sendInternalAccountMsg(c.acc, reply, nsubs)
|
||||
return
|
||||
}
|
||||
|
||||
// We need to solicit from others.
|
||||
// To track status.
|
||||
responses := int32(0)
|
||||
done := make(chan (bool))
|
||||
|
||||
s.mu.Lock()
|
||||
// Create direct reply inbox that we multiplex under the WC replies.
|
||||
replySubj := s.newRespInbox()
|
||||
// Store our handler.
|
||||
s.sys.replies[replySubj] = func(sub *subscription, _ *client, subject, _ string, msg []byte) {
|
||||
if n, err := strconv.Atoi(string(msg)); err == nil {
|
||||
atomic.AddInt32(&nsubs, int32(n))
|
||||
}
|
||||
if atomic.AddInt32(&responses, 1) >= expected {
|
||||
select {
|
||||
case done <- true:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
// Send the request to the other servers.
|
||||
request := &accNumSubsReq{
|
||||
Account: c.acc.Name,
|
||||
Subject: tsubj,
|
||||
Queue: qgroup,
|
||||
}
|
||||
s.sendInternalMsg(accNumSubsReqSubj, replySubj, nil, request)
|
||||
s.mu.Unlock()
|
||||
|
||||
// FIXME(dlc) - We should rate limit here instead of blind Go routine.
|
||||
go func() {
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
}
|
||||
// Cleanup the WC entry.
|
||||
s.mu.Lock()
|
||||
delete(s.sys.replies, replySubj)
|
||||
s.mu.Unlock()
|
||||
// Send the response.
|
||||
s.sendInternalAccountMsg(c.acc, reply, nsubs)
|
||||
}()
|
||||
}
|
||||
|
||||
// Request for our local subscription count. This will come from a remote origin server
|
||||
// that received the initial request.
|
||||
func (s *Server) nsubsRequest(sub *subscription, _ *client, subject, reply string, msg []byte) {
|
||||
if !s.eventsRunning() {
|
||||
return
|
||||
}
|
||||
m := accNumSubsReq{}
|
||||
if err := json.Unmarshal(msg, &m); err != nil {
|
||||
s.sys.client.Errorf("Error unmarshalling account nsubs request message: %v", err)
|
||||
return
|
||||
}
|
||||
// Grab account.
|
||||
acc, _ := s.lookupAccount(m.Account)
|
||||
if acc == nil || acc.numLocalAndLeafConnections() == 0 {
|
||||
return
|
||||
}
|
||||
// We will look up subscribers locally first then determine if we need to solicit other servers.
|
||||
var nsubs int32
|
||||
if subjectIsLiteral(m.Subject) {
|
||||
rr := acc.sl.Match(m.Subject)
|
||||
nsubs = totalSubs(rr, m.Queue)
|
||||
} else {
|
||||
// We have a wildcard, so this is a bit slower path.
|
||||
var _subs [32]*subscription
|
||||
subs := _subs[:0]
|
||||
acc.sl.All(&subs)
|
||||
for _, sub := range subs {
|
||||
if (sub.client.kind == CLIENT || sub.client.isUnsolicitedLeafNode()) && subjectIsSubsetMatch(string(sub.subject), m.Subject) {
|
||||
if m.Queue != nil && !bytes.Equal(m.Queue, sub.queue) {
|
||||
continue
|
||||
}
|
||||
nsubs++
|
||||
}
|
||||
}
|
||||
}
|
||||
s.sendInternalMsgLocked(reply, _EMPTY_, nil, nsubs)
|
||||
}
|
||||
|
||||
// Helper to grab name for a client.
|
||||
func nameForClient(c *client) string {
|
||||
if c.user != nil {
|
||||
|
||||
@@ -538,7 +538,7 @@ func TestSystemAccountInternalSubscriptions(t *testing.T) {
|
||||
|
||||
received := make(chan *nats.Msg)
|
||||
// Create message callback handler.
|
||||
cb := func(sub *subscription, subject, reply string, msg []byte) {
|
||||
cb := func(sub *subscription, _ *client, subject, reply string, msg []byte) {
|
||||
copy := append([]byte(nil), msg...)
|
||||
received <- &nats.Msg{Subject: subject, Reply: reply, Data: copy}
|
||||
}
|
||||
@@ -612,7 +612,7 @@ func TestSystemAccountConnectionUpdatesStopAfterNoLocal(t *testing.T) {
|
||||
|
||||
// Listen for updates to the new account connection activity.
|
||||
received := make(chan *nats.Msg, 10)
|
||||
cb := func(sub *subscription, subject, reply string, msg []byte) {
|
||||
cb := func(sub *subscription, _ *client, subject, reply string, msg []byte) {
|
||||
copy := append([]byte(nil), msg...)
|
||||
received <- &nats.Msg{Subject: subject, Reply: reply, Data: copy}
|
||||
}
|
||||
@@ -1172,9 +1172,10 @@ func TestSystemAccountWithGateways(t *testing.T) {
|
||||
sub, _ := nca.SubscribeSync("$SYS.ACCOUNT.>")
|
||||
defer sub.Unsubscribe()
|
||||
nca.Flush()
|
||||
|
||||
// If this test fails with wrong number after 10 seconds we may have
|
||||
// added a new initial subscription for the eventing system.
|
||||
checkExpectedSubs(t, 10, sa)
|
||||
checkExpectedSubs(t, 13, sa)
|
||||
|
||||
// Create a client on B and see if we receive the event
|
||||
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
|
||||
@@ -1467,8 +1468,9 @@ func TestFetchAccountRace(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestConnectionUpdatesTimerProperlySet(t *testing.T) {
|
||||
origEventsHBInterval := eventsHBInterval
|
||||
eventsHBInterval = 50 * time.Millisecond
|
||||
defer func() { eventsHBInterval = defaultEventsHBItvl }()
|
||||
defer func() { eventsHBInterval = origEventsHBInterval }()
|
||||
|
||||
sa, _, sb, optsB, _ := runTrustedCluster(t)
|
||||
defer sa.Shutdown()
|
||||
@@ -1486,7 +1488,7 @@ func TestConnectionUpdatesTimerProperlySet(t *testing.T) {
|
||||
|
||||
// Listen for HB updates...
|
||||
count := int32(0)
|
||||
cb := func(sub *subscription, subject, reply string, msg []byte) {
|
||||
cb := func(sub *subscription, _ *client, subject, reply string, msg []byte) {
|
||||
atomic.AddInt32(&count, 1)
|
||||
}
|
||||
subj := fmt.Sprintf(accConnsEventSubj, pub)
|
||||
|
||||
@@ -64,6 +64,10 @@ func (c *client) isSolicitedLeafNode() bool {
|
||||
return c.kind == LEAF && c.leaf.remote != nil
|
||||
}
|
||||
|
||||
func (c *client) isUnsolicitedLeafNode() bool {
|
||||
return c.kind == LEAF && c.leaf.remote == nil
|
||||
}
|
||||
|
||||
// This will spin up go routines to solicit the remote leaf node connections.
|
||||
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
|
||||
for _, r := range remotes {
|
||||
@@ -403,7 +407,6 @@ func (c *client) sendLeafConnect(tlsRequired bool) {
|
||||
cinfo.User = c.leaf.remote.username
|
||||
cinfo.Pass = c.leaf.remote.password
|
||||
}
|
||||
|
||||
b, err := json.Marshal(cinfo)
|
||||
if err != nil {
|
||||
c.Errorf("Error marshaling CONNECT to route: %v\n", err)
|
||||
@@ -1001,6 +1004,7 @@ func (c *client) updateSmap(sub *subscription, delta int32) {
|
||||
n := c.leaf.smap[key]
|
||||
// We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
|
||||
update := sub.queue != nil || n == 0 || n+delta <= 0
|
||||
|
||||
n += delta
|
||||
if n > 0 {
|
||||
c.leaf.smap[key] = n
|
||||
|
||||
@@ -407,7 +407,7 @@ func (c *client) parse(buf []byte) error {
|
||||
|
||||
switch c.kind {
|
||||
case CLIENT:
|
||||
err = c.processSub(arg)
|
||||
_, err = c.processSub(arg, false)
|
||||
case ROUTER:
|
||||
err = c.processRemoteSub(arg)
|
||||
case GATEWAY:
|
||||
|
||||
@@ -327,6 +327,18 @@ func NewServer(opts *Options) (*Server, error) {
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// ClientURL returns the URL used to connect clients. Helpful in testing
|
||||
// when we designate a random client port (-1).
|
||||
func (s *Server) ClientURL() string {
|
||||
// FIXME(dlc) - should we add in user and pass if defined single?
|
||||
opts := s.getOpts()
|
||||
scheme := "nats://"
|
||||
if opts.TLSConfig != nil {
|
||||
scheme = "tls://"
|
||||
}
|
||||
return fmt.Sprintf("%s%s:%d", scheme, opts.Host, opts.Port)
|
||||
}
|
||||
|
||||
func validateOptions(o *Options) error {
|
||||
// Check that the trust configuration is correct.
|
||||
if err := validateTrustedOperators(o); err != nil {
|
||||
@@ -725,6 +737,14 @@ func (s *Server) setSystemAccount(acc *Account) error {
|
||||
return ErrAccountExists
|
||||
}
|
||||
|
||||
// This is here in an attempt to quiet the race detector and not have to place
|
||||
// locks on fast path for inbound messages and checking service imports.
|
||||
acc.mu.Lock()
|
||||
if acc.imports.services == nil {
|
||||
acc.imports.services = make(map[string]*serviceImport)
|
||||
}
|
||||
acc.mu.Unlock()
|
||||
|
||||
s.sys = &internal{
|
||||
account: acc,
|
||||
client: &client{srv: s, kind: SYSTEM, opts: internalOpts, msubs: -1, mpay: -1, start: time.Now(), last: time.Now()},
|
||||
@@ -732,6 +752,7 @@ func (s *Server) setSystemAccount(acc *Account) error {
|
||||
sid: 1,
|
||||
servers: make(map[string]*serverUpdate),
|
||||
subs: make(map[string]msgHandler),
|
||||
replies: make(map[string]msgHandler),
|
||||
sendq: make(chan *pubMsg, internalSendQLen),
|
||||
statsz: eventsHBInterval,
|
||||
orphMax: 5 * eventsHBInterval,
|
||||
|
||||
@@ -979,7 +979,7 @@ func matchLiteral(literal, subject string) bool {
|
||||
}
|
||||
|
||||
func addLocalSub(sub *subscription, subs *[]*subscription) {
|
||||
if sub != nil && sub.client != nil && sub.client.kind == CLIENT && sub.im == nil {
|
||||
if sub != nil && sub.client != nil && (sub.client.kind == CLIENT || sub.client.kind == SYSTEM) && sub.im == nil {
|
||||
*subs = append(*subs, sub)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -596,6 +596,7 @@ func createClusterEx(t *testing.T, doAccounts bool, clusterName string, numServe
|
||||
&server.User{Username: "ngs", Password: "pass", Permissions: nil, Account: ngs},
|
||||
&server.User{Username: "foo", Password: "pass", Permissions: nil, Account: foo},
|
||||
&server.User{Username: "bar", Password: "pass", Permissions: nil, Account: bar},
|
||||
&server.User{Username: "sys", Password: "pass", Permissions: nil, Account: sys},
|
||||
}
|
||||
return accounts, users
|
||||
}
|
||||
|
||||
368
test/system_services_test.go
Normal file
368
test/system_services_test.go
Normal file
@@ -0,0 +1,368 @@
|
||||
// Copyright 2019 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats-server/v2/server"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
const dbgSubs = "$SYS.DEBUG.SUBSCRIBERS"
|
||||
|
||||
func (sc *supercluster) selectRandomServer() *server.Options {
|
||||
ci := rand.Int31n(int32(len(sc.clusters)))
|
||||
si := rand.Int31n(int32(len(sc.clusters[ci].servers)))
|
||||
return sc.clusters[ci].opts[si]
|
||||
}
|
||||
|
||||
func (sc *supercluster) setupSystemServicesImports(t *testing.T, account string) {
|
||||
t.Helper()
|
||||
for _, c := range sc.clusters {
|
||||
for _, s := range c.servers {
|
||||
sysAcc := s.SystemAccount()
|
||||
if sysAcc == nil {
|
||||
t.Fatalf("System account not set")
|
||||
}
|
||||
acc, err := s.LookupAccount(account)
|
||||
if err != nil {
|
||||
t.Fatalf("Error looking up account '%s': %v", account, err)
|
||||
}
|
||||
if err := acc.AddServiceImport(sysAcc, dbgSubs, dbgSubs); err != nil {
|
||||
t.Fatalf("Error adding subscribers debug service to '%s': %v", account, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func numSubs(t *testing.T, msg *nats.Msg) int {
|
||||
t.Helper()
|
||||
if msg == nil || msg.Data == nil {
|
||||
t.Fatalf("No response")
|
||||
}
|
||||
n, err := strconv.Atoi(string(msg.Data))
|
||||
if err != nil {
|
||||
t.Fatalf("Got non-number response: %v", err)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func checkNumSubs(t *testing.T, msg *nats.Msg, expected int) {
|
||||
t.Helper()
|
||||
if n := numSubs(t, msg); n != expected {
|
||||
t.Fatalf("Expected %d subscribers, got %d", expected, n)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSystemServiceSubscribers(t *testing.T) {
|
||||
numServers, numClusters := 3, 3
|
||||
sc := createSuperCluster(t, numServers, numClusters)
|
||||
defer sc.shutdown()
|
||||
|
||||
sc.setupSystemServicesImports(t, "FOO")
|
||||
|
||||
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
||||
defer nc.Close()
|
||||
|
||||
checkInterest := func(expected int) {
|
||||
t.Helper()
|
||||
response, _ := nc.Request(dbgSubs, []byte("foo.bar"), time.Second)
|
||||
checkNumSubs(t, response, expected)
|
||||
}
|
||||
|
||||
checkInterest(0)
|
||||
|
||||
// Now add in local subscribers.
|
||||
for i := 0; i < 5; i++ {
|
||||
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
||||
defer nc.Close()
|
||||
nc.SubscribeSync("foo.bar")
|
||||
nc.SubscribeSync("foo.*")
|
||||
nc.Flush()
|
||||
}
|
||||
|
||||
checkInterest(10)
|
||||
|
||||
// Now create remote subscribers at random.
|
||||
for i := 0; i < 90; i++ {
|
||||
nc := clientConnect(t, sc.selectRandomServer(), "foo")
|
||||
defer nc.Close()
|
||||
nc.SubscribeSync("foo.bar")
|
||||
nc.Flush()
|
||||
}
|
||||
|
||||
checkInterest(100)
|
||||
}
|
||||
|
||||
// Test that we can match wildcards. So sub may be foo.bar and we ask about foo.*, that should work.
|
||||
func TestSystemServiceSubscribersWildcards(t *testing.T) {
|
||||
numServers, numClusters := 3, 3
|
||||
sc := createSuperCluster(t, numServers, numClusters)
|
||||
defer sc.shutdown()
|
||||
|
||||
sc.setupSystemServicesImports(t, "FOO")
|
||||
|
||||
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
||||
defer nc.Close()
|
||||
|
||||
for i := 0; i < 50; i++ {
|
||||
nc := clientConnect(t, sc.selectRandomServer(), "foo")
|
||||
defer nc.Close()
|
||||
nc.SubscribeSync(fmt.Sprintf("foo.bar.%d", i+1))
|
||||
nc.SubscribeSync(fmt.Sprintf("%d", i+1))
|
||||
nc.Flush()
|
||||
}
|
||||
|
||||
response, _ := nc.Request(dbgSubs, []byte("foo.bar.*"), time.Second)
|
||||
checkNumSubs(t, response, 50)
|
||||
|
||||
response, _ = nc.Request(dbgSubs, []byte("foo.>"), time.Second)
|
||||
checkNumSubs(t, response, 50)
|
||||
|
||||
response, _ = nc.Request(dbgSubs, []byte("foo.bar.22"), time.Second)
|
||||
checkNumSubs(t, response, 1)
|
||||
|
||||
response, _ = nc.Request(dbgSubs, []byte("_INBOX.*.*"), time.Second)
|
||||
hasInbox := numSubs(t, response)
|
||||
|
||||
response, _ = nc.Request(dbgSubs, []byte(">"), time.Second)
|
||||
checkNumSubs(t, response, 100+hasInbox)
|
||||
}
|
||||
|
||||
// Test that we can match on queue groups as well. Separate request payload with any whitespace.
|
||||
func TestSystemServiceSubscribersQueueGroups(t *testing.T) {
|
||||
numServers, numClusters := 3, 3
|
||||
sc := createSuperCluster(t, numServers, numClusters)
|
||||
defer sc.shutdown()
|
||||
|
||||
sc.setupSystemServicesImports(t, "FOO")
|
||||
|
||||
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
||||
defer nc.Close()
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
nc := clientConnect(t, sc.selectRandomServer(), "foo")
|
||||
defer nc.Close()
|
||||
subj := fmt.Sprintf("foo.bar.%d", i+1)
|
||||
nc.QueueSubscribeSync(subj, "QG.11")
|
||||
nc.QueueSubscribeSync("foo.baz", "QG.33")
|
||||
nc.Flush()
|
||||
}
|
||||
|
||||
for i := 0; i < 23; i++ {
|
||||
nc := clientConnect(t, sc.selectRandomServer(), "foo")
|
||||
defer nc.Close()
|
||||
subj := fmt.Sprintf("foo.bar.%d", i+1)
|
||||
nc.QueueSubscribeSync(subj, "QG.22")
|
||||
nc.QueueSubscribeSync("foo.baz", "QG.22")
|
||||
nc.Flush()
|
||||
}
|
||||
|
||||
response, _ := nc.Request(dbgSubs, []byte("foo.bar.*"), time.Second)
|
||||
checkNumSubs(t, response, 33)
|
||||
|
||||
response, _ = nc.Request(dbgSubs, []byte("foo.bar.22 QG.22"), time.Second)
|
||||
checkNumSubs(t, response, 1)
|
||||
|
||||
response, _ = nc.Request(dbgSubs, []byte("foo.bar.2"), time.Second)
|
||||
checkNumSubs(t, response, 2)
|
||||
|
||||
response, _ = nc.Request(dbgSubs, []byte("foo.baz"), time.Second)
|
||||
checkNumSubs(t, response, 33)
|
||||
|
||||
response, _ = nc.Request(dbgSubs, []byte("foo.baz QG.22"), time.Second)
|
||||
checkNumSubs(t, response, 23)
|
||||
|
||||
// Now check qfilters work on wildcards too.
|
||||
response, _ = nc.Request(dbgSubs, []byte("foo.bar.> QG.11"), time.Second)
|
||||
checkNumSubs(t, response, 10)
|
||||
|
||||
response, _ = nc.Request(dbgSubs, []byte("*.baz QG.22"), time.Second)
|
||||
checkNumSubs(t, response, 23)
|
||||
|
||||
response, _ = nc.Request(dbgSubs, []byte("foo.*.2 QG.22"), time.Second)
|
||||
checkNumSubs(t, response, 1)
|
||||
}
|
||||
|
||||
func TestSystemServiceSubscribersLeafNodesWithoutSystem(t *testing.T) {
|
||||
numServers, numClusters := 3, 3
|
||||
sc := createSuperCluster(t, numServers, numClusters)
|
||||
defer sc.shutdown()
|
||||
|
||||
sc.setupSystemServicesImports(t, "FOO")
|
||||
|
||||
ci := rand.Int31n(int32(len(sc.clusters)))
|
||||
si := rand.Int31n(int32(len(sc.clusters[ci].servers)))
|
||||
s, opts := sc.clusters[ci].servers[si], sc.clusters[ci].opts[si]
|
||||
url := fmt.Sprintf("nats://%s:pass@%s:%d", "foo", opts.Host, opts.LeafNode.Port)
|
||||
ls, lopts := runSolicitLeafServerToURL(url)
|
||||
defer ls.Shutdown()
|
||||
|
||||
checkLeafNodeConnected(t, s)
|
||||
|
||||
// This is so we can test when the subs on a leafnode are flushed to the connected supercluster.
|
||||
fsubj := "__leaf.flush__"
|
||||
fc := clientConnect(t, opts, "foo")
|
||||
fc.Subscribe(fsubj, func(m *nats.Msg) {
|
||||
m.Respond(nil)
|
||||
})
|
||||
|
||||
lnc := clientConnect(t, lopts, "$G")
|
||||
defer lnc.Close()
|
||||
|
||||
flushLeaf := func() {
|
||||
if _, err := lnc.Request(fsubj, nil, time.Second); err != nil {
|
||||
t.Fatalf("Did not flush through to the supercluster: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
nc := clientConnect(t, sc.selectRandomServer(), "foo")
|
||||
defer nc.Close()
|
||||
nc.SubscribeSync(fmt.Sprintf("foo.bar.%d", i+1))
|
||||
nc.QueueSubscribeSync("foo.bar.baz", "QG.22")
|
||||
nc.Flush()
|
||||
}
|
||||
|
||||
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
||||
defer nc.Close()
|
||||
|
||||
response, _ := nc.Request(dbgSubs, []byte("foo.bar.*"), time.Second)
|
||||
checkNumSubs(t, response, 20)
|
||||
|
||||
response, _ = nc.Request(dbgSubs, []byte("foo.bar.3"), time.Second)
|
||||
checkNumSubs(t, response, 1)
|
||||
|
||||
lnc.SubscribeSync("foo.bar.3")
|
||||
lnc.QueueSubscribeSync("foo.bar.baz", "QG.22")
|
||||
|
||||
// We could flush here but that does not guarantee we have flushed through to the supercluster.
|
||||
flushLeaf()
|
||||
|
||||
response, _ = nc.Request(dbgSubs, []byte("foo.bar.3"), time.Second)
|
||||
checkNumSubs(t, response, 2)
|
||||
|
||||
response, _ = nc.Request(dbgSubs, []byte("foo.bar.baz QG.22"), time.Second)
|
||||
checkNumSubs(t, response, 11)
|
||||
|
||||
lnc.SubscribeSync("foo.bar.3")
|
||||
lnc.QueueSubscribeSync("foo.bar.baz", "QG.22")
|
||||
flushLeaf()
|
||||
|
||||
// For now we do not see all the details behind a leafnode if the leafnode is not enabled.
|
||||
response, _ = nc.Request(dbgSubs, []byte("foo.bar.3"), time.Second)
|
||||
checkNumSubs(t, response, 2)
|
||||
|
||||
response, _ = nc.Request(dbgSubs, []byte("foo.bar.baz QG.22"), time.Second)
|
||||
checkNumSubs(t, response, 11)
|
||||
}
|
||||
|
||||
func runSolicitLeafServerWithSystemToURL(surl string) (*server.Server, *server.Options) {
|
||||
o := DefaultTestOptions
|
||||
o.Port = -1
|
||||
fooAcc := server.NewAccount("FOO")
|
||||
o.Accounts = []*server.Account{server.NewAccount("$SYS"), fooAcc}
|
||||
o.SystemAccount = "$SYS"
|
||||
o.Users = []*server.User{
|
||||
&server.User{Username: "foo", Password: "pass", Permissions: nil, Account: fooAcc},
|
||||
}
|
||||
rurl, _ := url.Parse(surl)
|
||||
sysUrl, _ := url.Parse(strings.Replace(surl, rurl.User.Username(), "sys", -1))
|
||||
o.LeafNode.Remotes = []*server.RemoteLeafOpts{
|
||||
{
|
||||
URLs: []*url.URL{rurl},
|
||||
LocalAccount: "FOO",
|
||||
},
|
||||
{
|
||||
URLs: []*url.URL{sysUrl},
|
||||
LocalAccount: "$SYS",
|
||||
},
|
||||
}
|
||||
o.LeafNode.ReconnectInterval = 100 * time.Millisecond
|
||||
return RunServer(&o), &o
|
||||
}
|
||||
|
||||
func TestSystemServiceSubscribersLeafNodesWithSystem(t *testing.T) {
|
||||
numServers, numClusters := 3, 3
|
||||
sc := createSuperCluster(t, numServers, numClusters)
|
||||
defer sc.shutdown()
|
||||
|
||||
sc.setupSystemServicesImports(t, "FOO")
|
||||
|
||||
ci := rand.Int31n(int32(len(sc.clusters)))
|
||||
si := rand.Int31n(int32(len(sc.clusters[ci].servers)))
|
||||
s, opts := sc.clusters[ci].servers[si], sc.clusters[ci].opts[si]
|
||||
url := fmt.Sprintf("nats://%s:pass@%s:%d", "foo", opts.Host, opts.LeafNode.Port)
|
||||
ls, lopts := runSolicitLeafServerWithSystemToURL(url)
|
||||
defer ls.Shutdown()
|
||||
|
||||
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
|
||||
if nln := s.NumLeafNodes(); nln != 2 {
|
||||
return fmt.Errorf("Expected a connected leafnode for server %q, got none", s.ID())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// This is so we can test when the subs on a leafnode are flushed to the connected supercluster.
|
||||
fsubj := "__leaf.flush__"
|
||||
fc := clientConnect(t, opts, "foo")
|
||||
fc.Subscribe(fsubj, func(m *nats.Msg) {
|
||||
m.Respond(nil)
|
||||
})
|
||||
|
||||
lnc := clientConnect(t, lopts, "foo")
|
||||
defer lnc.Close()
|
||||
|
||||
flushLeaf := func() {
|
||||
if _, err := lnc.Request(fsubj, nil, time.Second); err != nil {
|
||||
t.Fatalf("Did not flush through to the supercluster: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
nc := clientConnect(t, sc.selectRandomServer(), "foo")
|
||||
defer nc.Close()
|
||||
nc.SubscribeSync(fmt.Sprintf("foo.bar.%d", i+1))
|
||||
nc.QueueSubscribeSync("foo.bar.baz", "QG.22")
|
||||
nc.Flush()
|
||||
}
|
||||
|
||||
nc := clientConnect(t, sc.clusters[0].opts[0], "foo")
|
||||
defer nc.Close()
|
||||
|
||||
response, _ := nc.Request(dbgSubs, []byte("foo.bar.3"), time.Second)
|
||||
checkNumSubs(t, response, 1)
|
||||
|
||||
response, _ = nc.Request(dbgSubs, []byte("foo.bar.*"), time.Second)
|
||||
checkNumSubs(t, response, 20)
|
||||
|
||||
lnc.SubscribeSync("foo.bar.3")
|
||||
lnc.QueueSubscribeSync("foo.bar.baz", "QG.22")
|
||||
flushLeaf()
|
||||
|
||||
// Since we are doing real tracking now on the other side, this will be off by 1 since we are counting
|
||||
// the leaf and the real sub.
|
||||
response, _ = nc.Request(dbgSubs, []byte("foo.bar.3"), time.Second)
|
||||
checkNumSubs(t, response, 3)
|
||||
|
||||
response, _ = nc.Request(dbgSubs, []byte("foo.bar.baz QG.22"), time.Second)
|
||||
checkNumSubs(t, response, 12)
|
||||
}
|
||||
Reference in New Issue
Block a user