From 913355250d2a2550dcd63290b58a21935c385b43 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Tue, 11 Aug 2020 17:50:59 -0400 Subject: [PATCH] Create dedicated options struct for system events that invoke monitoring The dedicated struct contains filter options not used in monitoring This also alters the json to filter by server name from "name" to "server_name". Filtering is not released yet. Thus ok to change. Signed-off-by: Matthias Hanel --- server/events.go | 113 ++++++++++++++++++++++++++++-------------- server/events_test.go | 93 +++++++++++++++++++++------------- 2 files changed, 135 insertions(+), 71 deletions(-) diff --git a/server/events.go b/server/events.go index 1e5ec2fd..df2d8cb1 100644 --- a/server/events.go +++ b/server/events.go @@ -600,28 +600,28 @@ func (s *Server) initEventTracking() { monSrvc := map[string]msgHandler{ "VARZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { - optz := &VarzOptions{} - s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Varz(optz) }) + optz := &VarzEventOptions{} + s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Varz(&optz.VarzOptions) }) }, "SUBSZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { - optz := &SubszOptions{} - s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Subsz(optz) }) + optz := &SubszEventOptions{} + s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Subsz(&optz.SubszOptions) }) }, "CONNZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { - optz := &ConnzOptions{} - s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Connz(optz) }) + optz := &ConnzEventOptions{} + s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Connz(&optz.ConnzOptions) }) }, "ROUTEZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { - optz := &RoutezOptions{} - s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Routez(optz) }) + optz := &RoutezEventOptions{} + s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Routez(&optz.RoutezOptions) }) }, "GATEWAYZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { - optz := &GatewayzOptions{} - s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Gatewayz(optz) }) + optz := &GatewayzEventOptions{} + s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Gatewayz(&optz.GatewayzOptions) }) }, "LEAFZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { - optz := &LeafzOptions{} - s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Leafz(optz) }) + optz := &LeafzEventOptions{} + s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Leafz(&optz.LeafzOptions) }) }, } @@ -833,46 +833,87 @@ func (s *Server) leafNodeConnected(sub *subscription, _ *client, subject, reply } // Common filter options for system requests STATSZ VARZ SUBSZ CONNZ ROUTEZ GATEWAYZ LEAFZ -type ZFilterOptions struct { - Name string `json:"name"` - Cluster string `json:"cluster"` - Host string `json:"host"` +// These options are +type EventFilterOptions struct { + Name string `json:"server_name,omitempty"` // filter by server name + Cluster string `json:"cluster,omitempty"` // filter by cluster name + Host string `json:"host,omitempty"` // filter by host name +} + +// StatszEventOptions are options passed to Statsz +type StatszEventOptions struct { + // No actual options yet + + EventFilterOptions +} + +// In the context of system events, ConnzEventOptions are options passed to Connz +type ConnzEventOptions struct { + ConnzOptions + EventFilterOptions +} + +// In the context of system events, RoutezEventOptions are options passed to Routez +type RoutezEventOptions struct { + RoutezOptions + EventFilterOptions +} + +// In the context of system events, SubzEventOptions are options passed to Subz +type SubszEventOptions struct { + SubszOptions + EventFilterOptions +} + +// In the context of system events, VarzEventOptions are options passed to Varz +type VarzEventOptions struct { + VarzOptions + EventFilterOptions +} + +// In the context of system events, GatewayzEventOptions are options passed to Gatewayz +type GatewayzEventOptions struct { + GatewayzOptions + EventFilterOptions +} + +// In the context of system events, LeafzEventOptions are options passed to Leafz +type LeafzEventOptions struct { + LeafzOptions + EventFilterOptions } // returns true if the request does NOT apply to this server and can be ignored. // DO NOT hold the server lock when -func (s *Server) filterRequest(msg []byte) (bool, error) { - if len(msg) == 0 { - return false, nil - } - var fOpts ZFilterOptions - err := json.Unmarshal(msg, &fOpts) - if err != nil { - return false, err +func (s *Server) filterRequest(fOpts *EventFilterOptions) bool { + if fOpts == nil { + return false } if fOpts.Name != "" && !strings.Contains(s.info.Name, fOpts.Name) { - return true, nil + return true } if fOpts.Host != "" && !strings.Contains(s.info.Host, fOpts.Host) { - return true, nil + return true } if fOpts.Cluster != "" { s.mu.Lock() cluster := s.info.Cluster s.mu.Unlock() if !strings.Contains(cluster, fOpts.Cluster) { - return true, nil + return true } } - return false, nil + return false } -// statszReq is a request for us to respond with current statz. +// statszReq is a request for us to respond with current statsz. func (s *Server) statszReq(sub *subscription, _ *client, subject, reply string, msg []byte) { if !s.EventsEnabled() || reply == _EMPTY_ { return } - if ignore, err := s.filterRequest(msg); err != nil { + opts := StatszEventOptions{} + if len(msg) == 0 { + } else if err := json.Unmarshal(msg, &opts); err != nil { server := &ServerInfo{} response := map[string]interface{}{"server": server} response["error"] = map[string]interface{}{ @@ -881,7 +922,7 @@ func (s *Server) statszReq(sub *subscription, _ *client, subject, reply string, } s.sendInternalMsgLocked(reply, _EMPTY_, server, response) return - } else if ignore { + } else if ignore := s.filterRequest(&opts.EventFilterOptions); ignore { return } s.mu.Lock() @@ -889,7 +930,7 @@ func (s *Server) statszReq(sub *subscription, _ *client, subject, reply string, s.mu.Unlock() } -func (s *Server) zReq(reply string, msg []byte, optz interface{}, respf func() (interface{}, error)) { +func (s *Server) zReq(reply string, msg []byte, fOpts *EventFilterOptions, optz interface{}, respf func() (interface{}, error)) { if !s.EventsEnabled() || reply == _EMPTY_ { return } @@ -898,13 +939,11 @@ func (s *Server) zReq(reply string, msg []byte, optz interface{}, respf func() ( var err error status := 0 if len(msg) != 0 { - filter := false - if filter, err = s.filterRequest(msg); filter { + if err = json.Unmarshal(msg, optz); err != nil { + status = http.StatusBadRequest // status is only included on error, so record how far execution got + } else if s.filterRequest(fOpts) { return - } else if err == nil { - err = json.Unmarshal(msg, optz) } - status = http.StatusBadRequest // status is only included on error, so record how far execution got } if err == nil { response["data"], err = respf() diff --git a/server/events_test.go b/server/events_test.go index 5fc6ef4d..2afd0d86 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1533,48 +1533,59 @@ func TestServerEventsPingStatsZ(t *testing.T) { sa, _, sb, optsB, akp := runTrustedCluster(t) defer sa.Shutdown() defer sb.Shutdown() - url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port) nc, err := nats.Connect(url, createUserCreds(t, sb, akp)) if err != nil { t.Fatalf("Error on connect: %v", err) } defer nc.Close() - - requestTbl := []string{ + test := func(req []byte) { + reply := nc.NewRespInbox() + sub, _ := nc.SubscribeSync(reply) + nc.PublishRequest(serverStatsPingReqSubj, reply, req) + // Make sure its a statsz + m := ServerStatsMsg{} + // Receive both manually. + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + if err := json.Unmarshal(msg.Data, &m); err != nil { + t.Fatalf("Error unmarshalling the statz json: %v", err) + } + msg, err = sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + if err := json.Unmarshal(msg.Data, &m); err != nil { + t.Fatalf("Error unmarshalling the statz json: %v", err) + } + } + strRequestTbl := []string{ `{"cluster":"TEST"}`, `{"cluster":"CLUSTER"}`, - `{"name":"SRV"}`, - `{"name":"_"}`, + `{"server-name":"SRV"}`, + `{"server-name":"_"}`, fmt.Sprintf(`{"host":"%s"}`, optsB.Host), fmt.Sprintf(`{"host":"%s", "cluster":"CLUSTER", "name":"SRV"}`, optsB.Host), } - - for _, msg := range requestTbl { - t.Run(msg, func(t *testing.T) { - reply := nc.NewRespInbox() - sub, _ := nc.SubscribeSync(reply) - - nc.PublishRequest(serverStatsPingReqSubj, reply, []byte(msg)) - - // Make sure its a statsz - m := ServerStatsMsg{} - - // Receive both manually. - msg, err := sub.NextMsg(time.Second) - if err != nil { - t.Fatalf("Error receiving msg: %v", err) - } - if err := json.Unmarshal(msg.Data, &m); err != nil { - t.Fatalf("Error unmarshalling the statz json: %v", err) - } - msg, err = sub.NextMsg(time.Second) - if err != nil { - t.Fatalf("Error receiving msg: %v", err) - } - if err := json.Unmarshal(msg.Data, &m); err != nil { - t.Fatalf("Error unmarshalling the statz json: %v", err) - } + for i, opt := range strRequestTbl { + t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) { + test([]byte(opt)) + }) + } + requestTbl := []StatszEventOptions{ + {EventFilterOptions: EventFilterOptions{Cluster: "TEST"}}, + {EventFilterOptions: EventFilterOptions{Cluster: "CLUSTER"}}, + {EventFilterOptions: EventFilterOptions{Name: "SRV"}}, + {EventFilterOptions: EventFilterOptions{Name: "_"}}, + {EventFilterOptions: EventFilterOptions{Host: optsB.Host}}, + {EventFilterOptions: EventFilterOptions{Host: optsB.Host, Cluster: "CLUSTER", Name: "SRV"}}, + } + for i, opt := range requestTbl { + t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) { + msg, _ := json.MarshalIndent(&opt, "", " ") + test(msg) }) } } @@ -1594,10 +1605,24 @@ func TestServerEventsPingStatsZFilter(t *testing.T) { requestTbl := []string{ `{"cluster":"DOESNOTEXIST"}`, `{"host":"DOESNOTEXIST"}`, - `{"name":"DOESNOTEXIST"}`, + `{"server_name":"DOESNOTEXIST"}`, } - for _, msg := range requestTbl { - t.Run(msg, func(t *testing.T) { + for i, msg := range requestTbl { + t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) { + // Receive both manually. + if _, err := nc.Request(serverStatsPingReqSubj, []byte(msg), time.Second/4); err != nats.ErrTimeout { + t.Fatalf("Error, expected timeout: %v", err) + } + }) + } + requestObjTbl := []EventFilterOptions{ + {Cluster: "DOESNOTEXIST"}, + {Host: "DOESNOTEXIST"}, + {Name: "DOESNOTEXIST"}, + } + for i, opt := range requestObjTbl { + t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) { + msg, _ := json.MarshalIndent(&opt, "", " ") // Receive both manually. if _, err := nc.Request(serverStatsPingReqSubj, []byte(msg), time.Second/4); err != nats.ErrTimeout { t.Fatalf("Error, expected timeout: %v", err)