From 14c716052d06c09327dfde5b6c2107f6832a639f Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Mon, 27 Apr 2020 21:12:39 -0400 Subject: [PATCH] Making monitoring endpoints available via system services. Available via $SYS.REQ.SERVER.%s.%s and $SYS.REQ.SERVER.PING.%s Last token is the endpoint name. Signed-off-by: Matthias Hanel --- server/events.go | 65 +++++++++++++++++++++ server/events_test.go | 128 +++++++++++++++++++++++++++++++++++++++++- server/monitor.go | 6 +- 3 files changed, 195 insertions(+), 4 deletions(-) diff --git a/server/events.go b/server/events.go index e931d442..756fd417 100644 --- a/server/events.go +++ b/server/events.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "math/rand" + "net/http" "strconv" "strings" "sync" @@ -566,6 +567,45 @@ func (s *Server) initEventTracking() { if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.statszReq); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } + + monSrvc := map[string]msgHandler{ + "VARZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + optz := &VarzOptions{} + s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Varz(optz) }) + }, + "SUBSZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + optz := &SubszOptions{} + s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Subsz(optz) }) + }, + "CONNZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + optz := &ConnzOptions{} + s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Connz(optz) }) + }, + "ROUTEZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + optz := &RoutezOptions{} + s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Routez(optz) }) + }, + "GATEWAYZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + optz := &GatewayzOptions{} + s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Gatewayz(optz) }) + }, + "LEAFZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { + optz := &LeafzOptions{} + s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Leafz(optz) }) + }, + } + + for name, req := range monSrvc { + subject = fmt.Sprintf("$SYS.REQ.SERVER.%s.%s", s.info.ID, name) + if _, err := s.sysSubscribe(subject, req); err != nil { + s.Errorf("Error setting up internal tracking: %v", err) + } + subject = fmt.Sprintf("$SYS.REQ.SERVER.PING.%s", name) + if _, err := s.sysSubscribe(subject, req); err != nil { + s.Errorf("Error setting up internal tracking: %v", err) + } + } + // Listen for updates when leaf nodes connect for a given account. This will // force any gateway connections to move to `modeInterestOnly` subject = fmt.Sprintf(leafNodeConnectEventSubj, "*") @@ -767,6 +807,31 @@ func (s *Server) statszReq(sub *subscription, _ *client, subject, reply string, s.sendStatsz(reply) } +func (s *Server) zReq(reply string, msg []byte, optz interface{}, respf func() (interface{}, error)) { + if !s.EventsEnabled() || reply == _EMPTY_ { + return + } + server := &ServerInfo{} + response := map[string]interface{}{"server": server} + var err error + status := 0 + if len(msg) != 0 { + err = json.Unmarshal(msg, optz) + status = http.StatusBadRequest // status is only included on error, so record how far execution got + } + if err == nil { + response["data"], err = respf() + status = http.StatusInternalServerError + } + if err != nil { + response["error"] = map[string]interface{}{ + "code": status, + "description": err.Error(), + } + } + s.sendInternalMsgLocked(reply, _EMPTY_, server, response) +} + // remoteConnsUpdate gets called when we receive a remote update from another server. func (s *Server) remoteConnsUpdate(sub *subscription, _ *client, subject, reply string, msg []byte) { if !s.eventsRunning() { diff --git a/server/events_test.go b/server/events_test.go index 21d5cab5..6995ac0f 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1224,7 +1224,7 @@ func TestSystemAccountWithGateways(t *testing.T) { // If this tests fails with wrong number after 10 seconds we may have // added a new inititial subscription for the eventing system. - checkExpectedSubs(t, 13, sa) + checkExpectedSubs(t, 25, sa) // Create a client on B and see if we receive the event urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port) @@ -1436,6 +1436,132 @@ func TestServerEventsPingStatsZ(t *testing.T) { } } +func TestServerEventsPingMonitorz(t *testing.T) { + sa, _, sb, optsB, akp := runTrustedCluster(t) + defer sa.Shutdown() + defer sb.Shutdown() + + url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port) + nc, err := nats.Connect(url, createUserCreds(t, sb, akp)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + nc.Flush() + + tests := []struct { + endpoint string + opt interface{} + resp interface{} + respField []string + }{ + {"VARZ", nil, &Varz{}, + []string{"now", "cpu"}}, + {"SUBSZ", nil, &Subsz{}, + []string{"num_subscriptions", "num_cache"}}, + {"CONNZ", nil, &Connz{}, + []string{"now", "connections"}}, + {"ROUTEZ", nil, &Routez{}, + []string{"now", "routes"}}, + {"GATEWAYZ", nil, &Gatewayz{}, + []string{"now", "outbound_gateways", "inbound_gateways"}}, + {"LEAFZ", nil, &Leafz{}, + []string{"now", "leafs"}}, + + {"SUBSZ", &SubszOptions{}, &Subsz{}, + []string{"num_subscriptions", "num_cache"}}, + {"CONNZ", &ConnzOptions{}, &Connz{}, + []string{"now", "connections"}}, + {"ROUTEZ", &RoutezOptions{}, &Routez{}, + []string{"now", "routes"}}, + {"GATEWAYZ", &GatewayzOptions{}, &Gatewayz{}, + []string{"now", "outbound_gateways", "inbound_gateways"}}, + {"LEAFZ", &LeafzOptions{}, &Leafz{}, + []string{"now", "leafs"}}, + + {"SUBSZ", &SubszOptions{Limit: 5}, &Subsz{}, + []string{"num_subscriptions", "num_cache"}}, + {"CONNZ", &ConnzOptions{Limit: 5}, &Connz{}, + []string{"now", "connections"}}, + {"ROUTEZ", &RoutezOptions{SubscriptionsDetail: true}, &Routez{}, + []string{"now", "routes"}}, + {"GATEWAYZ", &GatewayzOptions{Accounts: true}, &Gatewayz{}, + []string{"now", "outbound_gateways", "inbound_gateways"}}, + {"LEAFZ", &LeafzOptions{Subscriptions: true}, &Leafz{}, + []string{"now", "leafs"}}, + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%s-%d", test.endpoint, i), func(t *testing.T) { + var opt []byte + if test.opt != nil { + opt, err = json.Marshal(test.opt) + if err != nil { + t.Fatalf("Error marshaling opts: %v", err) + } + } + reply := nc.NewRespInbox() + replySubj, _ := nc.SubscribeSync(reply) + + destSubj := fmt.Sprintf("%s.%s", serverStatsPingReqSubj, test.endpoint) + nc.PublishRequest(destSubj, reply, opt) + + // Receive both manually. + msg, err := replySubj.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + response1 := make(map[string]map[string]interface{}) + + if err := json.Unmarshal(msg.Data, &response1); err != nil { + t.Fatalf("Error unmarshalling response1 json: %v", err) + } + + serverName := "" + if response1["server"]["name"] == "A" { + serverName = "B" + } else if response1["server"]["name"] == "B" { + serverName = "A" + } else { + t.Fatalf("Error finding server in %s", string(msg.Data)) + } + if resp, ok := response1["data"]; !ok { + t.Fatalf("Error finding: %s in %s", + strings.ToLower(test.endpoint), string(msg.Data)) + } else { + for _, respField := range test.respField { + if _, ok := resp[respField]; !ok { + t.Fatalf("Error finding: %s in %s", respField, resp) + } + } + } + + msg, err = replySubj.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + response2 := make(map[string]map[string]interface{}) + if err := json.Unmarshal(msg.Data, &response2); err != nil { + t.Fatalf("Error unmarshalling the response2 json: %v", err) + } + if response2["server"]["name"] != serverName { + t.Fatalf("Error finding server %s in %s", serverName, string(msg.Data)) + } + if resp, ok := response2["data"]; !ok { + t.Fatalf("Error finding: %s in %s", + strings.ToLower(test.endpoint), string(msg.Data)) + } else { + for _, respField := range test.respField { + if _, ok := resp[respField]; !ok { + t.Fatalf("Error finding: %s in %s", respField, resp) + } + } + } + }) + } +} + func TestGatewayNameClientInfo(t *testing.T) { sa, _, sb, _, _ := runTrustedCluster(t) defer sa.Shutdown() diff --git a/server/monitor.go b/server/monitor.go index 2a99404a..beb207e3 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1340,13 +1340,13 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) { // GatewayzOptions are the options passed to Gatewayz() type GatewayzOptions struct { // Name will output only remote gateways with this name - Name string + Name string `json:"name"` // Accounts indicates if accounts with its interest should be included in the results. - Accounts bool + Accounts bool `json:"accounts"` // AccountName will limit the list of accounts to that account name (makes Accounts implicit) - AccountName string + AccountName string `json:"account_name"` } // Gatewayz represents detailed information on Gateways