diff --git a/server/accounts.go b/server/accounts.go index 9fe4a81c..e20be9bc 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -816,8 +816,7 @@ func (a *Account) selectMappedSubject(dest string) (string, bool) { } // SubscriptionInterest returns true if this account has a matching subscription -// for the given `subject`. Works only for literal subjects. -// TODO: Add support for wildcards +// for the given `subject`. func (a *Account) SubscriptionInterest(subject string) bool { return a.Interest(subject) > 0 } @@ -1377,7 +1376,7 @@ func (a *Account) AddServiceImportWithClaim(destination *Account, from, to strin return ErrMissingAccount } // Empty means use from. - if to == "" { + if to == _EMPTY_ { to = from } if !IsValidSubject(from) || !IsValidSubject(to) { @@ -2277,7 +2276,7 @@ func (a *Account) AddStreamImportWithClaim(account *Account, from, prefix string // Check prefix if it exists and make sure its a literal. // Append token separator if not already present. - if prefix != "" { + if prefix != _EMPTY_ { // Make sure there are no wildcards here, this prefix needs to be a literal // since it will be prepended to a publish subject. if !subjectIsLiteral(prefix) { @@ -2415,7 +2414,7 @@ func (a *Account) checkStreamImportAuthorizedNoLock(account *Account, subject st func (a *Account) checkAuth(ea *exportAuth, account *Account, imClaim *jwt.Import, tokens []string) bool { // if ea is nil or ea.approved is nil, that denotes a public export - if ea == nil || (ea.approved == nil && !ea.tokenReq && ea.accountPos == 0) { + if ea == nil || (len(ea.approved) == 0 && !ea.tokenReq && ea.accountPos == 0) { return true } // Check if the export is protected and enforces presence of importing account identity @@ -2444,6 +2443,7 @@ func (a *Account) checkStreamExportApproved(account *Account, subject string, im } return a.checkAuth(&ea.exportAuth, account, imClaim, nil) } + // ok if we are here we did not match directly so we need to test each one. // The import subject arg has to take precedence, meaning the export // has to be a true subset of the import claim. We already checked for @@ -2464,7 +2464,7 @@ func (a *Account) checkServiceExportApproved(account *Account, subject string, i // Check direct match of subject first se, ok := a.exports.services[subject] if ok { - // if ea is nil or eq.approved is nil, that denotes a public export + // if se is nil or eq.approved is nil, that denotes a public export if se == nil { return true } diff --git a/server/events.go b/server/events.go index 2aabcc63..25fa80db 100644 --- a/server/events.go +++ b/server/events.go @@ -57,6 +57,7 @@ const ( leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT" // for internal use only remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s" inboxRespSubj = "$SYS._INBOX.%s.%s" + accConnzReqSubj = "$SYS.REQ.ACCOUNT.PING.CONNZ" // FIXME(dlc) - Should account scope, even with wc for now, but later on // we can then shard as needed. @@ -464,9 +465,8 @@ func (s *Server) eventsRunning() bool { // a defined system account. func (s *Server) EventsEnabled() bool { s.mu.Lock() - ee := s.eventsEnabled() - s.mu.Unlock() - return ee + defer s.mu.Unlock() + return s.eventsEnabled() } // eventsEnabled will report if events are enabled. @@ -777,7 +777,7 @@ func (s *Server) initEventTracking() { } extractAccount := func(subject string) (string, error) { if tk := strings.Split(subject, tsep); len(tk) != accReqTokens { - return "", fmt.Errorf("subject %q is malformed", subject) + return _EMPTY_, fmt.Errorf("subject %q is malformed", subject) } else { return tk[accReqAccIndex], nil } @@ -801,6 +801,14 @@ func (s *Server) initEventTracking() { if acc, err := extractAccount(subject); err != nil { return nil, err } else { + if ci, _, _, _, err := c.srv.getRequestInfo(c, msg); err == nil { + // Make sure the accounts match. + if ci.Account != acc { + // Do not leak too much here. + return nil, fmt.Errorf("bad request") + } + optz.ConnzOptions.isAccountReq = true + } optz.ConnzOptions.Account = acc return s.Connz(&optz.ConnzOptions) } @@ -863,14 +871,47 @@ func (s *Server) initEventTracking() { } } +// register existing accounts with any system exports. +func (s *Server) registerSystemImportsForExisting() { + var accounts []*Account + + s.mu.Lock() + if s.sys == nil { + s.mu.Unlock() + return + } + sacc := s.sys.account + s.accounts.Range(func(k, v interface{}) bool { + a := v.(*Account) + if a != sacc { + accounts = append(accounts, a) + } + return true + }) + s.mu.Unlock() + + for _, a := range accounts { + s.registerSystemImports(a) + } +} + // add all exports a system account will need func (s *Server) addSystemAccountExports(sacc *Account) { if !s.EventsEnabled() { return } + accConnzSubj := fmt.Sprintf(accReqSubj, "*", "CONNZ") + if err := sacc.AddServiceExportWithResponse(accConnzSubj, Streamed, nil); err != nil { + s.Errorf("Error adding system service export for %q: %v", accConnzSubj, err) + } + // Register any accounts that existed prior. + s.registerSystemImportsForExisting() + + // FIXME(dlc) - Old experiment, Remove? if err := sacc.AddServiceExport(accSubsSubj, nil); err != nil { s.Errorf("Error adding system service export for %q: %v", accSubsSubj, err) } + if s.JetStreamEnabled() { s.checkJetStreamExports() } @@ -1329,6 +1370,29 @@ func (s *Server) remoteConnsUpdate(sub *subscription, _ *client, _ *Account, sub } } +// This will import any system level exports. +func (s *Server) registerSystemImports(a *Account) { + if a == nil || !s.eventsEnabled() { + return + } + sacc := s.SystemAccount() + if sacc == nil { + return + } + // FIXME(dlc) - make a shared list between sys exports etc. + connzSubj := fmt.Sprintf(serverPingReqSubj, "CONNZ") + mappedSubj := fmt.Sprintf(accReqSubj, a.Name, "CONNZ") + + // Add in this to the account in 2 places. + // "$SYS.REQ.SERVER.PING.CONNZ" and "$SYS.REQ.ACCOUNT.CONNZ" + if err := a.AddServiceImport(sacc, connzSubj, mappedSubj); err != nil { + s.Errorf("Error setting up system service imports for account: %v", err) + } + if err := a.AddServiceImport(sacc, accConnzReqSubj, mappedSubj); err != nil { + s.Errorf("Error setting up system service imports for account: %v", err) + } +} + // Setup tracking for this account. This allows us to track global account activity. // Lock should be held on entry. func (s *Server) enableAccountTracking(a *Account) { diff --git a/server/monitor.go b/server/monitor.go index 9e27323b..66377922 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -86,6 +86,13 @@ type ConnzOptions struct { // Filter by account. Account string `json:"acc"` + + // Filter by subject interest + FilterSubject string `json:"filter_subject"` + + // Private indication that this request is from an account and not a system account. + // Used to not leak system level information to the account. + isAccountReq bool } // ConnState is for filtering states of connections. We will only have two, open and closed. @@ -103,6 +110,8 @@ const ( // ConnInfo has detailed information on a per connection basis. type ConnInfo struct { Cid uint64 `json:"cid"` + Kind string `json:"kind,omitempty"` + Type string `json:"type,omitempty"` IP string `json:"ip"` Port int `json:"port"` Start time.Time `json:"start"` @@ -170,11 +179,13 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { state = ConnOpen user string acc string + a *Account + filter string ) if opts != nil { // If no sort option given or sort is by uptime, then sort by cid - if opts.Sort == "" { + if opts.Sort == _EMPTY_ { sortOpt = ByCid } else { sortOpt = opts.Sort @@ -185,7 +196,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { // Auth specifics. auth = opts.Username - if !auth && (user != "" || acc != "") { + if !auth && (user != _EMPTY_ || acc != _EMPTY_) { return nil, fmt.Errorf("filter by user or account only allowed with auth option") } user = opts.User @@ -212,12 +223,18 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { if sortOpt == ByReason && state != ConnClosed { return nil, fmt.Errorf("sort by reason only valid on closed connections") } - // If searching by CID if opts.CID > 0 { cid = opts.CID limit = 1 } + // If filtering by subject. + if opts.FilterSubject != _EMPTY_ && opts.FilterSubject != fwcs { + if acc == _EMPTY_ { + return nil, fmt.Errorf("filter by subject only valid with account filtering") + } + filter = opts.FilterSubject + } } c := &Connz{ @@ -231,8 +248,31 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { // Hold for closed clients if requested. var closedClients []*closedClient + var clist map[uint64]*client + + // If this is an account scoped request from a no $SYS account. + isAccReq := acc != _EMPTY_ && opts.isAccountReq + + if acc != _EMPTY_ { + var err error + a, err = s.lookupAccount(acc) + if err != nil { + return c, nil + } + a.mu.RLock() + clist = make(map[uint64]*client, a.numLocalConnections()) + for c := range a.clients { + clist[c.cid] = c + } + a.mu.RUnlock() + } + // Walk the open client list with server lock held. s.mu.Lock() + // Default to all client unless filled in above. + if clist == nil { + clist = s.clients + } // copy the server id for monitoring c.ID = s.info.ID @@ -241,14 +281,34 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { // may be smaller if pagination is used. switch state { case ConnOpen: - c.Total = len(s.clients) + if isAccReq { + c.Total = a.NumLocalConnections() + } else { + c.Total = len(s.clients) + } case ConnClosed: - c.Total = s.closed.len() closedClients = s.closed.closedClients() c.Total = len(closedClients) case ConnAll: + if isAccReq { + c.Total = a.NumLocalConnections() + } else { + c.Total = len(s.clients) + } closedClients = s.closed.closedClients() - c.Total = len(s.clients) + len(closedClients) + c.Total += len(closedClients) + } + + // We may need to filter these connections. + if isAccReq && len(closedClients) > 0 { + var ccc []*closedClient + for _, cc := range closedClients { + if cc.acc == acc { + ccc = append(ccc, cc) + } + } + c.Total -= (len(closedClients) - len(ccc)) + closedClients = ccc } totalClients := c.Total @@ -294,13 +354,13 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { } else { // Gather all open clients. if state == ConnOpen || state == ConnAll { - for _, client := range s.clients { + for _, client := range clist { // If we have an account specified we need to filter. - if acc != "" && (client.acc == nil || client.acc.Name != acc) { + if acc != _EMPTY_ && (client.acc == nil || client.acc.Name != acc) { continue } // Do user filtering second - if user != "" && client.opts.Username != user { + if user != _EMPTY_ && client.opts.Username != user { continue } openClients = append(openClients, client) @@ -309,6 +369,21 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { } s.mu.Unlock() + // Filter by subject now if needed. We do this outside of server lock. + if filter != _EMPTY_ { + var oc []*client + for _, c := range openClients { + c.mu.Lock() + for _, sub := range c.subs { + if SubjectsCollide(filter, string(sub.subject)) { + oc = append(oc, c) + } + } + c.mu.Unlock() + openClients = oc + } + } + // Just return with empty array if nothing here. if len(openClients) == 0 && len(closedClients) == 0 { c.Conns = ConnInfos{} @@ -316,7 +391,6 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { } // Now whip through and generate ConnInfo entries - // Open Clients i := 0 for _, client := range openClients { @@ -354,11 +428,11 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { } for _, cc := range closedClients { // If we have an account specified we need to filter. - if acc != "" && cc.acc != acc { + if acc != _EMPTY_ && cc.acc != acc { continue } // Do user filtering second - if user != "" && cc.user != user { + if user != _EMPTY_ && cc.user != user { continue } @@ -451,6 +525,8 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { // client should be locked. func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time) { ci.Cid = client.cid + ci.Kind = client.kindString() + ci.Type = client.clientTypeString() ci.Start = client.start ci.LastActivity = client.last ci.Uptime = myUptime(now.Sub(client.start)) diff --git a/server/norace_test.go b/server/norace_test.go index 8955d2fa..dbad9926 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -2549,3 +2549,131 @@ func TestNoRaceJetStreamStalledMirrorsAfterExpire(t *testing.T) { return nil }) } + +// We will use JetStream helpers to create supercluster but this test is about exposing the ability to access +// account scoped connz with subject interest filtering. +func TestNoRaceAccountConnz(t *testing.T) { + // This has 4 different account, 3 general and system. + sc := createJetStreamSuperClusterWithTemplate(t, jsClusterAccountsTempl, 3, 3) + defer sc.shutdown() + + // Create 20 connections on account one and two + num := 20 + for i := 0; i < num; i++ { + nc, _ := jsClientConnect(t, sc.randomServer(), nats.UserInfo("one", "p"), nats.Name("one")) + defer nc.Close() + + if i%2 == 0 { + nc.SubscribeSync("foo") + } else { + nc.SubscribeSync("bar") + } + + nc, _ = jsClientConnect(t, sc.randomServer(), nats.UserInfo("two", "p"), nats.Name("two")) + nc.SubscribeSync("baz") + nc.SubscribeSync("foo.bar.*") + nc.SubscribeSync(fmt.Sprintf("id.%d", i+1)) + defer nc.Close() + } + + type czapi struct { + Server *ServerInfo + Data *Connz + Error *ApiError + } + + parseConnz := func(buf []byte) *Connz { + t.Helper() + var cz czapi + if err := json.Unmarshal(buf, &cz); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if cz.Error != nil { + t.Fatalf("Unexpected error: %+v", cz.Error) + } + return cz.Data + } + + doRequest := func(reqSubj, acc, filter string, expected int) { + t.Helper() + nc, _ := jsClientConnect(t, sc.randomServer(), nats.UserInfo(acc, "p"), nats.Name(acc)) + defer nc.Close() + + mch := make(chan *nats.Msg, 9) + sub, _ := nc.ChanSubscribe(nats.NewInbox(), mch) + + var req []byte + if filter != _EMPTY_ { + req, _ = json.Marshal(&ConnzOptions{FilterSubject: filter}) + } + + if err := nc.PublishRequest(reqSubj, sub.Subject, req); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // So we can igniore ourtselves. + cid, _ := nc.GetClientID() + sid := nc.ConnectedServerId() + + wt := time.NewTimer(200 * time.Millisecond) + var conns []*ConnInfo + LOOP: + for { + select { + case m := <-mch: + if len(m.Data) == 0 { + t.Fatalf("No responders") + } + cr := parseConnz(m.Data) + // For account scoped, NumConns and Total should be the same (sans limits and offsets). + // It Total should not include other accounts since that would leak information about the system. + if filter == _EMPTY_ && cr.NumConns != cr.Total { + t.Fatalf("NumConns and Total should be same with account scoped connz, got %+v", cr) + } + for _, c := range cr.Conns { + if c.Name != acc { + t.Fatalf("Got wrong account: %q vs %q", acc, c.Account) + } + if !(c.Cid == cid && cr.ID == sid) { + conns = append(conns, c) + } + } + wt.Reset(200 * time.Millisecond) + case <-wt.C: + break LOOP + } + } + if len(conns) != expected { + t.Fatalf("Expected to see %d conns but got %d", expected, len(conns)) + } + } + + doSysRequest := func(acc string, expected int) { + t.Helper() + doRequest("$SYS.REQ.SERVER.PING.CONNZ", acc, _EMPTY_, expected) + } + doAccRequest := func(acc string, expected int) { + t.Helper() + doRequest("$SYS.REQ.ACCOUNT.PING.CONNZ", acc, _EMPTY_, expected) + } + doFiltered := func(acc, filter string, expected int) { + t.Helper() + doRequest("$SYS.REQ.SERVER.PING.CONNZ", acc, filter, expected) + } + + doSysRequest("one", 20) + doAccRequest("one", 20) + + doSysRequest("two", 20) + doAccRequest("two", 20) + + // Now check filtering. + doFiltered("one", _EMPTY_, 20) + doFiltered("one", ">", 20) + doFiltered("one", "bar", 10) + doFiltered("two", "bar", 0) + doFiltered("two", "id.1", 1) + doFiltered("two", "id.*", 20) + doFiltered("two", "foo.bar.*", 20) + doFiltered("two", "foo.>", 20) +} diff --git a/server/server.go b/server/server.go index 5e993f87..cce12c7a 100644 --- a/server/server.go +++ b/server/server.go @@ -1319,6 +1319,12 @@ func (s *Server) registerAccountNoLock(acc *Account) *Account { s.accounts.Store(acc.Name, acc) s.tmpAccounts.Delete(acc.Name) s.enableAccountTracking(acc) + + // Can not have server lock here. + s.mu.Unlock() + s.registerSystemImports(acc) + s.mu.Lock() + return nil }