mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Added in ability for normal accounts to access scoped connz info.
Added in client kind and sub type for clients. Added in ability to filter connections based on matching subject interest. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -57,6 +57,7 @@ const (
|
||||
leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT" // for internal use only
|
||||
remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s"
|
||||
inboxRespSubj = "$SYS._INBOX.%s.%s"
|
||||
accConnzReqSubj = "$SYS.REQ.ACCOUNT.PING.CONNZ"
|
||||
|
||||
// FIXME(dlc) - Should account scope, even with wc for now, but later on
|
||||
// we can then shard as needed.
|
||||
@@ -464,9 +465,8 @@ func (s *Server) eventsRunning() bool {
|
||||
// a defined system account.
|
||||
func (s *Server) EventsEnabled() bool {
|
||||
s.mu.Lock()
|
||||
ee := s.eventsEnabled()
|
||||
s.mu.Unlock()
|
||||
return ee
|
||||
defer s.mu.Unlock()
|
||||
return s.eventsEnabled()
|
||||
}
|
||||
|
||||
// eventsEnabled will report if events are enabled.
|
||||
@@ -777,7 +777,7 @@ func (s *Server) initEventTracking() {
|
||||
}
|
||||
extractAccount := func(subject string) (string, error) {
|
||||
if tk := strings.Split(subject, tsep); len(tk) != accReqTokens {
|
||||
return "", fmt.Errorf("subject %q is malformed", subject)
|
||||
return _EMPTY_, fmt.Errorf("subject %q is malformed", subject)
|
||||
} else {
|
||||
return tk[accReqAccIndex], nil
|
||||
}
|
||||
@@ -801,6 +801,14 @@ func (s *Server) initEventTracking() {
|
||||
if acc, err := extractAccount(subject); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
if ci, _, _, _, err := c.srv.getRequestInfo(c, msg); err == nil {
|
||||
// Make sure the accounts match.
|
||||
if ci.Account != acc {
|
||||
// Do not leak too much here.
|
||||
return nil, fmt.Errorf("bad request")
|
||||
}
|
||||
optz.ConnzOptions.isAccountReq = true
|
||||
}
|
||||
optz.ConnzOptions.Account = acc
|
||||
return s.Connz(&optz.ConnzOptions)
|
||||
}
|
||||
@@ -863,14 +871,47 @@ func (s *Server) initEventTracking() {
|
||||
}
|
||||
}
|
||||
|
||||
// register existing accounts with any system exports.
|
||||
func (s *Server) registerSystemImportsForExisting() {
|
||||
var accounts []*Account
|
||||
|
||||
s.mu.Lock()
|
||||
if s.sys == nil {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
sacc := s.sys.account
|
||||
s.accounts.Range(func(k, v interface{}) bool {
|
||||
a := v.(*Account)
|
||||
if a != sacc {
|
||||
accounts = append(accounts, a)
|
||||
}
|
||||
return true
|
||||
})
|
||||
s.mu.Unlock()
|
||||
|
||||
for _, a := range accounts {
|
||||
s.registerSystemImports(a)
|
||||
}
|
||||
}
|
||||
|
||||
// add all exports a system account will need
|
||||
func (s *Server) addSystemAccountExports(sacc *Account) {
|
||||
if !s.EventsEnabled() {
|
||||
return
|
||||
}
|
||||
accConnzSubj := fmt.Sprintf(accReqSubj, "*", "CONNZ")
|
||||
if err := sacc.AddServiceExportWithResponse(accConnzSubj, Streamed, nil); err != nil {
|
||||
s.Errorf("Error adding system service export for %q: %v", accConnzSubj, err)
|
||||
}
|
||||
// Register any accounts that existed prior.
|
||||
s.registerSystemImportsForExisting()
|
||||
|
||||
// FIXME(dlc) - Old experiment, Remove?
|
||||
if err := sacc.AddServiceExport(accSubsSubj, nil); err != nil {
|
||||
s.Errorf("Error adding system service export for %q: %v", accSubsSubj, err)
|
||||
}
|
||||
|
||||
if s.JetStreamEnabled() {
|
||||
s.checkJetStreamExports()
|
||||
}
|
||||
@@ -1329,6 +1370,29 @@ func (s *Server) remoteConnsUpdate(sub *subscription, _ *client, _ *Account, sub
|
||||
}
|
||||
}
|
||||
|
||||
// This will import any system level exports.
|
||||
func (s *Server) registerSystemImports(a *Account) {
|
||||
if a == nil || !s.eventsEnabled() {
|
||||
return
|
||||
}
|
||||
sacc := s.SystemAccount()
|
||||
if sacc == nil {
|
||||
return
|
||||
}
|
||||
// FIXME(dlc) - make a shared list between sys exports etc.
|
||||
connzSubj := fmt.Sprintf(serverPingReqSubj, "CONNZ")
|
||||
mappedSubj := fmt.Sprintf(accReqSubj, a.Name, "CONNZ")
|
||||
|
||||
// Add in this to the account in 2 places.
|
||||
// "$SYS.REQ.SERVER.PING.CONNZ" and "$SYS.REQ.ACCOUNT.CONNZ"
|
||||
if err := a.AddServiceImport(sacc, connzSubj, mappedSubj); err != nil {
|
||||
s.Errorf("Error setting up system service imports for account: %v", err)
|
||||
}
|
||||
if err := a.AddServiceImport(sacc, accConnzReqSubj, mappedSubj); err != nil {
|
||||
s.Errorf("Error setting up system service imports for account: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Setup tracking for this account. This allows us to track global account activity.
|
||||
// Lock should be held on entry.
|
||||
func (s *Server) enableAccountTracking(a *Account) {
|
||||
|
||||
Reference in New Issue
Block a user