From 10167b1bcfdd666c075ac2f67cf885403c01a6e7 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 13 Aug 2021 09:41:44 -0700 Subject: [PATCH] Added in ability for normal accounts to access scoped connz info. Added in client kind and sub type for clients. Added in ability to filter connections based on matching subject interest. Signed-off-by: Derek Collison --- server/accounts.go | 12 ++-- server/events.go | 72 ++++++++++++++++++++++-- server/monitor.go | 100 +++++++++++++++++++++++++++++---- server/norace_test.go | 128 ++++++++++++++++++++++++++++++++++++++++++ server/server.go | 6 ++ 5 files changed, 296 insertions(+), 22 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 9fe4a81c..e20be9bc 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -816,8 +816,7 @@ func (a *Account) selectMappedSubject(dest string) (string, bool) { } // SubscriptionInterest returns true if this account has a matching subscription -// for the given `subject`. Works only for literal subjects. -// TODO: Add support for wildcards +// for the given `subject`. func (a *Account) SubscriptionInterest(subject string) bool { return a.Interest(subject) > 0 } @@ -1377,7 +1376,7 @@ func (a *Account) AddServiceImportWithClaim(destination *Account, from, to strin return ErrMissingAccount } // Empty means use from. - if to == "" { + if to == _EMPTY_ { to = from } if !IsValidSubject(from) || !IsValidSubject(to) { @@ -2277,7 +2276,7 @@ func (a *Account) AddStreamImportWithClaim(account *Account, from, prefix string // Check prefix if it exists and make sure its a literal. // Append token separator if not already present. - if prefix != "" { + if prefix != _EMPTY_ { // Make sure there are no wildcards here, this prefix needs to be a literal // since it will be prepended to a publish subject. if !subjectIsLiteral(prefix) { @@ -2415,7 +2414,7 @@ func (a *Account) checkStreamImportAuthorizedNoLock(account *Account, subject st func (a *Account) checkAuth(ea *exportAuth, account *Account, imClaim *jwt.Import, tokens []string) bool { // if ea is nil or ea.approved is nil, that denotes a public export - if ea == nil || (ea.approved == nil && !ea.tokenReq && ea.accountPos == 0) { + if ea == nil || (len(ea.approved) == 0 && !ea.tokenReq && ea.accountPos == 0) { return true } // Check if the export is protected and enforces presence of importing account identity @@ -2444,6 +2443,7 @@ func (a *Account) checkStreamExportApproved(account *Account, subject string, im } return a.checkAuth(&ea.exportAuth, account, imClaim, nil) } + // ok if we are here we did not match directly so we need to test each one. // The import subject arg has to take precedence, meaning the export // has to be a true subset of the import claim. We already checked for @@ -2464,7 +2464,7 @@ func (a *Account) checkServiceExportApproved(account *Account, subject string, i // Check direct match of subject first se, ok := a.exports.services[subject] if ok { - // if ea is nil or eq.approved is nil, that denotes a public export + // if se is nil or eq.approved is nil, that denotes a public export if se == nil { return true } diff --git a/server/events.go b/server/events.go index 2aabcc63..25fa80db 100644 --- a/server/events.go +++ b/server/events.go @@ -57,6 +57,7 @@ const ( leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT" // for internal use only remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s" inboxRespSubj = "$SYS._INBOX.%s.%s" + accConnzReqSubj = "$SYS.REQ.ACCOUNT.PING.CONNZ" // FIXME(dlc) - Should account scope, even with wc for now, but later on // we can then shard as needed. @@ -464,9 +465,8 @@ func (s *Server) eventsRunning() bool { // a defined system account. func (s *Server) EventsEnabled() bool { s.mu.Lock() - ee := s.eventsEnabled() - s.mu.Unlock() - return ee + defer s.mu.Unlock() + return s.eventsEnabled() } // eventsEnabled will report if events are enabled. @@ -777,7 +777,7 @@ func (s *Server) initEventTracking() { } extractAccount := func(subject string) (string, error) { if tk := strings.Split(subject, tsep); len(tk) != accReqTokens { - return "", fmt.Errorf("subject %q is malformed", subject) + return _EMPTY_, fmt.Errorf("subject %q is malformed", subject) } else { return tk[accReqAccIndex], nil } @@ -801,6 +801,14 @@ func (s *Server) initEventTracking() { if acc, err := extractAccount(subject); err != nil { return nil, err } else { + if ci, _, _, _, err := c.srv.getRequestInfo(c, msg); err == nil { + // Make sure the accounts match. + if ci.Account != acc { + // Do not leak too much here. + return nil, fmt.Errorf("bad request") + } + optz.ConnzOptions.isAccountReq = true + } optz.ConnzOptions.Account = acc return s.Connz(&optz.ConnzOptions) } @@ -863,14 +871,47 @@ func (s *Server) initEventTracking() { } } +// register existing accounts with any system exports. +func (s *Server) registerSystemImportsForExisting() { + var accounts []*Account + + s.mu.Lock() + if s.sys == nil { + s.mu.Unlock() + return + } + sacc := s.sys.account + s.accounts.Range(func(k, v interface{}) bool { + a := v.(*Account) + if a != sacc { + accounts = append(accounts, a) + } + return true + }) + s.mu.Unlock() + + for _, a := range accounts { + s.registerSystemImports(a) + } +} + // add all exports a system account will need func (s *Server) addSystemAccountExports(sacc *Account) { if !s.EventsEnabled() { return } + accConnzSubj := fmt.Sprintf(accReqSubj, "*", "CONNZ") + if err := sacc.AddServiceExportWithResponse(accConnzSubj, Streamed, nil); err != nil { + s.Errorf("Error adding system service export for %q: %v", accConnzSubj, err) + } + // Register any accounts that existed prior. + s.registerSystemImportsForExisting() + + // FIXME(dlc) - Old experiment, Remove? if err := sacc.AddServiceExport(accSubsSubj, nil); err != nil { s.Errorf("Error adding system service export for %q: %v", accSubsSubj, err) } + if s.JetStreamEnabled() { s.checkJetStreamExports() } @@ -1329,6 +1370,29 @@ func (s *Server) remoteConnsUpdate(sub *subscription, _ *client, _ *Account, sub } } +// This will import any system level exports. +func (s *Server) registerSystemImports(a *Account) { + if a == nil || !s.eventsEnabled() { + return + } + sacc := s.SystemAccount() + if sacc == nil { + return + } + // FIXME(dlc) - make a shared list between sys exports etc. + connzSubj := fmt.Sprintf(serverPingReqSubj, "CONNZ") + mappedSubj := fmt.Sprintf(accReqSubj, a.Name, "CONNZ") + + // Add in this to the account in 2 places. + // "$SYS.REQ.SERVER.PING.CONNZ" and "$SYS.REQ.ACCOUNT.CONNZ" + if err := a.AddServiceImport(sacc, connzSubj, mappedSubj); err != nil { + s.Errorf("Error setting up system service imports for account: %v", err) + } + if err := a.AddServiceImport(sacc, accConnzReqSubj, mappedSubj); err != nil { + s.Errorf("Error setting up system service imports for account: %v", err) + } +} + // Setup tracking for this account. This allows us to track global account activity. // Lock should be held on entry. func (s *Server) enableAccountTracking(a *Account) { diff --git a/server/monitor.go b/server/monitor.go index 9e27323b..66377922 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -86,6 +86,13 @@ type ConnzOptions struct { // Filter by account. Account string `json:"acc"` + + // Filter by subject interest + FilterSubject string `json:"filter_subject"` + + // Private indication that this request is from an account and not a system account. + // Used to not leak system level information to the account. + isAccountReq bool } // ConnState is for filtering states of connections. We will only have two, open and closed. @@ -103,6 +110,8 @@ const ( // ConnInfo has detailed information on a per connection basis. type ConnInfo struct { Cid uint64 `json:"cid"` + Kind string `json:"kind,omitempty"` + Type string `json:"type,omitempty"` IP string `json:"ip"` Port int `json:"port"` Start time.Time `json:"start"` @@ -170,11 +179,13 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { state = ConnOpen user string acc string + a *Account + filter string ) if opts != nil { // If no sort option given or sort is by uptime, then sort by cid - if opts.Sort == "" { + if opts.Sort == _EMPTY_ { sortOpt = ByCid } else { sortOpt = opts.Sort @@ -185,7 +196,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { // Auth specifics. auth = opts.Username - if !auth && (user != "" || acc != "") { + if !auth && (user != _EMPTY_ || acc != _EMPTY_) { return nil, fmt.Errorf("filter by user or account only allowed with auth option") } user = opts.User @@ -212,12 +223,18 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { if sortOpt == ByReason && state != ConnClosed { return nil, fmt.Errorf("sort by reason only valid on closed connections") } - // If searching by CID if opts.CID > 0 { cid = opts.CID limit = 1 } + // If filtering by subject. + if opts.FilterSubject != _EMPTY_ && opts.FilterSubject != fwcs { + if acc == _EMPTY_ { + return nil, fmt.Errorf("filter by subject only valid with account filtering") + } + filter = opts.FilterSubject + } } c := &Connz{ @@ -231,8 +248,31 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { // Hold for closed clients if requested. var closedClients []*closedClient + var clist map[uint64]*client + + // If this is an account scoped request from a no $SYS account. + isAccReq := acc != _EMPTY_ && opts.isAccountReq + + if acc != _EMPTY_ { + var err error + a, err = s.lookupAccount(acc) + if err != nil { + return c, nil + } + a.mu.RLock() + clist = make(map[uint64]*client, a.numLocalConnections()) + for c := range a.clients { + clist[c.cid] = c + } + a.mu.RUnlock() + } + // Walk the open client list with server lock held. s.mu.Lock() + // Default to all client unless filled in above. + if clist == nil { + clist = s.clients + } // copy the server id for monitoring c.ID = s.info.ID @@ -241,14 +281,34 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { // may be smaller if pagination is used. switch state { case ConnOpen: - c.Total = len(s.clients) + if isAccReq { + c.Total = a.NumLocalConnections() + } else { + c.Total = len(s.clients) + } case ConnClosed: - c.Total = s.closed.len() closedClients = s.closed.closedClients() c.Total = len(closedClients) case ConnAll: + if isAccReq { + c.Total = a.NumLocalConnections() + } else { + c.Total = len(s.clients) + } closedClients = s.closed.closedClients() - c.Total = len(s.clients) + len(closedClients) + c.Total += len(closedClients) + } + + // We may need to filter these connections. + if isAccReq && len(closedClients) > 0 { + var ccc []*closedClient + for _, cc := range closedClients { + if cc.acc == acc { + ccc = append(ccc, cc) + } + } + c.Total -= (len(closedClients) - len(ccc)) + closedClients = ccc } totalClients := c.Total @@ -294,13 +354,13 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { } else { // Gather all open clients. if state == ConnOpen || state == ConnAll { - for _, client := range s.clients { + for _, client := range clist { // If we have an account specified we need to filter. - if acc != "" && (client.acc == nil || client.acc.Name != acc) { + if acc != _EMPTY_ && (client.acc == nil || client.acc.Name != acc) { continue } // Do user filtering second - if user != "" && client.opts.Username != user { + if user != _EMPTY_ && client.opts.Username != user { continue } openClients = append(openClients, client) @@ -309,6 +369,21 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { } s.mu.Unlock() + // Filter by subject now if needed. We do this outside of server lock. + if filter != _EMPTY_ { + var oc []*client + for _, c := range openClients { + c.mu.Lock() + for _, sub := range c.subs { + if SubjectsCollide(filter, string(sub.subject)) { + oc = append(oc, c) + } + } + c.mu.Unlock() + openClients = oc + } + } + // Just return with empty array if nothing here. if len(openClients) == 0 && len(closedClients) == 0 { c.Conns = ConnInfos{} @@ -316,7 +391,6 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { } // Now whip through and generate ConnInfo entries - // Open Clients i := 0 for _, client := range openClients { @@ -354,11 +428,11 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { } for _, cc := range closedClients { // If we have an account specified we need to filter. - if acc != "" && cc.acc != acc { + if acc != _EMPTY_ && cc.acc != acc { continue } // Do user filtering second - if user != "" && cc.user != user { + if user != _EMPTY_ && cc.user != user { continue } @@ -451,6 +525,8 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { // client should be locked. func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time) { ci.Cid = client.cid + ci.Kind = client.kindString() + ci.Type = client.clientTypeString() ci.Start = client.start ci.LastActivity = client.last ci.Uptime = myUptime(now.Sub(client.start)) diff --git a/server/norace_test.go b/server/norace_test.go index 8955d2fa..dbad9926 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -2549,3 +2549,131 @@ func TestNoRaceJetStreamStalledMirrorsAfterExpire(t *testing.T) { return nil }) } + +// We will use JetStream helpers to create supercluster but this test is about exposing the ability to access +// account scoped connz with subject interest filtering. +func TestNoRaceAccountConnz(t *testing.T) { + // This has 4 different account, 3 general and system. + sc := createJetStreamSuperClusterWithTemplate(t, jsClusterAccountsTempl, 3, 3) + defer sc.shutdown() + + // Create 20 connections on account one and two + num := 20 + for i := 0; i < num; i++ { + nc, _ := jsClientConnect(t, sc.randomServer(), nats.UserInfo("one", "p"), nats.Name("one")) + defer nc.Close() + + if i%2 == 0 { + nc.SubscribeSync("foo") + } else { + nc.SubscribeSync("bar") + } + + nc, _ = jsClientConnect(t, sc.randomServer(), nats.UserInfo("two", "p"), nats.Name("two")) + nc.SubscribeSync("baz") + nc.SubscribeSync("foo.bar.*") + nc.SubscribeSync(fmt.Sprintf("id.%d", i+1)) + defer nc.Close() + } + + type czapi struct { + Server *ServerInfo + Data *Connz + Error *ApiError + } + + parseConnz := func(buf []byte) *Connz { + t.Helper() + var cz czapi + if err := json.Unmarshal(buf, &cz); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if cz.Error != nil { + t.Fatalf("Unexpected error: %+v", cz.Error) + } + return cz.Data + } + + doRequest := func(reqSubj, acc, filter string, expected int) { + t.Helper() + nc, _ := jsClientConnect(t, sc.randomServer(), nats.UserInfo(acc, "p"), nats.Name(acc)) + defer nc.Close() + + mch := make(chan *nats.Msg, 9) + sub, _ := nc.ChanSubscribe(nats.NewInbox(), mch) + + var req []byte + if filter != _EMPTY_ { + req, _ = json.Marshal(&ConnzOptions{FilterSubject: filter}) + } + + if err := nc.PublishRequest(reqSubj, sub.Subject, req); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // So we can igniore ourtselves. + cid, _ := nc.GetClientID() + sid := nc.ConnectedServerId() + + wt := time.NewTimer(200 * time.Millisecond) + var conns []*ConnInfo + LOOP: + for { + select { + case m := <-mch: + if len(m.Data) == 0 { + t.Fatalf("No responders") + } + cr := parseConnz(m.Data) + // For account scoped, NumConns and Total should be the same (sans limits and offsets). + // It Total should not include other accounts since that would leak information about the system. + if filter == _EMPTY_ && cr.NumConns != cr.Total { + t.Fatalf("NumConns and Total should be same with account scoped connz, got %+v", cr) + } + for _, c := range cr.Conns { + if c.Name != acc { + t.Fatalf("Got wrong account: %q vs %q", acc, c.Account) + } + if !(c.Cid == cid && cr.ID == sid) { + conns = append(conns, c) + } + } + wt.Reset(200 * time.Millisecond) + case <-wt.C: + break LOOP + } + } + if len(conns) != expected { + t.Fatalf("Expected to see %d conns but got %d", expected, len(conns)) + } + } + + doSysRequest := func(acc string, expected int) { + t.Helper() + doRequest("$SYS.REQ.SERVER.PING.CONNZ", acc, _EMPTY_, expected) + } + doAccRequest := func(acc string, expected int) { + t.Helper() + doRequest("$SYS.REQ.ACCOUNT.PING.CONNZ", acc, _EMPTY_, expected) + } + doFiltered := func(acc, filter string, expected int) { + t.Helper() + doRequest("$SYS.REQ.SERVER.PING.CONNZ", acc, filter, expected) + } + + doSysRequest("one", 20) + doAccRequest("one", 20) + + doSysRequest("two", 20) + doAccRequest("two", 20) + + // Now check filtering. + doFiltered("one", _EMPTY_, 20) + doFiltered("one", ">", 20) + doFiltered("one", "bar", 10) + doFiltered("two", "bar", 0) + doFiltered("two", "id.1", 1) + doFiltered("two", "id.*", 20) + doFiltered("two", "foo.bar.*", 20) + doFiltered("two", "foo.>", 20) +} diff --git a/server/server.go b/server/server.go index 5e993f87..cce12c7a 100644 --- a/server/server.go +++ b/server/server.go @@ -1319,6 +1319,12 @@ func (s *Server) registerAccountNoLock(acc *Account) *Account { s.accounts.Store(acc.Name, acc) s.tmpAccounts.Delete(acc.Name) s.enableAccountTracking(acc) + + // Can not have server lock here. + s.mu.Unlock() + s.registerSystemImports(acc) + s.mu.Lock() + return nil }