Create dedicated options struct for system events that invoke monitoring

The dedicated struct contains filter options not used in monitoring
This also alters the json to filter by server name from "name" to
"server_name". Filtering is not released yet. Thus ok to change.

Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
Matthias Hanel
2020-08-11 17:50:59 -04:00
parent 400b044ea0
commit 913355250d
2 changed files with 135 additions and 71 deletions

View File

@@ -600,28 +600,28 @@ func (s *Server) initEventTracking() {
monSrvc := map[string]msgHandler{
"VARZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
optz := &VarzOptions{}
s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Varz(optz) })
optz := &VarzEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Varz(&optz.VarzOptions) })
},
"SUBSZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
optz := &SubszOptions{}
s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Subsz(optz) })
optz := &SubszEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Subsz(&optz.SubszOptions) })
},
"CONNZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
optz := &ConnzOptions{}
s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Connz(optz) })
optz := &ConnzEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Connz(&optz.ConnzOptions) })
},
"ROUTEZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
optz := &RoutezOptions{}
s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Routez(optz) })
optz := &RoutezEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Routez(&optz.RoutezOptions) })
},
"GATEWAYZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
optz := &GatewayzOptions{}
s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Gatewayz(optz) })
optz := &GatewayzEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Gatewayz(&optz.GatewayzOptions) })
},
"LEAFZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
optz := &LeafzOptions{}
s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Leafz(optz) })
optz := &LeafzEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Leafz(&optz.LeafzOptions) })
},
}
@@ -833,46 +833,87 @@ func (s *Server) leafNodeConnected(sub *subscription, _ *client, subject, reply
}
// Common filter options for system requests STATSZ VARZ SUBSZ CONNZ ROUTEZ GATEWAYZ LEAFZ
type ZFilterOptions struct {
Name string `json:"name"`
Cluster string `json:"cluster"`
Host string `json:"host"`
// These options are
type EventFilterOptions struct {
Name string `json:"server_name,omitempty"` // filter by server name
Cluster string `json:"cluster,omitempty"` // filter by cluster name
Host string `json:"host,omitempty"` // filter by host name
}
// StatszEventOptions are options passed to Statsz
type StatszEventOptions struct {
// No actual options yet
EventFilterOptions
}
// In the context of system events, ConnzEventOptions are options passed to Connz
type ConnzEventOptions struct {
ConnzOptions
EventFilterOptions
}
// In the context of system events, RoutezEventOptions are options passed to Routez
type RoutezEventOptions struct {
RoutezOptions
EventFilterOptions
}
// In the context of system events, SubzEventOptions are options passed to Subz
type SubszEventOptions struct {
SubszOptions
EventFilterOptions
}
// In the context of system events, VarzEventOptions are options passed to Varz
type VarzEventOptions struct {
VarzOptions
EventFilterOptions
}
// In the context of system events, GatewayzEventOptions are options passed to Gatewayz
type GatewayzEventOptions struct {
GatewayzOptions
EventFilterOptions
}
// In the context of system events, LeafzEventOptions are options passed to Leafz
type LeafzEventOptions struct {
LeafzOptions
EventFilterOptions
}
// returns true if the request does NOT apply to this server and can be ignored.
// DO NOT hold the server lock when
func (s *Server) filterRequest(msg []byte) (bool, error) {
if len(msg) == 0 {
return false, nil
}
var fOpts ZFilterOptions
err := json.Unmarshal(msg, &fOpts)
if err != nil {
return false, err
func (s *Server) filterRequest(fOpts *EventFilterOptions) bool {
if fOpts == nil {
return false
}
if fOpts.Name != "" && !strings.Contains(s.info.Name, fOpts.Name) {
return true, nil
return true
}
if fOpts.Host != "" && !strings.Contains(s.info.Host, fOpts.Host) {
return true, nil
return true
}
if fOpts.Cluster != "" {
s.mu.Lock()
cluster := s.info.Cluster
s.mu.Unlock()
if !strings.Contains(cluster, fOpts.Cluster) {
return true, nil
return true
}
}
return false, nil
return false
}
// statszReq is a request for us to respond with current statz.
// statszReq is a request for us to respond with current statsz.
func (s *Server) statszReq(sub *subscription, _ *client, subject, reply string, msg []byte) {
if !s.EventsEnabled() || reply == _EMPTY_ {
return
}
if ignore, err := s.filterRequest(msg); err != nil {
opts := StatszEventOptions{}
if len(msg) == 0 {
} else if err := json.Unmarshal(msg, &opts); err != nil {
server := &ServerInfo{}
response := map[string]interface{}{"server": server}
response["error"] = map[string]interface{}{
@@ -881,7 +922,7 @@ func (s *Server) statszReq(sub *subscription, _ *client, subject, reply string,
}
s.sendInternalMsgLocked(reply, _EMPTY_, server, response)
return
} else if ignore {
} else if ignore := s.filterRequest(&opts.EventFilterOptions); ignore {
return
}
s.mu.Lock()
@@ -889,7 +930,7 @@ func (s *Server) statszReq(sub *subscription, _ *client, subject, reply string,
s.mu.Unlock()
}
func (s *Server) zReq(reply string, msg []byte, optz interface{}, respf func() (interface{}, error)) {
func (s *Server) zReq(reply string, msg []byte, fOpts *EventFilterOptions, optz interface{}, respf func() (interface{}, error)) {
if !s.EventsEnabled() || reply == _EMPTY_ {
return
}
@@ -898,13 +939,11 @@ func (s *Server) zReq(reply string, msg []byte, optz interface{}, respf func() (
var err error
status := 0
if len(msg) != 0 {
filter := false
if filter, err = s.filterRequest(msg); filter {
if err = json.Unmarshal(msg, optz); err != nil {
status = http.StatusBadRequest // status is only included on error, so record how far execution got
} else if s.filterRequest(fOpts) {
return
} else if err == nil {
err = json.Unmarshal(msg, optz)
}
status = http.StatusBadRequest // status is only included on error, so record how far execution got
}
if err == nil {
response["data"], err = respf()

View File

@@ -1533,48 +1533,59 @@ func TestServerEventsPingStatsZ(t *testing.T) {
sa, _, sb, optsB, akp := runTrustedCluster(t)
defer sa.Shutdown()
defer sb.Shutdown()
url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
nc, err := nats.Connect(url, createUserCreds(t, sb, akp))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
requestTbl := []string{
test := func(req []byte) {
reply := nc.NewRespInbox()
sub, _ := nc.SubscribeSync(reply)
nc.PublishRequest(serverStatsPingReqSubj, reply, req)
// Make sure its a statsz
m := ServerStatsMsg{}
// Receive both manually.
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
if err := json.Unmarshal(msg.Data, &m); err != nil {
t.Fatalf("Error unmarshalling the statz json: %v", err)
}
msg, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
if err := json.Unmarshal(msg.Data, &m); err != nil {
t.Fatalf("Error unmarshalling the statz json: %v", err)
}
}
strRequestTbl := []string{
`{"cluster":"TEST"}`,
`{"cluster":"CLUSTER"}`,
`{"name":"SRV"}`,
`{"name":"_"}`,
`{"server-name":"SRV"}`,
`{"server-name":"_"}`,
fmt.Sprintf(`{"host":"%s"}`, optsB.Host),
fmt.Sprintf(`{"host":"%s", "cluster":"CLUSTER", "name":"SRV"}`, optsB.Host),
}
for _, msg := range requestTbl {
t.Run(msg, func(t *testing.T) {
reply := nc.NewRespInbox()
sub, _ := nc.SubscribeSync(reply)
nc.PublishRequest(serverStatsPingReqSubj, reply, []byte(msg))
// Make sure its a statsz
m := ServerStatsMsg{}
// Receive both manually.
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
if err := json.Unmarshal(msg.Data, &m); err != nil {
t.Fatalf("Error unmarshalling the statz json: %v", err)
}
msg, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
if err := json.Unmarshal(msg.Data, &m); err != nil {
t.Fatalf("Error unmarshalling the statz json: %v", err)
}
for i, opt := range strRequestTbl {
t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) {
test([]byte(opt))
})
}
requestTbl := []StatszEventOptions{
{EventFilterOptions: EventFilterOptions{Cluster: "TEST"}},
{EventFilterOptions: EventFilterOptions{Cluster: "CLUSTER"}},
{EventFilterOptions: EventFilterOptions{Name: "SRV"}},
{EventFilterOptions: EventFilterOptions{Name: "_"}},
{EventFilterOptions: EventFilterOptions{Host: optsB.Host}},
{EventFilterOptions: EventFilterOptions{Host: optsB.Host, Cluster: "CLUSTER", Name: "SRV"}},
}
for i, opt := range requestTbl {
t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) {
msg, _ := json.MarshalIndent(&opt, "", " ")
test(msg)
})
}
}
@@ -1594,10 +1605,24 @@ func TestServerEventsPingStatsZFilter(t *testing.T) {
requestTbl := []string{
`{"cluster":"DOESNOTEXIST"}`,
`{"host":"DOESNOTEXIST"}`,
`{"name":"DOESNOTEXIST"}`,
`{"server_name":"DOESNOTEXIST"}`,
}
for _, msg := range requestTbl {
t.Run(msg, func(t *testing.T) {
for i, msg := range requestTbl {
t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) {
// Receive both manually.
if _, err := nc.Request(serverStatsPingReqSubj, []byte(msg), time.Second/4); err != nats.ErrTimeout {
t.Fatalf("Error, expected timeout: %v", err)
}
})
}
requestObjTbl := []EventFilterOptions{
{Cluster: "DOESNOTEXIST"},
{Host: "DOESNOTEXIST"},
{Name: "DOESNOTEXIST"},
}
for i, opt := range requestObjTbl {
t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) {
msg, _ := json.MarshalIndent(&opt, "", " ")
// Receive both manually.
if _, err := nc.Request(serverStatsPingReqSubj, []byte(msg), time.Second/4); err != nats.ErrTimeout {
t.Fatalf("Error, expected timeout: %v", err)