diff --git a/server/events.go b/server/events.go index 1e5ec2fd..df2d8cb1 100644 --- a/server/events.go +++ b/server/events.go @@ -600,28 +600,28 @@ func (s *Server) initEventTracking() { monSrvc := map[string]msgHandler{ "VARZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { - optz := &VarzOptions{} - s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Varz(optz) }) + optz := &VarzEventOptions{} + s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Varz(&optz.VarzOptions) }) }, "SUBSZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { - optz := &SubszOptions{} - s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Subsz(optz) }) + optz := &SubszEventOptions{} + s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Subsz(&optz.SubszOptions) }) }, "CONNZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { - optz := &ConnzOptions{} - s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Connz(optz) }) + optz := &ConnzEventOptions{} + s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Connz(&optz.ConnzOptions) }) }, "ROUTEZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { - optz := &RoutezOptions{} - s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Routez(optz) }) + optz := &RoutezEventOptions{} + s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Routez(&optz.RoutezOptions) }) }, "GATEWAYZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { - optz := &GatewayzOptions{} - s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Gatewayz(optz) }) + optz := &GatewayzEventOptions{} + s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Gatewayz(&optz.GatewayzOptions) }) }, "LEAFZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) { - optz := &LeafzOptions{} - s.zReq(reply, msg, optz, func() (interface{}, error) { return s.Leafz(optz) }) + optz := &LeafzEventOptions{} + s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Leafz(&optz.LeafzOptions) }) }, } @@ -833,46 +833,87 @@ func (s *Server) leafNodeConnected(sub *subscription, _ *client, subject, reply } // Common filter options for system requests STATSZ VARZ SUBSZ CONNZ ROUTEZ GATEWAYZ LEAFZ -type ZFilterOptions struct { - Name string `json:"name"` - Cluster string `json:"cluster"` - Host string `json:"host"` +// These options are +type EventFilterOptions struct { + Name string `json:"server_name,omitempty"` // filter by server name + Cluster string `json:"cluster,omitempty"` // filter by cluster name + Host string `json:"host,omitempty"` // filter by host name +} + +// StatszEventOptions are options passed to Statsz +type StatszEventOptions struct { + // No actual options yet + + EventFilterOptions +} + +// In the context of system events, ConnzEventOptions are options passed to Connz +type ConnzEventOptions struct { + ConnzOptions + EventFilterOptions +} + +// In the context of system events, RoutezEventOptions are options passed to Routez +type RoutezEventOptions struct { + RoutezOptions + EventFilterOptions +} + +// In the context of system events, SubzEventOptions are options passed to Subz +type SubszEventOptions struct { + SubszOptions + EventFilterOptions +} + +// In the context of system events, VarzEventOptions are options passed to Varz +type VarzEventOptions struct { + VarzOptions + EventFilterOptions +} + +// In the context of system events, GatewayzEventOptions are options passed to Gatewayz +type GatewayzEventOptions struct { + GatewayzOptions + EventFilterOptions +} + +// In the context of system events, LeafzEventOptions are options passed to Leafz +type LeafzEventOptions struct { + LeafzOptions + EventFilterOptions } // returns true if the request does NOT apply to this server and can be ignored. // DO NOT hold the server lock when -func (s *Server) filterRequest(msg []byte) (bool, error) { - if len(msg) == 0 { - return false, nil - } - var fOpts ZFilterOptions - err := json.Unmarshal(msg, &fOpts) - if err != nil { - return false, err +func (s *Server) filterRequest(fOpts *EventFilterOptions) bool { + if fOpts == nil { + return false } if fOpts.Name != "" && !strings.Contains(s.info.Name, fOpts.Name) { - return true, nil + return true } if fOpts.Host != "" && !strings.Contains(s.info.Host, fOpts.Host) { - return true, nil + return true } if fOpts.Cluster != "" { s.mu.Lock() cluster := s.info.Cluster s.mu.Unlock() if !strings.Contains(cluster, fOpts.Cluster) { - return true, nil + return true } } - return false, nil + return false } -// statszReq is a request for us to respond with current statz. +// statszReq is a request for us to respond with current statsz. func (s *Server) statszReq(sub *subscription, _ *client, subject, reply string, msg []byte) { if !s.EventsEnabled() || reply == _EMPTY_ { return } - if ignore, err := s.filterRequest(msg); err != nil { + opts := StatszEventOptions{} + if len(msg) == 0 { + } else if err := json.Unmarshal(msg, &opts); err != nil { server := &ServerInfo{} response := map[string]interface{}{"server": server} response["error"] = map[string]interface{}{ @@ -881,7 +922,7 @@ func (s *Server) statszReq(sub *subscription, _ *client, subject, reply string, } s.sendInternalMsgLocked(reply, _EMPTY_, server, response) return - } else if ignore { + } else if ignore := s.filterRequest(&opts.EventFilterOptions); ignore { return } s.mu.Lock() @@ -889,7 +930,7 @@ func (s *Server) statszReq(sub *subscription, _ *client, subject, reply string, s.mu.Unlock() } -func (s *Server) zReq(reply string, msg []byte, optz interface{}, respf func() (interface{}, error)) { +func (s *Server) zReq(reply string, msg []byte, fOpts *EventFilterOptions, optz interface{}, respf func() (interface{}, error)) { if !s.EventsEnabled() || reply == _EMPTY_ { return } @@ -898,13 +939,11 @@ func (s *Server) zReq(reply string, msg []byte, optz interface{}, respf func() ( var err error status := 0 if len(msg) != 0 { - filter := false - if filter, err = s.filterRequest(msg); filter { + if err = json.Unmarshal(msg, optz); err != nil { + status = http.StatusBadRequest // status is only included on error, so record how far execution got + } else if s.filterRequest(fOpts) { return - } else if err == nil { - err = json.Unmarshal(msg, optz) } - status = http.StatusBadRequest // status is only included on error, so record how far execution got } if err == nil { response["data"], err = respf() diff --git a/server/events_test.go b/server/events_test.go index 5fc6ef4d..2afd0d86 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -1533,48 +1533,59 @@ func TestServerEventsPingStatsZ(t *testing.T) { sa, _, sb, optsB, akp := runTrustedCluster(t) defer sa.Shutdown() defer sb.Shutdown() - url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port) nc, err := nats.Connect(url, createUserCreds(t, sb, akp)) if err != nil { t.Fatalf("Error on connect: %v", err) } defer nc.Close() - - requestTbl := []string{ + test := func(req []byte) { + reply := nc.NewRespInbox() + sub, _ := nc.SubscribeSync(reply) + nc.PublishRequest(serverStatsPingReqSubj, reply, req) + // Make sure its a statsz + m := ServerStatsMsg{} + // Receive both manually. + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + if err := json.Unmarshal(msg.Data, &m); err != nil { + t.Fatalf("Error unmarshalling the statz json: %v", err) + } + msg, err = sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + if err := json.Unmarshal(msg.Data, &m); err != nil { + t.Fatalf("Error unmarshalling the statz json: %v", err) + } + } + strRequestTbl := []string{ `{"cluster":"TEST"}`, `{"cluster":"CLUSTER"}`, - `{"name":"SRV"}`, - `{"name":"_"}`, + `{"server-name":"SRV"}`, + `{"server-name":"_"}`, fmt.Sprintf(`{"host":"%s"}`, optsB.Host), fmt.Sprintf(`{"host":"%s", "cluster":"CLUSTER", "name":"SRV"}`, optsB.Host), } - - for _, msg := range requestTbl { - t.Run(msg, func(t *testing.T) { - reply := nc.NewRespInbox() - sub, _ := nc.SubscribeSync(reply) - - nc.PublishRequest(serverStatsPingReqSubj, reply, []byte(msg)) - - // Make sure its a statsz - m := ServerStatsMsg{} - - // Receive both manually. - msg, err := sub.NextMsg(time.Second) - if err != nil { - t.Fatalf("Error receiving msg: %v", err) - } - if err := json.Unmarshal(msg.Data, &m); err != nil { - t.Fatalf("Error unmarshalling the statz json: %v", err) - } - msg, err = sub.NextMsg(time.Second) - if err != nil { - t.Fatalf("Error receiving msg: %v", err) - } - if err := json.Unmarshal(msg.Data, &m); err != nil { - t.Fatalf("Error unmarshalling the statz json: %v", err) - } + for i, opt := range strRequestTbl { + t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) { + test([]byte(opt)) + }) + } + requestTbl := []StatszEventOptions{ + {EventFilterOptions: EventFilterOptions{Cluster: "TEST"}}, + {EventFilterOptions: EventFilterOptions{Cluster: "CLUSTER"}}, + {EventFilterOptions: EventFilterOptions{Name: "SRV"}}, + {EventFilterOptions: EventFilterOptions{Name: "_"}}, + {EventFilterOptions: EventFilterOptions{Host: optsB.Host}}, + {EventFilterOptions: EventFilterOptions{Host: optsB.Host, Cluster: "CLUSTER", Name: "SRV"}}, + } + for i, opt := range requestTbl { + t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) { + msg, _ := json.MarshalIndent(&opt, "", " ") + test(msg) }) } } @@ -1594,10 +1605,24 @@ func TestServerEventsPingStatsZFilter(t *testing.T) { requestTbl := []string{ `{"cluster":"DOESNOTEXIST"}`, `{"host":"DOESNOTEXIST"}`, - `{"name":"DOESNOTEXIST"}`, + `{"server_name":"DOESNOTEXIST"}`, } - for _, msg := range requestTbl { - t.Run(msg, func(t *testing.T) { + for i, msg := range requestTbl { + t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) { + // Receive both manually. + if _, err := nc.Request(serverStatsPingReqSubj, []byte(msg), time.Second/4); err != nats.ErrTimeout { + t.Fatalf("Error, expected timeout: %v", err) + } + }) + } + requestObjTbl := []EventFilterOptions{ + {Cluster: "DOESNOTEXIST"}, + {Host: "DOESNOTEXIST"}, + {Name: "DOESNOTEXIST"}, + } + for i, opt := range requestObjTbl { + t.Run(fmt.Sprintf("%s-%d", t.Name(), i), func(t *testing.T) { + msg, _ := json.MarshalIndent(&opt, "", " ") // Receive both manually. if _, err := nc.Request(serverStatsPingReqSubj, []byte(msg), time.Second/4); err != nats.ErrTimeout { t.Fatalf("Error, expected timeout: %v", err)