From d53d2d04844f4aff75507872eec8de4388de678e Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Tue, 12 Jul 2022 21:50:32 +0200 Subject: [PATCH] [Added] account specific monitoring endpoint(s) (#3250) Added http monitoring endpoint /accstatz It responds with a list of statz for all accounts with local connections. The argument "unused=1" can be provided to get statz for all accounts. This endpoint is also exposed as nats request under: This monitoring endpoint is exposed via the system account. $SYS.REQ.ACCOUNT.*.STATZ Each server will respond with connection statistics for the requested account. The format of the data section is a list (size 1) identical to the event $SYS.ACCOUNT.%s.SERVER.CONNS which is sent periodically as well as on connect/disconnect. Unless requested by options, server without the account, or server where the account has no local connections, will not respond. A PING endpoint exists as well. The response format is identical to $SYS.REQ.ACCOUNT.*.STATZ (however the data section will contain more than one account, if they exist) In addition to general filter options the request takes a list of accounts and an argument to include accounts without local connections (disabled by default) $SYS.REQ.ACCOUNT.PING.STATZ Each account has a new system account import where the local subject $SYS.REQ.ACCOUNT.PING.STATZ essentially responds as if the importing account name was used for $SYS.REQ.ACCOUNT.*.STATZ The only difference between requesting ACCOUNT.PING.STATZ from within the system account and an account is that the latter can only retrieve statz for the account the client requests from. Also exposed the monitoring /healthz via the system account under $SYS.REQ.SERVER.*.HEALTHZ $SYS.REQ.SERVER.PING.HEALTHZ No dedicated options are available for these. HEALTHZ also accepts general filter options. 
Signed-off-by: Matthias Hanel --- server/events.go | 176 +++++++++++++++++-------- server/events_test.go | 137 ++++++++++++------- server/jetstream_cluster_test.go | 2 +- server/jetstream_super_cluster_test.go | 5 +- server/leafnode_test.go | 16 +-- server/monitor.go | 83 ++++++++++-- server/monitor_test.go | 44 +++---- server/routes_test.go | 2 +- server/server.go | 27 ++-- test/leafnode_test.go | 36 ++--- 10 files changed, 350 insertions(+), 178 deletions(-) diff --git a/server/events.go b/server/events.go index 55dfaefd..441434d6 100644 --- a/server/events.go +++ b/server/events.go @@ -42,7 +42,8 @@ const ( connectEventSubj = "$SYS.ACCOUNT.%s.CONNECT" disconnectEventSubj = "$SYS.ACCOUNT.%s.DISCONNECT" - accReqSubj = "$SYS.REQ.ACCOUNT.%s.%s" + accDirectReqSubj = "$SYS.REQ.ACCOUNT.%s.%s" + accPingReqSubj = "$SYS.REQ.ACCOUNT.PING.%s" // atm. only used for STATZ and CONNZ import from system account // kept for backward compatibility when using http resolver // this overlaps with the names for events but you'd have to have the operator private key in order to succeed. accUpdateEventSubjOld = "$SYS.ACCOUNT.%s.CLAIMS.UPDATE" @@ -59,7 +60,6 @@ const ( leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT" // for internal use only remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s" inboxRespSubj = "$SYS._INBOX.%s.%s" - accConnzReqSubj = "$SYS.REQ.ACCOUNT.PING.CONNZ" // FIXME(dlc) - Should account scope, even with wc for now, but later on // we can then shard as needed. @@ -138,14 +138,19 @@ const DisconnectEventMsgType = "io.nats.server.advisory.v1.client_disconnect" // updates in the absence of any changes. 
type AccountNumConns struct { TypedEvent - Server ServerInfo `json:"server"` - Account string `json:"acc"` - Conns int `json:"conns"` - LeafNodes int `json:"leafnodes"` - TotalConns int `json:"total_conns"` - Sent DataStats `json:"sent"` - Received DataStats `json:"received"` - SlowConsumers int64 `json:"slow_consumers"` + Server ServerInfo `json:"server"` + AccountStat +} + +// AccountStat contains the data common between AccountNumConns and AccountStatz +type AccountStat struct { + Account string `json:"acc"` + Conns int `json:"conns"` + LeafNodes int `json:"leafnodes"` + TotalConns int `json:"total_conns"` + Sent DataStats `json:"sent"` + Received DataStats `json:"received"` + SlowConsumers int64 `json:"slow_consumers"` } const AccountNumConnsMsgType = "io.nats.server.advisory.v1.account_connections" @@ -869,6 +874,10 @@ func (s *Server) initEventTracking() { optz := &JszEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Jsz(&optz.JSzOptions) }) }, + "HEALTHZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + optz := &EventFilterOptions{} + s.zReq(c, reply, msg, optz, optz, func() (interface{}, error) { return s.healthz(), nil }) + }, } for name, req := range monSrvc { subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name) @@ -880,18 +889,26 @@ func (s *Server) initEventTracking() { s.Errorf("Error setting up internal tracking: %v", err) } } - extractAccount := func(subject string) (string, error) { + extractAccount := func(c *client, subject string, msg []byte) (string, error) { if tk := strings.Split(subject, tsep); len(tk) != accReqTokens { return _EMPTY_, fmt.Errorf("subject %q is malformed", subject) } else { - return tk[accReqAccIndex], nil + acc := tk[accReqAccIndex] + if ci, _, _, _, err := c.srv.getRequestInfo(c, msg); err == nil && ci.Account != _EMPTY_ { + // Make sure the accounts match. + if ci.Account != acc { + // Do not leak too much here. 
+ return _EMPTY_, fmt.Errorf("bad request") + } + } + return acc, nil } } monAccSrvc := map[string]msgHandler{ "SUBSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { optz := &SubszEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { - if acc, err := extractAccount(subject); err != nil { + if acc, err := extractAccount(c, subject, msg); err != nil { return nil, err } else { optz.SubszOptions.Subscriptions = true @@ -903,17 +920,9 @@ func (s *Server) initEventTracking() { "CONNZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { optz := &ConnzEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { - if acc, err := extractAccount(subject); err != nil { + if acc, err := extractAccount(c, subject, msg); err != nil { return nil, err } else { - if ci, _, _, _, err := c.srv.getRequestInfo(c, msg); err == nil && ci.Account != _EMPTY_ { - // Make sure the accounts match. - if ci.Account != acc { - // Do not leak too much here. 
- return nil, fmt.Errorf("bad request") - } - optz.ConnzOptions.isAccountReq = true - } optz.ConnzOptions.Account = acc return s.Connz(&optz.ConnzOptions) } @@ -922,7 +931,7 @@ func (s *Server) initEventTracking() { "LEAFZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { optz := &LeafzEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { - if acc, err := extractAccount(subject); err != nil { + if acc, err := extractAccount(c, subject, msg); err != nil { return nil, err } else { optz.LeafzOptions.Account = acc @@ -933,7 +942,7 @@ func (s *Server) initEventTracking() { "JSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { optz := &JszEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { - if acc, err := extractAccount(subject); err != nil { + if acc, err := extractAccount(c, subject, msg); err != nil { return nil, err } else { optz.Account = acc @@ -944,21 +953,60 @@ func (s *Server) initEventTracking() { "INFO": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { optz := &AccInfoEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { - if acc, err := extractAccount(subject); err != nil { + if acc, err := extractAccount(c, subject, msg); err != nil { return nil, err } else { return s.accountInfo(acc) } }) }, + // STATZ is essentially a duplicate of CONNS with an envelope identical to the others. + // For historical reasons CONNS is the odd one out. 
+ // STATZ is also less heavy weight than INFO + "STATZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + optz := &AccountStatzEventOptions{} + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { + if acc, err := extractAccount(c, subject, msg); err != nil { + return nil, err + } else if acc == "PING" { // Filter PING subject. Happens for server as well. But wildcards are not used + return nil, errSkipZreq + } else { + optz.Accounts = []string{acc} + if stz, err := s.AccountStatz(&optz.AccountStatzOptions); err != nil { + return nil, err + } else if len(stz.Accounts) == 0 && !optz.IncludeUnused { + return nil, errSkipZreq + } else { + return stz, nil + } + } + }) + }, "CONNS": s.connsRequest, } for name, req := range monAccSrvc { - if _, err := s.sysSubscribe(fmt.Sprintf(accReqSubj, "*", name), req); err != nil { + if _, err := s.sysSubscribe(fmt.Sprintf(accDirectReqSubj, "*", name), req); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } } + // For now only the STATZ subject has an account specific ping equivalent. + if _, err := s.sysSubscribe(fmt.Sprintf(accPingReqSubj, "STATZ"), + func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + optz := &AccountStatzEventOptions{} + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { + if stz, err := s.AccountStatz(&optz.AccountStatzOptions); err != nil { + return nil, err + } else if len(stz.Accounts) == 0 && !optz.IncludeUnused { + return nil, errSkipZreq + } else { + return stz, nil + } + }) + }); err != nil { + s.Errorf("Error setting up internal tracking: %v", err) + } + // Listen for updates when leaf nodes connect for a given account. 
This will // force any gateway connections to move to `modeInterestOnly` subject = fmt.Sprintf(leafNodeConnectEventSubj, "*") @@ -1005,10 +1053,14 @@ func (s *Server) addSystemAccountExports(sacc *Account) { if !s.EventsEnabled() { return } - accConnzSubj := fmt.Sprintf(accReqSubj, "*", "CONNZ") + accConnzSubj := fmt.Sprintf(accDirectReqSubj, "*", "CONNZ") if err := sacc.AddServiceExportWithResponse(accConnzSubj, Streamed, nil); err != nil { s.Errorf("Error adding system service export for %q: %v", accConnzSubj, err) } + accStatzSubj := fmt.Sprintf(accDirectReqSubj, "*", "STATZ") + if err := sacc.AddServiceExportWithResponse(accStatzSubj, Streamed, nil); err != nil { + s.Errorf("Error adding system service export for %q: %v", accStatzSubj, err) + } // Register any accounts that existed prior. s.registerSystemImportsForExisting() @@ -1369,6 +1421,12 @@ type AccountzEventOptions struct { EventFilterOptions } +// In the context of system events, AccountStatzEventOptions are options passed to AccountStatz +type AccountStatzEventOptions struct { + AccountStatzOptions + EventFilterOptions +} + // In the context of system events, JszEventOptions are options passed to Jsz type JszEventOptions struct { JSzOptions @@ -1572,21 +1630,21 @@ func (s *Server) registerSystemImports(a *Account) { return } // FIXME(dlc) - make a shared list between sys exports etc. - connzSubj := fmt.Sprintf(serverPingReqSubj, "CONNZ") - mappedSubj := fmt.Sprintf(accReqSubj, a.Name, "CONNZ") + importSrvc := func(subj, mappedSubj string) { + if !a.serviceImportExists(subj) { + if err := a.AddServiceImport(sacc, subj, mappedSubj); err != nil { + s.Errorf("Error setting up system service import %s -> %s for account: %v", + subj, mappedSubj, err) + } + } + } // Add in this to the account in 2 places. 
// "$SYS.REQ.SERVER.PING.CONNZ" and "$SYS.REQ.ACCOUNT.PING.CONNZ" - if !a.serviceImportExists(connzSubj) { - if err := a.AddServiceImport(sacc, connzSubj, mappedSubj); err != nil { - s.Errorf("Error setting up system service imports for account: %v", err) - } - } - if !a.serviceImportExists(accConnzReqSubj) { - if err := a.AddServiceImport(sacc, accConnzReqSubj, mappedSubj); err != nil { - s.Errorf("Error setting up system service imports for account: %v", err) - } - } + mappedConnzSubj := fmt.Sprintf(accDirectReqSubj, a.Name, "CONNZ") + importSrvc(fmt.Sprintf(accPingReqSubj, "CONNZ"), mappedConnzSubj) + importSrvc(fmt.Sprintf(serverPingReqSubj, "CONNZ"), mappedConnzSubj) + importSrvc(fmt.Sprintf(accPingReqSubj, "STATZ"), fmt.Sprintf(accDirectReqSubj, a.Name, "STATZ")) } // Setup tracking for this account. This allows us to track global account activity. @@ -1600,7 +1658,7 @@ func (s *Server) enableAccountTracking(a *Account) { // May need to ensure we do so only if there is a known interest. // This can get complicated with gateways. - subj := fmt.Sprintf(accReqSubj, a.Name, "CONNS") + subj := fmt.Sprintf(accDirectReqSubj, a.Name, "CONNS") reply := fmt.Sprintf(connsRespSubj, s.info.ID) m := accNumConnsReq{Account: a.Name} s.sendInternalMsg(subj, reply, &m.Server, &m) @@ -1643,29 +1701,18 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) { // Build event with account name and number of local clients and leafnodes. 
eid := s.nextEventID() a.mu.Lock() - localConns := a.numLocalConnections() - leafConns := a.numLocalLeafNodes() - m := &AccountNumConns{ + stat := a.statz() + m := AccountNumConns{ TypedEvent: TypedEvent{ Type: AccountNumConnsMsgType, ID: eid, Time: time.Now().UTC(), }, - Account: a.Name, - Conns: localConns, - LeafNodes: leafConns, - TotalConns: localConns + leafConns, - Received: DataStats{ - Msgs: atomic.LoadInt64(&a.inMsgs), - Bytes: atomic.LoadInt64(&a.inBytes)}, - Sent: DataStats{ - Msgs: atomic.LoadInt64(&a.outMsgs), - Bytes: atomic.LoadInt64(&a.outBytes)}, - SlowConsumers: atomic.LoadInt64(&a.slowConsumers), + AccountStat: *stat, } // Set timer to fire again unless we are at zero, but only if the account // is not configured for JetStream. - if localConns == 0 && !a.jetStreamConfiguredNoLock() { + if m.TotalConns == 0 && !a.jetStreamConfiguredNoLock() { clearTimer(&a.ctmr) } else { // Check to see if we have an HB running and update. @@ -1682,6 +1729,25 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) { a.mu.Unlock() } +// Lock should be held on entry +func (a *Account) statz() *AccountStat { + localConns := a.numLocalConnections() + leafConns := a.numLocalLeafNodes() + return &AccountStat{ + Account: a.Name, + Conns: localConns, + LeafNodes: leafConns, + TotalConns: localConns + leafConns, + Received: DataStats{ + Msgs: atomic.LoadInt64(&a.inMsgs), + Bytes: atomic.LoadInt64(&a.inBytes)}, + Sent: DataStats{ + Msgs: atomic.LoadInt64(&a.outMsgs), + Bytes: atomic.LoadInt64(&a.outBytes)}, + SlowConsumers: atomic.LoadInt64(&a.slowConsumers), + } +} + // accConnsUpdate is called whenever there is a change to the account's // number of active connections, or during a heartbeat. 
func (s *Server) accConnsUpdate(a *Account) { diff --git a/server/events_test.go b/server/events_test.go index a5f1321b..5c14f94b 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1239,14 +1239,15 @@ func TestAccountReqMonitoring(t *testing.T) { sacc, sakp := createAccount(s) s.setSystemAccount(sacc) s.EnableJetStream(nil) + unusedAcc, _ := createAccount(s) acc, akp := createAccount(s) - if acc == nil { - t.Fatalf("did not create account") - } acc.EnableJetStream(nil) - subsz := fmt.Sprintf(accReqSubj, acc.Name, "SUBSZ") - connz := fmt.Sprintf(accReqSubj, acc.Name, "CONNZ") - jsz := fmt.Sprintf(accReqSubj, acc.Name, "JSZ") + subsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "SUBSZ") + connz := fmt.Sprintf(accDirectReqSubj, acc.Name, "CONNZ") + jsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "JSZ") + + pStatz := fmt.Sprintf(accPingReqSubj, "STATZ") + statz := func(name string) string { return fmt.Sprintf(accDirectReqSubj, name, "STATZ") } // Create system account connection to query url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) ncSys, err := nats.Connect(url, createUserCreds(t, s, sakp)) @@ -1261,42 +1262,86 @@ func TestAccountReqMonitoring(t *testing.T) { } defer nc.Close() // query SUBSZ for account - if resp, err := ncSys.Request(subsz, nil, time.Second); err != nil { - t.Fatalf("Error on request: %v", err) - } else if !strings.Contains(string(resp.Data), `"num_subscriptions":3,`) { - t.Fatalf("unexpected subs count (expected 3): %v", string(resp.Data)) - } + resp, err := ncSys.Request(subsz, nil, time.Second) + require_NoError(t, err) + require_Contains(t, string(resp.Data), `"num_subscriptions":4,`) // create a subscription - if sub, err := nc.Subscribe("foo", func(msg *nats.Msg) {}); err != nil { - t.Fatalf("error on subscribe %v", err) - } else { - defer sub.Unsubscribe() - } - nc.Flush() + sub, err := nc.Subscribe("foo", func(msg *nats.Msg) {}) + require_NoError(t, err) + defer sub.Unsubscribe() + + require_NoError(t, nc.Flush()) // 
query SUBSZ for account - if resp, err := ncSys.Request(subsz, nil, time.Second); err != nil { - t.Fatalf("Error on request: %v", err) - } else if !strings.Contains(string(resp.Data), `"num_subscriptions":4,`) { - t.Fatalf("unexpected subs count (expected 4): %v", string(resp.Data)) - } else if !strings.Contains(string(resp.Data), `"subject":"foo"`) { - t.Fatalf("expected subscription foo: %v", string(resp.Data)) - } + resp, err = ncSys.Request(subsz, nil, time.Second) + require_NoError(t, err) + require_Contains(t, string(resp.Data), `"num_subscriptions":5,`, `"subject":"foo"`) // query connections for account - if resp, err := ncSys.Request(connz, nil, time.Second); err != nil { - t.Fatalf("Error on request: %v", err) - } else if !strings.Contains(string(resp.Data), `"num_connections":1,`) { - t.Fatalf("unexpected subs count (expected 1): %v", string(resp.Data)) - } else if !strings.Contains(string(resp.Data), `"total":1,`) { - t.Fatalf("unexpected subs count (expected 1): %v", string(resp.Data)) - } + resp, err = ncSys.Request(connz, nil, time.Second) + require_NoError(t, err) + require_Contains(t, string(resp.Data), `"num_connections":1,`, `"total":1,`) // query connections for js account - if resp, err := ncSys.Request(jsz, nil, time.Second); err != nil { - t.Fatalf("Error on request: %v", err) - } else if !strings.Contains(string(resp.Data), `"memory":0,`) { - t.Fatalf("jetstream should be enabled but empty: %v", string(resp.Data)) - } else if !strings.Contains(string(resp.Data), `"storage":0,`) { - t.Fatalf("jetstream should be enabled but empty: %v", string(resp.Data)) - } + resp, err = ncSys.Request(jsz, nil, time.Second) + require_NoError(t, err) + require_Contains(t, string(resp.Data), `"memory":0,`, `"storage":0,`) + // query statz/conns for account + resp, err = ncSys.Request(statz(acc.Name), nil, time.Second) + require_NoError(t, err) + respContentAcc := []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"sent":{"msgs":0,"bytes":0}`, + 
`"received":{"msgs":0,"bytes":0}`, fmt.Sprintf(`"acc":"%s"`, acc.Name)} + require_Contains(t, string(resp.Data), respContentAcc...) + + rIb := ncSys.NewRespInbox() + rSub, err := ncSys.SubscribeSync(rIb) + require_NoError(t, err) + require_NoError(t, ncSys.PublishRequest(pStatz, rIb, nil)) + minRespContentForBothAcc := []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"acc":"`} + resp, err = rSub.NextMsg(time.Second) + require_NoError(t, err) + require_Contains(t, string(resp.Data), minRespContentForBothAcc...) + // expect one entry per account + require_Contains(t, string(resp.Data), fmt.Sprintf(`"acc":"%s"`, acc.Name), fmt.Sprintf(`"acc":"%s"`, sacc.Name)) + + // Test ping with filter by account name + require_NoError(t, ncSys.PublishRequest(pStatz, rIb, []byte(fmt.Sprintf(`{"accounts":["%s"]}`, sacc.Name)))) + m, err := rSub.NextMsg(time.Second) + require_NoError(t, err) + require_Contains(t, string(m.Data), minRespContentForBothAcc...) + + require_NoError(t, ncSys.PublishRequest(pStatz, rIb, []byte(fmt.Sprintf(`{"accounts":["%s"]}`, acc.Name)))) + m, err = rSub.NextMsg(time.Second) + require_NoError(t, err) + require_Contains(t, string(m.Data), respContentAcc...) + + // Test include unused for statz and ping of statz + unusedContent := []string{`"conns":0,`, `"total_conns":0`, `"slow_consumers":0`, + fmt.Sprintf(`"acc":"%s"`, unusedAcc.Name)} + + resp, err = ncSys.Request(statz(unusedAcc.Name), + []byte(fmt.Sprintf(`{"accounts":["%s"], "include_unused":true}`, unusedAcc.Name)), + time.Second) + require_NoError(t, err) + require_Contains(t, string(resp.Data), unusedContent...) + + require_NoError(t, ncSys.PublishRequest(pStatz, rIb, + []byte(fmt.Sprintf(`{"accounts":["%s"], "include_unused":true}`, unusedAcc.Name)))) + resp, err = rSub.NextMsg(time.Second) + require_NoError(t, err) + require_Contains(t, string(resp.Data), unusedContent...) 
+ + require_NoError(t, ncSys.PublishRequest(pStatz, rIb, []byte(fmt.Sprintf(`{"accounts":["%s"]}`, unusedAcc.Name)))) + _, err = rSub.NextMsg(200 * time.Millisecond) + require_Error(t, err) + + // Test ping from within account + ib := nc.NewRespInbox() + rSub, err = nc.SubscribeSync(ib) + require_NoError(t, err) + require_NoError(t, nc.PublishRequest(pStatz, ib, nil)) + resp, err = rSub.NextMsg(time.Second) + require_NoError(t, err) + require_Contains(t, string(resp.Data), respContentAcc...) + _, err = rSub.NextMsg(200 * time.Millisecond) + require_Error(t, err) } func TestAccountReqInfo(t *testing.T) { @@ -1312,7 +1357,7 @@ func TestAccountReqInfo(t *testing.T) { ajwt1, _ := nac1.Encode(oKp) addAccountToMemResolver(s, pub1, ajwt1) s.LookupAccount(pub1) - info1 := fmt.Sprintf(accReqSubj, pub1, "INFO") + info1 := fmt.Sprintf(accDirectReqSubj, pub1, "INFO") // Now add an account with service imports. akp2, _ := nkeys.CreateAccount() pub2, _ := akp2.PublicKey() @@ -1321,7 +1366,7 @@ func TestAccountReqInfo(t *testing.T) { ajwt2, _ := nac2.Encode(oKp) addAccountToMemResolver(s, pub2, ajwt2) s.LookupAccount(pub2) - info2 := fmt.Sprintf(accReqSubj, pub2, "INFO") + info2 := fmt.Sprintf(accDirectReqSubj, pub2, "INFO") // Create system account connection to query url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) ncSys, err := nats.Connect(url, createUserCreds(t, s, sakp)) @@ -1369,7 +1414,7 @@ func TestAccountReqInfo(t *testing.T) { t.Fatalf("Unmarshalling failed: %v", err) } else if len(info.Exports) != 1 { t.Fatalf("Unexpected value: %v", info.Exports) - } else if len(info.Imports) != 2 { + } else if len(info.Imports) != 3 { t.Fatalf("Unexpected value: %+v", info.Imports) } else if info.Exports[0].Subject != "req.*" { t.Fatalf("Unexpected value: %v", info.Exports) @@ -1377,7 +1422,7 @@ func TestAccountReqInfo(t *testing.T) { t.Fatalf("Unexpected value: %v", info.Exports) } else if info.Exports[0].ResponseType != jwt.ResponseTypeSingleton { t.Fatalf("Unexpected 
value: %v", info.Exports) - } else if info.SubCnt != 2 { + } else if info.SubCnt != 3 { t.Fatalf("Unexpected value: %v", info.SubCnt) } else { checkCommon(&info, &srv, pub1, ajwt1) @@ -1390,7 +1435,7 @@ func TestAccountReqInfo(t *testing.T) { t.Fatalf("Unmarshalling failed: %v", err) } else if len(info.Exports) != 0 { t.Fatalf("Unexpected value: %v", info.Exports) - } else if len(info.Imports) != 3 { + } else if len(info.Imports) != 4 { t.Fatalf("Unexpected value: %+v", info.Imports) } // Here we need to find our import @@ -1408,7 +1453,7 @@ func TestAccountReqInfo(t *testing.T) { t.Fatalf("Unexpected value: %+v", si) } else if si.Account != pub1 { t.Fatalf("Unexpected value: %+v", si) - } else if info.SubCnt != 3 { + } else if info.SubCnt != 4 { t.Fatalf("Unexpected value: %+v", si) } else { checkCommon(&info, &srv, pub2, ajwt2) @@ -1611,7 +1656,7 @@ func TestSystemAccountWithGateways(t *testing.T) { // If this tests fails with wrong number after 10 seconds we may have // added a new inititial subscription for the eventing system. - checkExpectedSubs(t, 40, sa) + checkExpectedSubs(t, 45, sa) // Create a client on B and see if we receive the event urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) @@ -2123,6 +2168,8 @@ func TestServerEventsPingMonitorz(t *testing.T) { []string{"now", "routes"}}, {"JSZ", nil, &JSzOptions{}, []string{"now", "disabled"}}, + + {"HEALTHZ", nil, &JSzOptions{}, []string{"status"}}, } for i, test := range tests { diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 04ddf18a..560cacf1 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -3948,7 +3948,7 @@ func TestJetStreamClusterPeerEvacuationAndStreamReassignment(t *testing.T) { }) } // Now wait until the stream is now current. 
- checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 50*time.Second, 100*time.Millisecond, func() error { si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second)) if err != nil { return fmt.Errorf("could not fetch stream info: %v", err) diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index df29c7cd..4b2abe7f 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -516,7 +516,7 @@ func TestJetStreamSuperClusterConnectionCount(t *testing.T) { sysNc := natsConnect(t, sc.randomServer().ClientURL(), nats.UserInfo("admin", "s3cr3t!")) defer sysNc.Close() - _, err := sysNc.Request(fmt.Sprintf(accReqSubj, "ONE", "CONNS"), nil, 100*time.Millisecond) + _, err := sysNc.Request(fmt.Sprintf(accDirectReqSubj, "ONE", "CONNS"), nil, 100*time.Millisecond) // this is a timeout as the server only responds when it has connections.... // not convinced this should be that way, but also not the issue to investigate. require_True(t, err == nats.ErrTimeout) @@ -561,7 +561,7 @@ func TestJetStreamSuperClusterConnectionCount(t *testing.T) { // There should be no active NATS CLIENT connections, but we still need // to wait a little bit... 
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { - _, err := sysNc.Request(fmt.Sprintf(accReqSubj, "ONE", "CONNS"), nil, 100*time.Millisecond) + _, err := sysNc.Request(fmt.Sprintf(accDirectReqSubj, "ONE", "CONNS"), nil, 100*time.Millisecond) if err != nats.ErrTimeout { return fmt.Errorf("Expected timeout, got %v", err) } @@ -2087,6 +2087,7 @@ func TestJetStreamSuperClusterMovingStreamAndMoveBack(t *testing.T) { checkMove := func(cluster string) { t.Helper() + sc.waitOnStreamLeader("$G", "TEST") checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { si, err := js.StreamInfo("TEST") if err != nil { diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 19c62e50..e14d8fab 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -1306,7 +1306,7 @@ func TestLeafNodePermissions(t *testing.T) { // Create a sub on ">" on LN1 subAll := natsSubSync(t, nc1, ">") // this should be registered in LN2 (there is 1 sub for LN1 $LDS subject) + SYS IMPORTS - checkSubs(ln2.globalAccount(), 8) + checkSubs(ln2.globalAccount(), 10) // Check deny export clause from messages published from LN2 for _, test := range []struct { @@ -1333,7 +1333,7 @@ func TestLeafNodePermissions(t *testing.T) { subAll.Unsubscribe() // Goes down by 1. - checkSubs(ln2.globalAccount(), 7) + checkSubs(ln2.globalAccount(), 9) // We used to make sure we would not do subscriptions however that // was incorrect. We need to check publishes, not the subscriptions. 
@@ -1358,7 +1358,7 @@ func TestLeafNodePermissions(t *testing.T) { } { t.Run(test.name, func(t *testing.T) { sub := natsSubSync(t, nc2, test.subSubject) - checkSubs(ln2.globalAccount(), 8) + checkSubs(ln2.globalAccount(), 10) if !test.ok { nc1.Publish(test.pubSubject, []byte("msg")) @@ -1366,12 +1366,12 @@ func TestLeafNodePermissions(t *testing.T) { t.Fatalf("Did not expect to get the message") } } else { - checkSubs(ln1.globalAccount(), 7) + checkSubs(ln1.globalAccount(), 9) nc1.Publish(test.pubSubject, []byte("msg")) natsNexMsg(t, sub, time.Second) } sub.Unsubscribe() - checkSubs(ln1.globalAccount(), 6) + checkSubs(ln1.globalAccount(), 8) }) } } @@ -1492,8 +1492,8 @@ func TestLeafNodeExportPermissionsNotForSpecialSubs(t *testing.T) { // The deny is totally restrictive, but make sure that we still accept the $LDS, $GR and _GR_ go from LN1. checkFor(t, time.Second, 15*time.Millisecond, func() error { // We should have registered the 3 subs from the accepting leafnode. - if n := ln2.globalAccount().TotalSubs(); n != 7 { - return fmt.Errorf("Expected %d subs, got %v", 7, n) + if n := ln2.globalAccount().TotalSubs(); n != 8 { + return fmt.Errorf("Expected %d subs, got %v", 8, n) } return nil }) @@ -3395,7 +3395,7 @@ func TestLeafNodeRouteSubWithOrigin(t *testing.T) { r1.Shutdown() checkFor(t, time.Second, 15*time.Millisecond, func() error { acc := l2.GlobalAccount() - if n := acc.TotalSubs(); n != 3 { + if n := acc.TotalSubs(); n != 4 { return fmt.Errorf("Account %q should have 3 subs, got %v", acc.GetName(), n) } return nil diff --git a/server/monitor.go b/server/monitor.go index 77f7d765..8c0b0b58 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -92,10 +92,6 @@ type ConnzOptions struct { // Filter by subject interest FilterSubject string `json:"filter_subject"` - - // Private indication that this request is from an account and not a system account. - // Used to not leak system level information to the account. 
- isAccountReq bool } // ConnState is for filtering states of connections. We will only have two, open and closed. @@ -256,9 +252,6 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { var clist map[uint64]*client - // If this is an account scoped request from a no $SYS account. - isAccReq := acc != _EMPTY_ && opts.isAccountReq - if acc != _EMPTY_ { var err error a, err = s.lookupAccount(acc) @@ -300,7 +293,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { } // We may need to filter these connections. - if isAccReq && len(closedClients) > 0 { + if acc != _EMPTY_ && len(closedClients) > 0 { var ccc []*closedClient for _, cc := range closedClients { if cc.acc == acc { @@ -1348,6 +1341,7 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { JetStream Connections Accounts + Account stats Subscriptions Routes LeafNodes @@ -1362,6 +1356,7 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { s.basePath(JszPath), s.basePath(ConnzPath), s.basePath(AccountzPath), + s.basePath(AccountStatzPath), s.basePath(SubszPath), s.basePath(RoutezPath), s.basePath(LeafzPath), @@ -2156,6 +2151,78 @@ func (s *Server) HandleLeafz(w http.ResponseWriter, r *http.Request) { ResponseHandler(w, r, b) } +// AccountStatz represents statz information for a list of accounts. +type AccountStatz struct { + ID string `json:"server_id"` + Now time.Time `json:"now"` + Accounts []*AccountStat `json:"account_statz"` +} + +// AccountStatzOptions are options passed to AccountStatz +type AccountStatzOptions struct { + Accounts []string `json:"accounts"` + IncludeUnused bool `json:"include_unused"` +} + +// AccountStatz returns an AccountStatz structure containing summary information about accounts.
+func (s *Server) AccountStatz(opts *AccountStatzOptions) (*AccountStatz, error) { + stz := &AccountStatz{ + ID: s.ID(), + Now: time.Now().UTC(), + Accounts: []*AccountStat{}, + } + if opts == nil || len(opts.Accounts) == 0 { + s.accounts.Range(func(key, a interface{}) bool { + acc := a.(*Account) + acc.mu.RLock() + if (opts != nil && opts.IncludeUnused) || acc.numLocalConnections() != 0 { + stz.Accounts = append(stz.Accounts, acc.statz()) + } + acc.mu.RUnlock() + return true + }) + } else { + for _, a := range opts.Accounts { + if acc, ok := s.accounts.Load(a); ok { + acc := acc.(*Account) + acc.mu.RLock() + if opts.IncludeUnused || acc.numLocalConnections() != 0 { + stz.Accounts = append(stz.Accounts, acc.statz()) + } + acc.mu.RUnlock() + } + } + } + return stz, nil +} + +// HandleAccountStatz processes HTTP requests for statz information of all accounts. +func (s *Server) HandleAccountStatz(w http.ResponseWriter, r *http.Request) { + s.mu.Lock() + s.httpReqStats[AccountStatzPath]++ + s.mu.Unlock() + + unused, err := decodeBool(w, r, "unused") + if err != nil { + return + } + + l, err := s.AccountStatz(&AccountStatzOptions{IncludeUnused: unused}) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + b, err := json.MarshalIndent(l, "", " ") + if err != nil { + s.Errorf("Error marshaling response to %s request: %v", AccountStatzPath, err) + return + } + + // Handle response + ResponseHandler(w, r, b) +} + // ResponseHandler handles responses for monitoring routes func ResponseHandler(w http.ResponseWriter, r *http.Request, data []byte) { // Get callback from request diff --git a/server/monitor_test.go b/server/monitor_test.go index 9a0db9ed..71f10951 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -3854,7 +3854,7 @@ func TestMonitorLeafz(t *testing.T) { t.Fatalf("RTT not tracked?") } // LDS should be only one.
- if ln.NumSubs != 3 || len(ln.Subs) != 3 { + if ln.NumSubs != 4 || len(ln.Subs) != 4 { t.Fatalf("Expected 3 subs, got %v (%v)", ln.NumSubs, ln.Subs) } } @@ -3864,28 +3864,26 @@ func TestMonitorLeafz(t *testing.T) { func TestMonitorAccountz(t *testing.T) { s := RunServer(DefaultMonitorOptions()) defer s.Shutdown() - body := string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d/accountz", s.MonitorAddr().Port))) - if !strings.Contains(body, `$G`) { - t.Fatalf("Body missing value. Contains: %s", body) - } else if !strings.Contains(body, `$SYS`) { - t.Fatalf("Body missing value. Contains: %s", body) - } else if !strings.Contains(body, `"accounts": [`) { - t.Fatalf("Body missing value. Contains: %s", body) - } else if !strings.Contains(body, `"system_account": "$SYS"`) { - t.Fatalf("Body missing value. Contains: %s", body) - } - body = string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d/accountz?acc=$SYS", s.MonitorAddr().Port))) - if !strings.Contains(body, `"account_detail": {`) { - t.Fatalf("Body missing value. Contains: %s", body) - } else if !strings.Contains(body, `"account_name": "$SYS",`) { - t.Fatalf("Body missing value. Contains: %s", body) - } else if !strings.Contains(body, `"subscriptions": 36,`) { - t.Fatalf("Body missing value. Contains: %s", body) - } else if !strings.Contains(body, `"is_system": true,`) { - t.Fatalf("Body missing value. Contains: %s", body) - } else if !strings.Contains(body, `"system_account": "$SYS"`) { - t.Fatalf("Body missing value. 
Contains: %s", body) - } + body := string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d%s", s.MonitorAddr().Port, AccountzPath))) + require_Contains(t, body, `$G`) + require_Contains(t, body, `$SYS`) + require_Contains(t, body, `"accounts": [`) + require_Contains(t, body, `"system_account": "$SYS"`) + + body = string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d%s?acc=$SYS", s.MonitorAddr().Port, AccountzPath))) + require_Contains(t, body, `"account_detail": {`) + require_Contains(t, body, `"account_name": "$SYS",`) + require_Contains(t, body, `"subscriptions": 40,`) + require_Contains(t, body, `"is_system": true,`) + require_Contains(t, body, `"system_account": "$SYS"`) + + body = string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d%s?unused=1", s.MonitorAddr().Port, AccountStatzPath))) + require_Contains(t, body, `"acc": "$G"`) + require_Contains(t, body, `"acc": "$SYS"`) + require_Contains(t, body, `"sent": {`) + require_Contains(t, body, `"received": {`) + require_Contains(t, body, `"total_conns": 0,`) + require_Contains(t, body, `"leafnodes": 0,`) } func TestMonitorAuthorizedUsers(t *testing.T) { diff --git a/server/routes_test.go b/server/routes_test.go index e68dcaa4..69205ae7 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -1095,7 +1095,7 @@ func TestRouteNoCrashOnAddingSubToRoute(t *testing.T) { // Make sure all subs are registered in s. 
checkFor(t, time.Second, 15*time.Millisecond, func() error { - if ts := s.globalAccount().TotalSubs() - 3; ts != int(numRoutes) { + if ts := s.globalAccount().TotalSubs() - 4; ts != int(numRoutes) { return fmt.Errorf("Not all %d routed subs were registered: %d", numRoutes, ts) } return nil diff --git a/server/server.go b/server/server.go index 6901701e..3e9256a2 100644 --- a/server/server.go +++ b/server/server.go @@ -2267,18 +2267,19 @@ func (s *Server) StartMonitoring() error { // HTTP endpoints const ( - RootPath = "/" - VarzPath = "/varz" - ConnzPath = "/connz" - RoutezPath = "/routez" - GatewayzPath = "/gatewayz" - LeafzPath = "/leafz" - SubszPath = "/subsz" - StackszPath = "/stacksz" - AccountzPath = "/accountz" - JszPath = "/jsz" - HealthzPath = "/healthz" - IPQueuesPath = "/ipqueuesz" + RootPath = "/" + VarzPath = "/varz" + ConnzPath = "/connz" + RoutezPath = "/routez" + GatewayzPath = "/gatewayz" + LeafzPath = "/leafz" + SubszPath = "/subsz" + StackszPath = "/stacksz" + AccountzPath = "/accountz" + AccountStatzPath = "/accstatz" + JszPath = "/jsz" + HealthzPath = "/healthz" + IPQueuesPath = "/ipqueuesz" ) func (s *Server) basePath(p string) string { @@ -2381,6 +2382,8 @@ func (s *Server) startMonitoring(secure bool) error { mux.HandleFunc(s.basePath(StackszPath), s.HandleStacksz) // Accountz mux.HandleFunc(s.basePath(AccountzPath), s.HandleAccountz) + // Accstatz + mux.HandleFunc(s.basePath(AccountStatzPath), s.HandleAccountStatz) // Jsz mux.HandleFunc(s.basePath(JszPath), s.HandleJsz) // Healthz diff --git a/test/leafnode_test.go b/test/leafnode_test.go index 7e49715b..b0b1c8b5 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -440,7 +440,7 @@ func TestLeafNodeAndRoutes(t *testing.T) { lc := createLeafConn(t, optsA.LeafNode.Host, optsA.LeafNode.Port) defer lc.Close() - leafSend, leafExpect := setupLeaf(t, lc, 4) + leafSend, leafExpect := setupLeaf(t, lc, 5) leafSend("PING\r\n") leafExpect(pongRe) @@ -834,7 +834,7 @@ func 
TestLeafNodeGatewaySendsSystemEvent(t *testing.T) { defer lc.Close() // This is for our global responses since we are setting up GWs above. - leafSend, leafExpect := setupLeaf(t, lc, 6) + leafSend, leafExpect := setupLeaf(t, lc, 7) leafSend("PING\r\n") leafExpect(pongRe) @@ -878,13 +878,13 @@ func TestLeafNodeGatewayInterestPropagation(t *testing.T) { buf := leafExpect(infoRe) buf = infoRe.ReplaceAll(buf, []byte(nil)) foundFoo := false - for count := 0; count != 8; { + for count := 0; count != 9; { // skip first time if we still have data (buf from above may already have some left) if count != 0 || len(buf) == 0 { buf = append(buf, leafExpect(anyRe)...) } count += len(lsubRe.FindAllSubmatch(buf, -1)) - if count > 8 { + if count > 9 { t.Fatalf("Expected %v matches, got %v (buf=%s)", 8, count, buf) } if strings.Contains(string(buf), "foo") { @@ -937,7 +937,7 @@ func TestLeafNodeWithRouteAndGateway(t *testing.T) { defer lc.Close() // This is for our global responses since we are setting up GWs above. 
- leafSend, leafExpect := setupLeaf(t, lc, 6) + leafSend, leafExpect := setupLeaf(t, lc, 7) leafSend("PING\r\n") leafExpect(pongRe) @@ -996,7 +996,7 @@ func TestLeafNodeWithGatewaysAndStaggeredStart(t *testing.T) { lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port) defer lc.Close() - leafSend, leafExpect := setupLeaf(t, lc, 6) + leafSend, leafExpect := setupLeaf(t, lc, 7) leafSend("PING\r\n") leafExpect(pongRe) @@ -1036,7 +1036,7 @@ func TestLeafNodeWithGatewaysServerRestart(t *testing.T) { lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port) defer lc.Close() - leafSend, leafExpect := setupLeaf(t, lc, 6) + leafSend, leafExpect := setupLeaf(t, lc, 7) leafSend("PING\r\n") leafExpect(pongRe) @@ -1070,7 +1070,7 @@ func TestLeafNodeWithGatewaysServerRestart(t *testing.T) { lc = createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port) defer lc.Close() - _, leafExpect = setupLeaf(t, lc, 6) + _, leafExpect = setupLeaf(t, lc, 7) // Now wait on GW solicit to fire time.Sleep(500 * time.Millisecond) @@ -1530,7 +1530,7 @@ func TestLeafNodeMultipleAccounts(t *testing.T) { defer s.Shutdown() // Setup the two accounts for this server. - _, akp1 := createAccount(t, s) + a, akp1 := createAccount(t, s) kp1, _ := nkeys.CreateUser() pub1, _ := kp1.PublicKey() nuc1 := jwt.NewUserClaims(pub1) @@ -1575,12 +1575,7 @@ func TestLeafNodeMultipleAccounts(t *testing.T) { lsub, _ := ncl.SubscribeSync("foo.test") // Wait for the subs to propagate. LDS + foo.test - checkFor(t, 2*time.Second, 10*time.Millisecond, func() error { - if subs := s.NumSubscriptions(); subs < 4 { - return fmt.Errorf("Number of subs is %d", subs) - } - return nil - }) + checkSubInterest(t, s, a.GetName(), "foo.test", 2*time.Second) // Now send from nc1 with account 1, should be received by our leafnode subscriber. nc1.Publish("foo.test", nil) @@ -1909,12 +1904,7 @@ func TestLeafNodeExportsImports(t *testing.T) { lsub, _ := ncl.SubscribeSync("import.foo.stream") // Wait for all subs to propagate. 
- checkFor(t, time.Second, 10*time.Millisecond, func() error { - if subs := s.NumSubscriptions(); subs < 5 { - return fmt.Errorf("Number of subs is %d", subs) - } - return nil - }) + checkSubInterest(t, s, acc1.GetName(), "import.foo.stream", time.Second) // Pub to other account with export on original subject. nc2.Publish("foo.stream", nil) @@ -2074,7 +2064,7 @@ func TestLeafNodeExportImportComplexSetup(t *testing.T) { // Wait for the sub to propagate to s2. LDS + subject above. checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { - if acc1.RoutedSubs() != 4 { + if acc1.RoutedSubs() != 5 { return fmt.Errorf("Still no routed subscription: %d", acc1.RoutedSubs()) } return nil @@ -2660,7 +2650,7 @@ func TestLeafNodeSwitchGatewayToInterestModeOnly(t *testing.T) { defer lc.Close() // This is for our global responses since we are setting up GWs above. - leafSend, leafExpect := setupLeaf(t, lc, 6) + leafSend, leafExpect := setupLeaf(t, lc, 7) leafSend("PING\r\n") leafExpect(pongRe) }