Making monitoring endpoints available via system services.

Available via $SYS.REQ.SERVER.%s.%s and $SYS.REQ.SERVER.PING.%s
Last token is the endpoint name.

Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
Matthias Hanel
2020-04-27 21:12:39 -04:00
parent a8be338bc1
commit 14c716052d
3 changed files with 195 additions and 4 deletions

View File

@@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"math/rand"
"net/http"
"strconv"
"strings"
"sync"
@@ -566,6 +567,45 @@ func (s *Server) initEventTracking() {
if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.statszReq); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}
monSrvc := map[string]msgHandler{
"VARZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
optz := &VarzOptions{}
s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Varz(optz) })
},
"SUBSZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
optz := &SubszOptions{}
s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Subsz(optz) })
},
"CONNZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
optz := &ConnzOptions{}
s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Connz(optz) })
},
"ROUTEZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
optz := &RoutezOptions{}
s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Routez(optz) })
},
"GATEWAYZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
optz := &GatewayzOptions{}
s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Gatewayz(optz) })
},
"LEAFZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
optz := &LeafzOptions{}
s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Leafz(optz) })
},
}
for name, req := range monSrvc {
subject = fmt.Sprintf("$SYS.REQ.SERVER.%s.%s", s.info.ID, name)
if _, err := s.sysSubscribe(subject, req); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}
subject = fmt.Sprintf("$SYS.REQ.SERVER.PING.%s", name)
if _, err := s.sysSubscribe(subject, req); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}
}
// Listen for updates when leaf nodes connect for a given account. This will
// force any gateway connections to move to `modeInterestOnly`
subject = fmt.Sprintf(leafNodeConnectEventSubj, "*")
@@ -767,6 +807,31 @@ func (s *Server) statszReq(sub *subscription, _ *client, subject, reply string,
s.sendStatsz(reply)
}
func (s *Server) zReq(reply string, msg []byte, optz interface{}, respf func() (interface{}, error)) {
if !s.EventsEnabled() || reply == _EMPTY_ {
return
}
server := &ServerInfo{}
response := map[string]interface{}{"server": server}
var err error
status := 0
if len(msg) != 0 {
err = json.Unmarshal(msg, optz)
status = http.StatusBadRequest // status is only included on error, so record how far execution got
}
if err == nil {
response["data"], err = respf()
status = http.StatusInternalServerError
}
if err != nil {
response["error"] = map[string]interface{}{
"code": status,
"description": err.Error(),
}
}
s.sendInternalMsgLocked(reply, _EMPTY_, server, response)
}
// remoteConnsUpdate gets called when we receive a remote update from another server.
func (s *Server) remoteConnsUpdate(sub *subscription, _ *client, subject, reply string, msg []byte) {
if !s.eventsRunning() {

View File

@@ -1224,7 +1224,7 @@ func TestSystemAccountWithGateways(t *testing.T) {
// If this tests fails with wrong number after 10 seconds we may have
// added a new inititial subscription for the eventing system.
checkExpectedSubs(t, 13, sa)
checkExpectedSubs(t, 25, sa)
// Create a client on B and see if we receive the event
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
@@ -1436,6 +1436,132 @@ func TestServerEventsPingStatsZ(t *testing.T) {
}
}
func TestServerEventsPingMonitorz(t *testing.T) {
sa, _, sb, optsB, akp := runTrustedCluster(t)
defer sa.Shutdown()
defer sb.Shutdown()
url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
nc, err := nats.Connect(url, createUserCreds(t, sb, akp))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
nc.Flush()
tests := []struct {
endpoint string
opt interface{}
resp interface{}
respField []string
}{
{"VARZ", nil, &Varz{},
[]string{"now", "cpu"}},
{"SUBSZ", nil, &Subsz{},
[]string{"num_subscriptions", "num_cache"}},
{"CONNZ", nil, &Connz{},
[]string{"now", "connections"}},
{"ROUTEZ", nil, &Routez{},
[]string{"now", "routes"}},
{"GATEWAYZ", nil, &Gatewayz{},
[]string{"now", "outbound_gateways", "inbound_gateways"}},
{"LEAFZ", nil, &Leafz{},
[]string{"now", "leafs"}},
{"SUBSZ", &SubszOptions{}, &Subsz{},
[]string{"num_subscriptions", "num_cache"}},
{"CONNZ", &ConnzOptions{}, &Connz{},
[]string{"now", "connections"}},
{"ROUTEZ", &RoutezOptions{}, &Routez{},
[]string{"now", "routes"}},
{"GATEWAYZ", &GatewayzOptions{}, &Gatewayz{},
[]string{"now", "outbound_gateways", "inbound_gateways"}},
{"LEAFZ", &LeafzOptions{}, &Leafz{},
[]string{"now", "leafs"}},
{"SUBSZ", &SubszOptions{Limit: 5}, &Subsz{},
[]string{"num_subscriptions", "num_cache"}},
{"CONNZ", &ConnzOptions{Limit: 5}, &Connz{},
[]string{"now", "connections"}},
{"ROUTEZ", &RoutezOptions{SubscriptionsDetail: true}, &Routez{},
[]string{"now", "routes"}},
{"GATEWAYZ", &GatewayzOptions{Accounts: true}, &Gatewayz{},
[]string{"now", "outbound_gateways", "inbound_gateways"}},
{"LEAFZ", &LeafzOptions{Subscriptions: true}, &Leafz{},
[]string{"now", "leafs"}},
}
for i, test := range tests {
t.Run(fmt.Sprintf("%s-%d", test.endpoint, i), func(t *testing.T) {
var opt []byte
if test.opt != nil {
opt, err = json.Marshal(test.opt)
if err != nil {
t.Fatalf("Error marshaling opts: %v", err)
}
}
reply := nc.NewRespInbox()
replySubj, _ := nc.SubscribeSync(reply)
destSubj := fmt.Sprintf("%s.%s", serverStatsPingReqSubj, test.endpoint)
nc.PublishRequest(destSubj, reply, opt)
// Receive both manually.
msg, err := replySubj.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
response1 := make(map[string]map[string]interface{})
if err := json.Unmarshal(msg.Data, &response1); err != nil {
t.Fatalf("Error unmarshalling response1 json: %v", err)
}
serverName := ""
if response1["server"]["name"] == "A" {
serverName = "B"
} else if response1["server"]["name"] == "B" {
serverName = "A"
} else {
t.Fatalf("Error finding server in %s", string(msg.Data))
}
if resp, ok := response1["data"]; !ok {
t.Fatalf("Error finding: %s in %s",
strings.ToLower(test.endpoint), string(msg.Data))
} else {
for _, respField := range test.respField {
if _, ok := resp[respField]; !ok {
t.Fatalf("Error finding: %s in %s", respField, resp)
}
}
}
msg, err = replySubj.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
response2 := make(map[string]map[string]interface{})
if err := json.Unmarshal(msg.Data, &response2); err != nil {
t.Fatalf("Error unmarshalling the response2 json: %v", err)
}
if response2["server"]["name"] != serverName {
t.Fatalf("Error finding server %s in %s", serverName, string(msg.Data))
}
if resp, ok := response2["data"]; !ok {
t.Fatalf("Error finding: %s in %s",
strings.ToLower(test.endpoint), string(msg.Data))
} else {
for _, respField := range test.respField {
if _, ok := resp[respField]; !ok {
t.Fatalf("Error finding: %s in %s", respField, resp)
}
}
}
})
}
}
func TestGatewayNameClientInfo(t *testing.T) {
sa, _, sb, _, _ := runTrustedCluster(t)
defer sa.Shutdown()

View File

@@ -1340,13 +1340,13 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
// GatewayzOptions are the options passed to Gatewayz()
type GatewayzOptions struct {
// Name will output only remote gateways with this name
Name string
Name string `json:"name"`
// Accounts indicates if accounts with its interest should be included in the results.
Accounts bool
Accounts bool `json:"accounts"`
// AccountName will limit the list of accounts to that account name (makes Accounts implicit)
AccountName string
AccountName string `json:"account_name"`
}
// Gatewayz represents detailed information on Gateways