diff --git a/server/events.go b/server/events.go index 55dfaefd..441434d6 100644 --- a/server/events.go +++ b/server/events.go @@ -42,7 +42,8 @@ const ( connectEventSubj = "$SYS.ACCOUNT.%s.CONNECT" disconnectEventSubj = "$SYS.ACCOUNT.%s.DISCONNECT" - accReqSubj = "$SYS.REQ.ACCOUNT.%s.%s" + accDirectReqSubj = "$SYS.REQ.ACCOUNT.%s.%s" + accPingReqSubj = "$SYS.REQ.ACCOUNT.PING.%s" // atm. only used for STATZ and CONNZ import from system account // kept for backward compatibility when using http resolver // this overlaps with the names for events but you'd have to have the operator private key in order to succeed. accUpdateEventSubjOld = "$SYS.ACCOUNT.%s.CLAIMS.UPDATE" @@ -59,7 +60,6 @@ const ( leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT" // for internal use only remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s" inboxRespSubj = "$SYS._INBOX.%s.%s" - accConnzReqSubj = "$SYS.REQ.ACCOUNT.PING.CONNZ" // FIXME(dlc) - Should account scope, even with wc for now, but later on // we can then shard as needed. @@ -138,14 +138,19 @@ const DisconnectEventMsgType = "io.nats.server.advisory.v1.client_disconnect" // updates in the absence of any changes. type AccountNumConns struct { TypedEvent - Server ServerInfo `json:"server"` - Account string `json:"acc"` - Conns int `json:"conns"` - LeafNodes int `json:"leafnodes"` - TotalConns int `json:"total_conns"` - Sent DataStats `json:"sent"` - Received DataStats `json:"received"` - SlowConsumers int64 `json:"slow_consumers"` + Server ServerInfo `json:"server"` + AccountStat +} + +// AccountStat contains the data common between AccountNumConns and AccountStatz +type AccountStat struct { + Account string `json:"acc"` + Conns int `json:"conns"` + LeafNodes int `json:"leafnodes"` + TotalConns int `json:"total_conns"` + Sent DataStats `json:"sent"` + Received DataStats `json:"received"` + SlowConsumers int64 `json:"slow_consumers"` } const AccountNumConnsMsgType = "io.nats.server.advisory.v1.account_connections" @@ -869,6 +874,10 @@ func (s *Server) initEventTracking() { optz := &JszEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Jsz(&optz.JSzOptions) }) }, + "HEALTHZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + optz := &EventFilterOptions{} + s.zReq(c, reply, msg, optz, optz, func() (interface{}, error) { return s.healthz(), nil }) + }, } for name, req := range monSrvc { subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name) @@ -880,18 +889,26 @@ func (s *Server) initEventTracking() { s.Errorf("Error setting up internal tracking: %v", err) } } - extractAccount := func(subject string) (string, error) { + extractAccount := func(c *client, subject string, msg []byte) (string, error) { if tk := strings.Split(subject, tsep); len(tk) != accReqTokens { return _EMPTY_, fmt.Errorf("subject %q is malformed", subject) } else { - return tk[accReqAccIndex], nil + acc := tk[accReqAccIndex] + if ci, _, _, _, err := c.srv.getRequestInfo(c, msg); err == nil && ci.Account != _EMPTY_ { + // Make sure the accounts match. + if ci.Account != acc { + // Do not leak too much here. + return _EMPTY_, fmt.Errorf("bad request") + } + } + return acc, nil } } monAccSrvc := map[string]msgHandler{ "SUBSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { optz := &SubszEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { - if acc, err := extractAccount(subject); err != nil { + if acc, err := extractAccount(c, subject, msg); err != nil { return nil, err } else { optz.SubszOptions.Subscriptions = true @@ -903,17 +920,9 @@ func (s *Server) initEventTracking() { "CONNZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { optz := &ConnzEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { - if acc, err := extractAccount(subject); err != nil { + if acc, err := extractAccount(c, subject, msg); err != nil { return nil, err } else { - if ci, _, _, _, err := c.srv.getRequestInfo(c, msg); err == nil && ci.Account != _EMPTY_ { - // Make sure the accounts match. - if ci.Account != acc { - // Do not leak too much here. - return nil, fmt.Errorf("bad request") - } - optz.ConnzOptions.isAccountReq = true - } optz.ConnzOptions.Account = acc return s.Connz(&optz.ConnzOptions) } @@ -922,7 +931,7 @@ func (s *Server) initEventTracking() { "LEAFZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { optz := &LeafzEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { - if acc, err := extractAccount(subject); err != nil { + if acc, err := extractAccount(c, subject, msg); err != nil { return nil, err } else { optz.LeafzOptions.Account = acc @@ -933,7 +942,7 @@ func (s *Server) initEventTracking() { "JSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { optz := &JszEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { - if acc, err := extractAccount(subject); err != nil { + if acc, err := extractAccount(c, subject, msg); err != nil { return nil, err } else { optz.Account = acc @@ -944,21 +953,60 @@ func (s *Server) initEventTracking() { "INFO": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { optz := &AccInfoEventOptions{} s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { - if acc, err := extractAccount(subject); err != nil { + if acc, err := extractAccount(c, subject, msg); err != nil { return nil, err } else { return s.accountInfo(acc) } }) }, + // STATZ is essentially a duplicate of CONNS with an envelope identical to the others. + // For historical reasons CONNS is the odd one out. + // STATZ is also less heavy weight than INFO + "STATZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + optz := &AccountStatzEventOptions{} + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { + if acc, err := extractAccount(c, subject, msg); err != nil { + return nil, err + } else if acc == "PING" { // Filter PING subject. Happens for server as well. But wildcards are not used + return nil, errSkipZreq + } else { + optz.Accounts = []string{acc} + if stz, err := s.AccountStatz(&optz.AccountStatzOptions); err != nil { + return nil, err + } else if len(stz.Accounts) == 0 && !optz.IncludeUnused { + return nil, errSkipZreq + } else { + return stz, nil + } + } + }) + }, "CONNS": s.connsRequest, } for name, req := range monAccSrvc { - if _, err := s.sysSubscribe(fmt.Sprintf(accReqSubj, "*", name), req); err != nil { + if _, err := s.sysSubscribe(fmt.Sprintf(accDirectReqSubj, "*", name), req); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } } + // For now only the STATZ subject has an account specific ping equivalent. + if _, err := s.sysSubscribe(fmt.Sprintf(accPingReqSubj, "STATZ"), + func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + optz := &AccountStatzEventOptions{} + s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { + if stz, err := s.AccountStatz(&optz.AccountStatzOptions); err != nil { + return nil, err + } else if len(stz.Accounts) == 0 && !optz.IncludeUnused { + return nil, errSkipZreq + } else { + return stz, nil + } + }) + }); err != nil { + s.Errorf("Error setting up internal tracking: %v", err) + } + // Listen for updates when leaf nodes connect for a given account. This will // force any gateway connections to move to `modeInterestOnly` subject = fmt.Sprintf(leafNodeConnectEventSubj, "*") @@ -1005,10 +1053,14 @@ func (s *Server) addSystemAccountExports(sacc *Account) { if !s.EventsEnabled() { return } - accConnzSubj := fmt.Sprintf(accReqSubj, "*", "CONNZ") + accConnzSubj := fmt.Sprintf(accDirectReqSubj, "*", "CONNZ") if err := sacc.AddServiceExportWithResponse(accConnzSubj, Streamed, nil); err != nil { s.Errorf("Error adding system service export for %q: %v", accConnzSubj, err) } + accStatzSubj := fmt.Sprintf(accDirectReqSubj, "*", "STATZ") + if err := sacc.AddServiceExportWithResponse(accStatzSubj, Streamed, nil); err != nil { + s.Errorf("Error adding system service export for %q: %v", accStatzSubj, err) + } // Register any accounts that existed prior. s.registerSystemImportsForExisting() @@ -1369,6 +1421,12 @@ type AccountzEventOptions struct { EventFilterOptions } +// In the context of system events, AccountzEventOptions are options passed to Accountz +type AccountStatzEventOptions struct { + AccountStatzOptions + EventFilterOptions +} + // In the context of system events, JszEventOptions are options passed to Jsz type JszEventOptions struct { JSzOptions @@ -1572,21 +1630,21 @@ func (s *Server) registerSystemImports(a *Account) { return } // FIXME(dlc) - make a shared list between sys exports etc. - connzSubj := fmt.Sprintf(serverPingReqSubj, "CONNZ") - mappedSubj := fmt.Sprintf(accReqSubj, a.Name, "CONNZ") + importSrvc := func(subj, mappedSubj string) { + if !a.serviceImportExists(subj) { + if err := a.AddServiceImport(sacc, subj, mappedSubj); err != nil { + s.Errorf("Error setting up system service import %s -> %s for account: %v", + subj, mappedSubj, err) + } + } + } // Add in this to the account in 2 places. // "$SYS.REQ.SERVER.PING.CONNZ" and "$SYS.REQ.ACCOUNT.PING.CONNZ" - if !a.serviceImportExists(connzSubj) { - if err := a.AddServiceImport(sacc, connzSubj, mappedSubj); err != nil { - s.Errorf("Error setting up system service imports for account: %v", err) - } - } - if !a.serviceImportExists(accConnzReqSubj) { - if err := a.AddServiceImport(sacc, accConnzReqSubj, mappedSubj); err != nil { - s.Errorf("Error setting up system service imports for account: %v", err) - } - } + mappedConnzSubj := fmt.Sprintf(accDirectReqSubj, a.Name, "CONNZ") + importSrvc(fmt.Sprintf(accPingReqSubj, "CONNZ"), mappedConnzSubj) + importSrvc(fmt.Sprintf(serverPingReqSubj, "CONNZ"), mappedConnzSubj) + importSrvc(fmt.Sprintf(accPingReqSubj, "STATZ"), fmt.Sprintf(accDirectReqSubj, a.Name, "STATZ")) } // Setup tracking for this account. This allows us to track global account activity. @@ -1600,7 +1658,7 @@ func (s *Server) enableAccountTracking(a *Account) { // May need to ensure we do so only if there is a known interest. // This can get complicated with gateways. - subj := fmt.Sprintf(accReqSubj, a.Name, "CONNS") + subj := fmt.Sprintf(accDirectReqSubj, a.Name, "CONNS") reply := fmt.Sprintf(connsRespSubj, s.info.ID) m := accNumConnsReq{Account: a.Name} s.sendInternalMsg(subj, reply, &m.Server, &m) @@ -1643,29 +1701,18 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) { // Build event with account name and number of local clients and leafnodes. eid := s.nextEventID() a.mu.Lock() - localConns := a.numLocalConnections() - leafConns := a.numLocalLeafNodes() - m := &AccountNumConns{ + stat := a.statz() + m := AccountNumConns{ TypedEvent: TypedEvent{ Type: AccountNumConnsMsgType, ID: eid, Time: time.Now().UTC(), }, - Account: a.Name, - Conns: localConns, - LeafNodes: leafConns, - TotalConns: localConns + leafConns, - Received: DataStats{ - Msgs: atomic.LoadInt64(&a.inMsgs), - Bytes: atomic.LoadInt64(&a.inBytes)}, - Sent: DataStats{ - Msgs: atomic.LoadInt64(&a.outMsgs), - Bytes: atomic.LoadInt64(&a.outBytes)}, - SlowConsumers: atomic.LoadInt64(&a.slowConsumers), + AccountStat: *stat, } // Set timer to fire again unless we are at zero, but only if the account // is not configured for JetStream. - if localConns == 0 && !a.jetStreamConfiguredNoLock() { + if m.TotalConns == 0 && !a.jetStreamConfiguredNoLock() { clearTimer(&a.ctmr) } else { // Check to see if we have an HB running and update. @@ -1682,6 +1729,25 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) { a.mu.Unlock() } +// Lock shoulc be held on entry +func (a *Account) statz() *AccountStat { + localConns := a.numLocalConnections() + leafConns := a.numLocalLeafNodes() + return &AccountStat{ + Account: a.Name, + Conns: localConns, + LeafNodes: leafConns, + TotalConns: localConns + leafConns, + Received: DataStats{ + Msgs: atomic.LoadInt64(&a.inMsgs), + Bytes: atomic.LoadInt64(&a.inBytes)}, + Sent: DataStats{ + Msgs: atomic.LoadInt64(&a.outMsgs), + Bytes: atomic.LoadInt64(&a.outBytes)}, + SlowConsumers: atomic.LoadInt64(&a.slowConsumers), + } +} + // accConnsUpdate is called whenever there is a change to the account's // number of active connections, or during a heartbeat. func (s *Server) accConnsUpdate(a *Account) { diff --git a/server/events_test.go b/server/events_test.go index a5f1321b..5c14f94b 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1239,14 +1239,15 @@ func TestAccountReqMonitoring(t *testing.T) { sacc, sakp := createAccount(s) s.setSystemAccount(sacc) s.EnableJetStream(nil) + unusedAcc, _ := createAccount(s) acc, akp := createAccount(s) - if acc == nil { - t.Fatalf("did not create account") - } acc.EnableJetStream(nil) - subsz := fmt.Sprintf(accReqSubj, acc.Name, "SUBSZ") - connz := fmt.Sprintf(accReqSubj, acc.Name, "CONNZ") - jsz := fmt.Sprintf(accReqSubj, acc.Name, "JSZ") + subsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "SUBSZ") + connz := fmt.Sprintf(accDirectReqSubj, acc.Name, "CONNZ") + jsz := fmt.Sprintf(accDirectReqSubj, acc.Name, "JSZ") + + pStatz := fmt.Sprintf(accPingReqSubj, "STATZ") + statz := func(name string) string { return fmt.Sprintf(accDirectReqSubj, name, "STATZ") } // Create system account connection to query url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) ncSys, err := nats.Connect(url, createUserCreds(t, s, sakp)) @@ -1261,42 +1262,86 @@ func TestAccountReqMonitoring(t *testing.T) { } defer nc.Close() // query SUBSZ for account - if resp, err := ncSys.Request(subsz, nil, time.Second); err != nil { - t.Fatalf("Error on request: %v", err) - } else if !strings.Contains(string(resp.Data), `"num_subscriptions":3,`) { - t.Fatalf("unexpected subs count (expected 3): %v", string(resp.Data)) - } + resp, err := ncSys.Request(subsz, nil, time.Second) + require_NoError(t, err) + require_Contains(t, string(resp.Data), `"num_subscriptions":4,`) // create a subscription - if sub, err := nc.Subscribe("foo", func(msg *nats.Msg) {}); err != nil { - t.Fatalf("error on subscribe %v", err) - } else { - defer sub.Unsubscribe() - } - nc.Flush() + sub, err := nc.Subscribe("foo", func(msg *nats.Msg) {}) + require_NoError(t, err) + defer sub.Unsubscribe() + + require_NoError(t, nc.Flush()) // query SUBSZ for account - if resp, err := ncSys.Request(subsz, nil, time.Second); err != nil { - t.Fatalf("Error on request: %v", err) - } else if !strings.Contains(string(resp.Data), `"num_subscriptions":4,`) { - t.Fatalf("unexpected subs count (expected 4): %v", string(resp.Data)) - } else if !strings.Contains(string(resp.Data), `"subject":"foo"`) { - t.Fatalf("expected subscription foo: %v", string(resp.Data)) - } + resp, err = ncSys.Request(subsz, nil, time.Second) + require_NoError(t, err) + require_Contains(t, string(resp.Data), `"num_subscriptions":5,`, `"subject":"foo"`) // query connections for account - if resp, err := ncSys.Request(connz, nil, time.Second); err != nil { - t.Fatalf("Error on request: %v", err) - } else if !strings.Contains(string(resp.Data), `"num_connections":1,`) { - t.Fatalf("unexpected subs count (expected 1): %v", string(resp.Data)) - } else if !strings.Contains(string(resp.Data), `"total":1,`) { - t.Fatalf("unexpected subs count (expected 1): %v", string(resp.Data)) - } + resp, err = ncSys.Request(connz, nil, time.Second) + require_NoError(t, err) + require_Contains(t, string(resp.Data), `"num_connections":1,`, `"total":1,`) // query connections for js account - if resp, err := ncSys.Request(jsz, nil, time.Second); err != nil { - t.Fatalf("Error on request: %v", err) - } else if !strings.Contains(string(resp.Data), `"memory":0,`) { - t.Fatalf("jetstream should be enabled but empty: %v", string(resp.Data)) - } else if !strings.Contains(string(resp.Data), `"storage":0,`) { - t.Fatalf("jetstream should be enabled but empty: %v", string(resp.Data)) - } + resp, err = ncSys.Request(jsz, nil, time.Second) + require_NoError(t, err) + require_Contains(t, string(resp.Data), `"memory":0,`, `"storage":0,`) + // query statz/conns for account + resp, err = ncSys.Request(statz(acc.Name), nil, time.Second) + require_NoError(t, err) + respContentAcc := []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"sent":{"msgs":0,"bytes":0}`, + `"received":{"msgs":0,"bytes":0}`, fmt.Sprintf(`"acc":"%s"`, acc.Name)} + require_Contains(t, string(resp.Data), respContentAcc...) + + rIb := ncSys.NewRespInbox() + rSub, err := ncSys.SubscribeSync(rIb) + require_NoError(t, err) + require_NoError(t, ncSys.PublishRequest(pStatz, rIb, nil)) + minRespContentForBothAcc := []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"acc":"`} + resp, err = rSub.NextMsg(time.Second) + require_NoError(t, err) + require_Contains(t, string(resp.Data), minRespContentForBothAcc...) + // expect one entry per account + require_Contains(t, string(resp.Data), fmt.Sprintf(`"acc":"%s"`, acc.Name), fmt.Sprintf(`"acc":"%s"`, sacc.Name)) + + // Test ping with filter by account name + require_NoError(t, ncSys.PublishRequest(pStatz, rIb, []byte(fmt.Sprintf(`{"accounts":["%s"]}`, sacc.Name)))) + m, err := rSub.NextMsg(time.Second) + require_NoError(t, err) + require_Contains(t, string(m.Data), minRespContentForBothAcc...) + + require_NoError(t, ncSys.PublishRequest(pStatz, rIb, []byte(fmt.Sprintf(`{"accounts":["%s"]}`, acc.Name)))) + m, err = rSub.NextMsg(time.Second) + require_NoError(t, err) + require_Contains(t, string(m.Data), respContentAcc...) + + // Test include unused for statz and ping of statz + unusedContent := []string{`"conns":0,`, `"total_conns":0`, `"slow_consumers":0`, + fmt.Sprintf(`"acc":"%s"`, unusedAcc.Name)} + + resp, err = ncSys.Request(statz(unusedAcc.Name), + []byte(fmt.Sprintf(`{"accounts":["%s"], "include_unused":true}`, unusedAcc.Name)), + time.Second) + require_NoError(t, err) + require_Contains(t, string(resp.Data), unusedContent...) + + require_NoError(t, ncSys.PublishRequest(pStatz, rIb, + []byte(fmt.Sprintf(`{"accounts":["%s"], "include_unused":true}`, unusedAcc.Name)))) + resp, err = rSub.NextMsg(time.Second) + require_NoError(t, err) + require_Contains(t, string(resp.Data), unusedContent...) + + require_NoError(t, ncSys.PublishRequest(pStatz, rIb, []byte(fmt.Sprintf(`{"accounts":["%s"]}`, unusedAcc.Name)))) + _, err = rSub.NextMsg(200 * time.Millisecond) + require_Error(t, err) + + // Test ping from within account + ib := nc.NewRespInbox() + rSub, err = nc.SubscribeSync(ib) + require_NoError(t, err) + require_NoError(t, nc.PublishRequest(pStatz, ib, nil)) + resp, err = rSub.NextMsg(time.Second) + require_NoError(t, err) + require_Contains(t, string(resp.Data), respContentAcc...) + _, err = rSub.NextMsg(200 * time.Millisecond) + require_Error(t, err) } func TestAccountReqInfo(t *testing.T) { @@ -1312,7 +1357,7 @@ func TestAccountReqInfo(t *testing.T) { ajwt1, _ := nac1.Encode(oKp) addAccountToMemResolver(s, pub1, ajwt1) s.LookupAccount(pub1) - info1 := fmt.Sprintf(accReqSubj, pub1, "INFO") + info1 := fmt.Sprintf(accDirectReqSubj, pub1, "INFO") // Now add an account with service imports. akp2, _ := nkeys.CreateAccount() pub2, _ := akp2.PublicKey() @@ -1321,7 +1366,7 @@ func TestAccountReqInfo(t *testing.T) { ajwt2, _ := nac2.Encode(oKp) addAccountToMemResolver(s, pub2, ajwt2) s.LookupAccount(pub2) - info2 := fmt.Sprintf(accReqSubj, pub2, "INFO") + info2 := fmt.Sprintf(accDirectReqSubj, pub2, "INFO") // Create system account connection to query url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) ncSys, err := nats.Connect(url, createUserCreds(t, s, sakp)) @@ -1369,7 +1414,7 @@ func TestAccountReqInfo(t *testing.T) { t.Fatalf("Unmarshalling failed: %v", err) } else if len(info.Exports) != 1 { t.Fatalf("Unexpected value: %v", info.Exports) - } else if len(info.Imports) != 2 { + } else if len(info.Imports) != 3 { t.Fatalf("Unexpected value: %+v", info.Imports) } else if info.Exports[0].Subject != "req.*" { t.Fatalf("Unexpected value: %v", info.Exports) @@ -1377,7 +1422,7 @@ func TestAccountReqInfo(t *testing.T) { t.Fatalf("Unexpected value: %v", info.Exports) } else if info.Exports[0].ResponseType != jwt.ResponseTypeSingleton { t.Fatalf("Unexpected value: %v", info.Exports) - } else if info.SubCnt != 2 { + } else if info.SubCnt != 3 { t.Fatalf("Unexpected value: %v", info.SubCnt) } else { checkCommon(&info, &srv, pub1, ajwt1) @@ -1390,7 +1435,7 @@ func TestAccountReqInfo(t *testing.T) { t.Fatalf("Unmarshalling failed: %v", err) } else if len(info.Exports) != 0 { t.Fatalf("Unexpected value: %v", info.Exports) - } else if len(info.Imports) != 3 { + } else if len(info.Imports) != 4 { t.Fatalf("Unexpected value: %+v", info.Imports) } // Here we need to find our import @@ -1408,7 +1453,7 @@ func TestAccountReqInfo(t *testing.T) { t.Fatalf("Unexpected value: %+v", si) } else if si.Account != pub1 { t.Fatalf("Unexpected value: %+v", si) - } else if info.SubCnt != 3 { + } else if info.SubCnt != 4 { t.Fatalf("Unexpected value: %+v", si) } else { checkCommon(&info, &srv, pub2, ajwt2) @@ -1611,7 +1656,7 @@ func TestSystemAccountWithGateways(t *testing.T) { // If this tests fails with wrong number after 10 seconds we may have // added a new inititial subscription for the eventing system. - checkExpectedSubs(t, 40, sa) + checkExpectedSubs(t, 45, sa) // Create a client on B and see if we receive the event urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) @@ -2123,6 +2168,8 @@ func TestServerEventsPingMonitorz(t *testing.T) { []string{"now", "routes"}}, {"JSZ", nil, &JSzOptions{}, []string{"now", "disabled"}}, + + {"HEALTHZ", nil, &JSzOptions{}, []string{"status"}}, } for i, test := range tests { diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 04ddf18a..560cacf1 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -3948,7 +3948,7 @@ func TestJetStreamClusterPeerEvacuationAndStreamReassignment(t *testing.T) { }) } // Now wait until the stream is now current. - checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { + checkFor(t, 50*time.Second, 100*time.Millisecond, func() error { si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second)) if err != nil { return fmt.Errorf("could not fetch stream info: %v", err) diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index df29c7cd..4b2abe7f 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -516,7 +516,7 @@ func TestJetStreamSuperClusterConnectionCount(t *testing.T) { sysNc := natsConnect(t, sc.randomServer().ClientURL(), nats.UserInfo("admin", "s3cr3t!")) defer sysNc.Close() - _, err := sysNc.Request(fmt.Sprintf(accReqSubj, "ONE", "CONNS"), nil, 100*time.Millisecond) + _, err := sysNc.Request(fmt.Sprintf(accDirectReqSubj, "ONE", "CONNS"), nil, 100*time.Millisecond) // this is a timeout as the server only responds when it has connections.... // not convinced this should be that way, but also not the issue to investigate. require_True(t, err == nats.ErrTimeout) @@ -561,7 +561,7 @@ func TestJetStreamSuperClusterConnectionCount(t *testing.T) { // There should be no active NATS CLIENT connections, but we still need // to wait a little bit... checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { - _, err := sysNc.Request(fmt.Sprintf(accReqSubj, "ONE", "CONNS"), nil, 100*time.Millisecond) + _, err := sysNc.Request(fmt.Sprintf(accDirectReqSubj, "ONE", "CONNS"), nil, 100*time.Millisecond) if err != nats.ErrTimeout { return fmt.Errorf("Expected timeout, got %v", err) } @@ -2087,6 +2087,7 @@ func TestJetStreamSuperClusterMovingStreamAndMoveBack(t *testing.T) { checkMove := func(cluster string) { t.Helper() + sc.waitOnStreamLeader("$G", "TEST") checkFor(t, 20*time.Second, 100*time.Millisecond, func() error { si, err := js.StreamInfo("TEST") if err != nil { diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 19c62e50..e14d8fab 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -1306,7 +1306,7 @@ func TestLeafNodePermissions(t *testing.T) { // Create a sub on ">" on LN1 subAll := natsSubSync(t, nc1, ">") // this should be registered in LN2 (there is 1 sub for LN1 $LDS subject) + SYS IMPORTS - checkSubs(ln2.globalAccount(), 8) + checkSubs(ln2.globalAccount(), 10) // Check deny export clause from messages published from LN2 for _, test := range []struct { @@ -1333,7 +1333,7 @@ func TestLeafNodePermissions(t *testing.T) { subAll.Unsubscribe() // Goes down by 1. - checkSubs(ln2.globalAccount(), 7) + checkSubs(ln2.globalAccount(), 9) // We used to make sure we would not do subscriptions however that // was incorrect. We need to check publishes, not the subscriptions. @@ -1358,7 +1358,7 @@ func TestLeafNodePermissions(t *testing.T) { } { t.Run(test.name, func(t *testing.T) { sub := natsSubSync(t, nc2, test.subSubject) - checkSubs(ln2.globalAccount(), 8) + checkSubs(ln2.globalAccount(), 10) if !test.ok { nc1.Publish(test.pubSubject, []byte("msg")) @@ -1366,12 +1366,12 @@ func TestLeafNodePermissions(t *testing.T) { t.Fatalf("Did not expect to get the message") } } else { - checkSubs(ln1.globalAccount(), 7) + checkSubs(ln1.globalAccount(), 9) nc1.Publish(test.pubSubject, []byte("msg")) natsNexMsg(t, sub, time.Second) } sub.Unsubscribe() - checkSubs(ln1.globalAccount(), 6) + checkSubs(ln1.globalAccount(), 8) }) } } @@ -1492,8 +1492,8 @@ func TestLeafNodeExportPermissionsNotForSpecialSubs(t *testing.T) { // The deny is totally restrictive, but make sure that we still accept the $LDS, $GR and _GR_ go from LN1. checkFor(t, time.Second, 15*time.Millisecond, func() error { // We should have registered the 3 subs from the accepting leafnode. - if n := ln2.globalAccount().TotalSubs(); n != 7 { - return fmt.Errorf("Expected %d subs, got %v", 7, n) + if n := ln2.globalAccount().TotalSubs(); n != 8 { + return fmt.Errorf("Expected %d subs, got %v", 8, n) } return nil }) @@ -3395,7 +3395,7 @@ func TestLeafNodeRouteSubWithOrigin(t *testing.T) { r1.Shutdown() checkFor(t, time.Second, 15*time.Millisecond, func() error { acc := l2.GlobalAccount() - if n := acc.TotalSubs(); n != 3 { + if n := acc.TotalSubs(); n != 4 { return fmt.Errorf("Account %q should have 3 subs, got %v", acc.GetName(), n) } return nil diff --git a/server/monitor.go b/server/monitor.go index 77f7d765..8c0b0b58 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -92,10 +92,6 @@ type ConnzOptions struct { // Filter by subject interest FilterSubject string `json:"filter_subject"` - - // Private indication that this request is from an account and not a system account. - // Used to not leak system level information to the account. - isAccountReq bool } // ConnState is for filtering states of connections. We will only have two, open and closed. @@ -256,9 +252,6 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { var clist map[uint64]*client - // If this is an account scoped request from a no $SYS account. - isAccReq := acc != _EMPTY_ && opts.isAccountReq - if acc != _EMPTY_ { var err error a, err = s.lookupAccount(acc) @@ -300,7 +293,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { } // We may need to filter these connections. - if isAccReq && len(closedClients) > 0 { + if acc != _EMPTY_ && len(closedClients) > 0 { var ccc []*closedClient for _, cc := range closedClients { if cc.acc == acc { @@ -1348,6 +1341,7 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { JetStream Connections Accounts + Account stats Subscriptions Routes LeafNodes @@ -1362,6 +1356,7 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) { s.basePath(JszPath), s.basePath(ConnzPath), s.basePath(AccountzPath), + s.basePath(AccountStatzPath), s.basePath(SubszPath), s.basePath(RoutezPath), s.basePath(LeafzPath), @@ -2156,6 +2151,78 @@ func (s *Server) HandleLeafz(w http.ResponseWriter, r *http.Request) { ResponseHandler(w, r, b) } +// Leafz represents detailed information on Leafnodes. +type AccountStatz struct { + ID string `json:"server_id"` + Now time.Time `json:"now"` + Accounts []*AccountStat `json:"account_statz"` +} + +// LeafzOptions are options passed to Leafz +type AccountStatzOptions struct { + Accounts []string `json:"accounts"` + IncludeUnused bool `json:"include_unused"` +} + +// Leafz returns a AccountStatz structure containing summary information about accounts. +func (s *Server) AccountStatz(opts *AccountStatzOptions) (*AccountStatz, error) { + stz := &AccountStatz{ + ID: s.ID(), + Now: time.Now().UTC(), + Accounts: []*AccountStat{}, + } + if opts == nil || len(opts.Accounts) == 0 { + s.accounts.Range(func(key, a interface{}) bool { + acc := a.(*Account) + acc.mu.RLock() + if opts.IncludeUnused || acc.numLocalConnections() != 0 { + stz.Accounts = append(stz.Accounts, acc.statz()) + } + acc.mu.RUnlock() + return true + }) + } else { + for _, a := range opts.Accounts { + if acc, ok := s.accounts.Load(a); ok { + acc := acc.(*Account) + acc.mu.RLock() + if opts.IncludeUnused || acc.numLocalConnections() != 0 { + stz.Accounts = append(stz.Accounts, acc.statz()) + } + acc.mu.RUnlock() + } + } + } + return stz, nil +} + +// HandleAccountStatz process HTTP requests for statz information of all accounts. +func (s *Server) HandleAccountStatz(w http.ResponseWriter, r *http.Request) { + s.mu.Lock() + s.httpReqStats[AccountStatzPath]++ + s.mu.Unlock() + + unused, err := decodeBool(w, r, "unused") + if err != nil { + return + } + + l, err := s.AccountStatz(&AccountStatzOptions{IncludeUnused: unused}) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(err.Error())) + return + } + b, err := json.MarshalIndent(l, "", " ") + if err != nil { + s.Errorf("Error marshaling response to %s request: %v", AccountStatzPath, err) + return + } + + // Handle response + ResponseHandler(w, r, b) +} + // ResponseHandler handles responses for monitoring routes func ResponseHandler(w http.ResponseWriter, r *http.Request, data []byte) { // Get callback from request diff --git a/server/monitor_test.go b/server/monitor_test.go index 9a0db9ed..71f10951 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -3854,7 +3854,7 @@ func TestMonitorLeafz(t *testing.T) { t.Fatalf("RTT not tracked?") } // LDS should be only one. - if ln.NumSubs != 3 || len(ln.Subs) != 3 { + if ln.NumSubs != 4 || len(ln.Subs) != 4 { t.Fatalf("Expected 3 subs, got %v (%v)", ln.NumSubs, ln.Subs) } } @@ -3864,28 +3864,26 @@ func TestMonitorLeafz(t *testing.T) { func TestMonitorAccountz(t *testing.T) { s := RunServer(DefaultMonitorOptions()) defer s.Shutdown() - body := string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d/accountz", s.MonitorAddr().Port))) - if !strings.Contains(body, `$G`) { - t.Fatalf("Body missing value. Contains: %s", body) - } else if !strings.Contains(body, `$SYS`) { - t.Fatalf("Body missing value. Contains: %s", body) - } else if !strings.Contains(body, `"accounts": [`) { - t.Fatalf("Body missing value. Contains: %s", body) - } else if !strings.Contains(body, `"system_account": "$SYS"`) { - t.Fatalf("Body missing value. Contains: %s", body) - } - body = string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d/accountz?acc=$SYS", s.MonitorAddr().Port))) - if !strings.Contains(body, `"account_detail": {`) { - t.Fatalf("Body missing value. Contains: %s", body) - } else if !strings.Contains(body, `"account_name": "$SYS",`) { - t.Fatalf("Body missing value. Contains: %s", body) - } else if !strings.Contains(body, `"subscriptions": 36,`) { - t.Fatalf("Body missing value. Contains: %s", body) - } else if !strings.Contains(body, `"is_system": true,`) { - t.Fatalf("Body missing value. Contains: %s", body) - } else if !strings.Contains(body, `"system_account": "$SYS"`) { - t.Fatalf("Body missing value. Contains: %s", body) - } + body := string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d%s", s.MonitorAddr().Port, AccountzPath))) + require_Contains(t, body, `$G`) + require_Contains(t, body, `$SYS`) + require_Contains(t, body, `"accounts": [`) + require_Contains(t, body, `"system_account": "$SYS"`) + + body = string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d%s?acc=$SYS", s.MonitorAddr().Port, AccountzPath))) + require_Contains(t, body, `"account_detail": {`) + require_Contains(t, body, `"account_name": "$SYS",`) + require_Contains(t, body, `"subscriptions": 40,`) + require_Contains(t, body, `"is_system": true,`) + require_Contains(t, body, `"system_account": "$SYS"`) + + body = string(readBody(t, fmt.Sprintf("http://127.0.0.1:%d%s?unused=1", s.MonitorAddr().Port, AccountStatzPath))) + require_Contains(t, body, `"acc": "$G"`) + require_Contains(t, body, `"acc": "$SYS"`) + require_Contains(t, body, `"sent": {`) + require_Contains(t, body, `"received": {`) + require_Contains(t, body, `"total_conns": 0,`) + require_Contains(t, body, `"leafnodes": 0,`) } func TestMonitorAuthorizedUsers(t *testing.T) { diff --git a/server/routes_test.go b/server/routes_test.go index e68dcaa4..69205ae7 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -1095,7 +1095,7 @@ func TestRouteNoCrashOnAddingSubToRoute(t *testing.T) { // Make sure all subs are registered in s. checkFor(t, time.Second, 15*time.Millisecond, func() error { - if ts := s.globalAccount().TotalSubs() - 3; ts != int(numRoutes) { + if ts := s.globalAccount().TotalSubs() - 4; ts != int(numRoutes) { return fmt.Errorf("Not all %d routed subs were registered: %d", numRoutes, ts) } return nil diff --git a/server/server.go b/server/server.go index 6901701e..3e9256a2 100644 --- a/server/server.go +++ b/server/server.go @@ -2267,18 +2267,19 @@ func (s *Server) StartMonitoring() error { // HTTP endpoints const ( - RootPath = "/" - VarzPath = "/varz" - ConnzPath = "/connz" - RoutezPath = "/routez" - GatewayzPath = "/gatewayz" - LeafzPath = "/leafz" - SubszPath = "/subsz" - StackszPath = "/stacksz" - AccountzPath = "/accountz" - JszPath = "/jsz" - HealthzPath = "/healthz" - IPQueuesPath = "/ipqueuesz" + RootPath = "/" + VarzPath = "/varz" + ConnzPath = "/connz" + RoutezPath = "/routez" + GatewayzPath = "/gatewayz" + LeafzPath = "/leafz" + SubszPath = "/subsz" + StackszPath = "/stacksz" + AccountzPath = "/accountz" + AccountStatzPath = "/accstatz" + JszPath = "/jsz" + HealthzPath = "/healthz" + IPQueuesPath = "/ipqueuesz" ) func (s *Server) basePath(p string) string { @@ -2381,6 +2382,8 @@ func (s *Server) startMonitoring(secure bool) error { mux.HandleFunc(s.basePath(StackszPath), s.HandleStacksz) // Accountz mux.HandleFunc(s.basePath(AccountzPath), s.HandleAccountz) + // Accstatz + mux.HandleFunc(s.basePath(AccountStatzPath), s.HandleAccountStatz) // Jsz mux.HandleFunc(s.basePath(JszPath), s.HandleJsz) // Healthz diff --git a/test/leafnode_test.go b/test/leafnode_test.go index 7e49715b..b0b1c8b5 100644 --- a/test/leafnode_test.go +++ b/test/leafnode_test.go @@ -440,7 +440,7 @@ func TestLeafNodeAndRoutes(t *testing.T) { lc := createLeafConn(t, optsA.LeafNode.Host, optsA.LeafNode.Port) defer lc.Close() - leafSend, leafExpect := setupLeaf(t, lc, 4) + leafSend, leafExpect := setupLeaf(t, lc, 5) leafSend("PING\r\n") leafExpect(pongRe) @@ -834,7 +834,7 @@ func TestLeafNodeGatewaySendsSystemEvent(t *testing.T) { defer lc.Close() // This is for our global responses since we are setting up GWs above. - leafSend, leafExpect := setupLeaf(t, lc, 6) + leafSend, leafExpect := setupLeaf(t, lc, 7) leafSend("PING\r\n") leafExpect(pongRe) @@ -878,13 +878,13 @@ func TestLeafNodeGatewayInterestPropagation(t *testing.T) { buf := leafExpect(infoRe) buf = infoRe.ReplaceAll(buf, []byte(nil)) foundFoo := false - for count := 0; count != 8; { + for count := 0; count != 9; { // skip first time if we still have data (buf from above may already have some left) if count != 0 || len(buf) == 0 { buf = append(buf, leafExpect(anyRe)...) } count += len(lsubRe.FindAllSubmatch(buf, -1)) - if count > 8 { + if count > 9 { t.Fatalf("Expected %v matches, got %v (buf=%s)", 8, count, buf) } if strings.Contains(string(buf), "foo") { @@ -937,7 +937,7 @@ func TestLeafNodeWithRouteAndGateway(t *testing.T) { defer lc.Close() // This is for our global responses since we are setting up GWs above. - leafSend, leafExpect := setupLeaf(t, lc, 6) + leafSend, leafExpect := setupLeaf(t, lc, 7) leafSend("PING\r\n") leafExpect(pongRe) @@ -996,7 +996,7 @@ func TestLeafNodeWithGatewaysAndStaggeredStart(t *testing.T) { lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port) defer lc.Close() - leafSend, leafExpect := setupLeaf(t, lc, 6) + leafSend, leafExpect := setupLeaf(t, lc, 7) leafSend("PING\r\n") leafExpect(pongRe) @@ -1036,7 +1036,7 @@ func TestLeafNodeWithGatewaysServerRestart(t *testing.T) { lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port) defer lc.Close() - leafSend, leafExpect := setupLeaf(t, lc, 6) + leafSend, leafExpect := setupLeaf(t, lc, 7) leafSend("PING\r\n") leafExpect(pongRe) @@ -1070,7 +1070,7 @@ func TestLeafNodeWithGatewaysServerRestart(t *testing.T) { lc = createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port) defer lc.Close() - _, leafExpect = setupLeaf(t, lc, 6) + _, leafExpect = setupLeaf(t, lc, 7) // Now wait on GW solicit to fire time.Sleep(500 * time.Millisecond) @@ -1530,7 +1530,7 @@ func TestLeafNodeMultipleAccounts(t *testing.T) { defer s.Shutdown() // Setup the two accounts for this server. - _, akp1 := createAccount(t, s) + a, akp1 := createAccount(t, s) kp1, _ := nkeys.CreateUser() pub1, _ := kp1.PublicKey() nuc1 := jwt.NewUserClaims(pub1) @@ -1575,12 +1575,7 @@ func TestLeafNodeMultipleAccounts(t *testing.T) { lsub, _ := ncl.SubscribeSync("foo.test") // Wait for the subs to propagate. LDS + foo.test - checkFor(t, 2*time.Second, 10*time.Millisecond, func() error { - if subs := s.NumSubscriptions(); subs < 4 { - return fmt.Errorf("Number of subs is %d", subs) - } - return nil - }) + checkSubInterest(t, s, a.GetName(), "foo.test", 2*time.Second) // Now send from nc1 with account 1, should be received by our leafnode subscriber. nc1.Publish("foo.test", nil) @@ -1909,12 +1904,7 @@ func TestLeafNodeExportsImports(t *testing.T) { lsub, _ := ncl.SubscribeSync("import.foo.stream") // Wait for all subs to propagate. - checkFor(t, time.Second, 10*time.Millisecond, func() error { - if subs := s.NumSubscriptions(); subs < 5 { - return fmt.Errorf("Number of subs is %d", subs) - } - return nil - }) + checkSubInterest(t, s, acc1.GetName(), "import.foo.stream", time.Second) // Pub to other account with export on original subject. nc2.Publish("foo.stream", nil) @@ -2074,7 +2064,7 @@ func TestLeafNodeExportImportComplexSetup(t *testing.T) { // Wait for the sub to propagate to s2. LDS + subject above. checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { - if acc1.RoutedSubs() != 4 { + if acc1.RoutedSubs() != 5 { return fmt.Errorf("Still no routed subscription: %d", acc1.RoutedSubs()) } return nil @@ -2660,7 +2650,7 @@ func TestLeafNodeSwitchGatewayToInterestModeOnly(t *testing.T) { defer lc.Close() // This is for our global responses since we are setting up GWs above. - leafSend, leafExpect := setupLeaf(t, lc, 6) + leafSend, leafExpect := setupLeaf(t, lc, 7) leafSend("PING\r\n") leafExpect(pongRe) }