Merge pull request #1481 from nats-io/filter_ping

Add filtering by name and cluster to PING events
This commit is contained in:
Ivan Kozlovic
2020-06-18 13:45:07 -06:00
committed by GitHub
5 changed files with 174 additions and 33 deletions

View File

@@ -242,7 +242,7 @@ RESET:
servername := s.info.Name
seqp := &s.sys.seq
js := s.js != nil
var cluster string
cluster := s.info.Cluster
if s.gateway.enabled {
cluster = s.getGatewayName()
}
@@ -832,14 +832,61 @@ func (s *Server) leafNodeConnected(sub *subscription, _ *client, subject, reply
}
}
// Common filter options for system requests STATSZ VARZ SUBSZ CONNZ ROUTEZ GATEWAYZ LEAFZ
// Each non-empty field is matched as a case-sensitive substring against the
// corresponding server identity field (see filterRequest); empty fields match
// every server.
type ZFilterOptions struct {
	Name    string `json:"name"`    // substring of the server name (s.info.Name)
	Cluster string `json:"cluster"` // substring of the cluster name (s.info.Cluster)
	Host    string `json:"host"`    // substring of the server host (s.info.Host)
}
// returns true if the request does NOT apply to this server and can be ignored.
// DO NOT hold the server lock when calling this function: it acquires s.mu
// itself to read the cluster name, which can be updated at runtime
// (see setClusterName).
func (s *Server) filterRequest(msg []byte) (bool, error) {
	// No payload means no filter: the request applies to every server.
	if len(msg) == 0 {
		return false, nil
	}
	var fOpts ZFilterOptions
	err := json.Unmarshal(msg, &fOpts)
	if err != nil {
		// Malformed filter JSON: report the error, do not silently ignore.
		return false, err
	}
	// Name and Host are read without the lock — NOTE(review): presumably
	// immutable after server start; confirm against initialization code.
	if fOpts.Name != "" && !strings.Contains(s.info.Name, fOpts.Name) {
		return true, nil
	}
	if fOpts.Host != "" && !strings.Contains(s.info.Host, fOpts.Host) {
		return true, nil
	}
	if fOpts.Cluster != "" {
		// Cluster name is mutable; snapshot it under the lock.
		s.mu.Lock()
		cluster := s.info.Cluster
		s.mu.Unlock()
		if !strings.Contains(cluster, fOpts.Cluster) {
			return true, nil
		}
	}
	return false, nil
}
// statszReq is a request for us to respond with current statz.
func (s *Server) statszReq(sub *subscription, _ *client, subject, reply string, msg []byte) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.eventsEnabled() || reply == _EMPTY_ {
if !s.EventsEnabled() || reply == _EMPTY_ {
return
}
if ignore, err := s.filterRequest(msg); err != nil {
server := &ServerInfo{}
response := map[string]interface{}{"server": server}
response["error"] = map[string]interface{}{
"code": http.StatusBadRequest,
"description": err.Error(),
}
s.sendInternalMsgLocked(reply, _EMPTY_, server, response)
return
} else if ignore {
return
}
s.mu.Lock()
s.sendStatsz(reply)
s.mu.Unlock()
}
func (s *Server) zReq(reply string, msg []byte, optz interface{}, respf func() (interface{}, error)) {
@@ -851,7 +898,12 @@ func (s *Server) zReq(reply string, msg []byte, optz interface{}, respf func() (
var err error
status := 0
if len(msg) != 0 {
err = json.Unmarshal(msg, optz)
filter := false
if filter, err = s.filterRequest(msg); filter {
return
} else if err == nil {
err = json.Unmarshal(msg, optz)
}
status = http.StatusBadRequest // status is only included on error, so record how far execution got
}
if err == nil {

View File

@@ -100,7 +100,7 @@ func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkey
optsA.TrustedKeys = []string{pub}
optsA.AccountResolver = mr
optsA.SystemAccount = apub
optsA.ServerName = "A"
optsA.ServerName = "A_SRV"
// Add in dummy gateway
optsA.Gateway.Name = "TEST CLUSTER 22"
optsA.Gateway.Host = "127.0.0.1"
@@ -110,7 +110,7 @@ func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkey
sa := RunServer(optsA)
optsB := nextServerOpts(optsA)
optsB.ServerName = "B"
optsB.ServerName = "B_SRV"
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port))
sb := RunServer(optsB)
@@ -1452,7 +1452,7 @@ func TestServerEventsStatsZ(t *testing.T) {
if lr := len(m3.Stats.Routes); lr != 1 {
t.Fatalf("Expected a route, but got %d", lr)
}
if sr := m3.Stats.Routes[0]; sr.Name != "B" {
if sr := m3.Stats.Routes[0]; sr.Name != "B_SRV" {
t.Fatalf("Expected server A's route to B to have Name set to %q, got %q", "B", sr.Name)
}
@@ -1470,8 +1470,8 @@ func TestServerEventsStatsZ(t *testing.T) {
if lr := len(m.Stats.Routes); lr != 1 {
t.Fatalf("Expected a route, but got %d", lr)
}
if sr := m.Stats.Routes[0]; sr.Name != "A" {
t.Fatalf("Expected server B's route to A to have Name set to %q, got %q", "A", sr.Name)
if sr := m.Stats.Routes[0]; sr.Name != "A_SRV" {
t.Fatalf("Expected server B's route to A to have Name set to %q, got %q", "A_SRV", sr.Name)
}
}
@@ -1487,28 +1487,94 @@ func TestServerEventsPingStatsZ(t *testing.T) {
}
defer nc.Close()
reply := nc.NewRespInbox()
sub, _ := nc.SubscribeSync(reply)
requestTbl := []string{
`{"cluster":"TEST"}`,
`{"cluster":"CLUSTER"}`,
`{"name":"SRV"}`,
`{"name":"_"}`,
fmt.Sprintf(`{"host":"%s"}`, optsB.Host),
fmt.Sprintf(`{"host":"%s", "cluster":"CLUSTER", "name":"SRV"}`, optsB.Host),
}
nc.PublishRequest(serverStatsPingReqSubj, reply, nil)
for _, msg := range requestTbl {
t.Run(msg, func(t *testing.T) {
reply := nc.NewRespInbox()
sub, _ := nc.SubscribeSync(reply)
// Make sure its a statsz
m := ServerStatsMsg{}
nc.PublishRequest(serverStatsPingReqSubj, reply, []byte(msg))
// Make sure its a statsz
m := ServerStatsMsg{}
// Receive both manually.
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
if err := json.Unmarshal(msg.Data, &m); err != nil {
t.Fatalf("Error unmarshalling the statz json: %v", err)
}
msg, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
if err := json.Unmarshal(msg.Data, &m); err != nil {
t.Fatalf("Error unmarshalling the statz json: %v", err)
}
})
}
}
// TestServerEventsPingStatsZFilter verifies that a STATSZ ping carrying a
// filter that matches no server in the cluster produces no response at all.
func TestServerEventsPingStatsZFilter(t *testing.T) {
	sa, _, sb, optsB, akp := runTrustedCluster(t)
	defer sa.Shutdown()
	defer sb.Shutdown()
	url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
	nc, err := nats.Connect(url, createUserCreds(t, sb, akp))
	if err != nil {
		t.Fatalf("Error on connect: %v", err)
	}
	defer nc.Close()
	// Filters that match neither server's cluster, host, or name.
	requestTbl := []string{
		`{"cluster":"DOESNOTEXIST"}`,
		`{"host":"DOESNOTEXIST"}`,
		`{"name":"DOESNOTEXIST"}`,
	}
	for _, msg := range requestTbl {
		t.Run(msg, func(t *testing.T) {
			// No server should answer, so the request must time out.
			if _, err := nc.Request(serverStatsPingReqSubj, []byte(msg), time.Second/4); err != nats.ErrTimeout {
				t.Fatalf("Error, expected timeout: %v", err)
			}
		})
	}
}
func TestServerEventsPingStatsZFailFilter(t *testing.T) {
sa, _, sb, optsB, akp := runTrustedCluster(t)
defer sa.Shutdown()
defer sb.Shutdown()
url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
nc, err := nats.Connect(url, createUserCreds(t, sb, akp))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
// Receive both manually.
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
if err := json.Unmarshal(msg.Data, &m); err != nil {
t.Fatalf("Error unmarshalling the statz json: %v", err)
}
msg, err = sub.NextMsg(time.Second)
if err != nil {
t.Fatalf("Error receiving msg: %v", err)
}
if err := json.Unmarshal(msg.Data, &m); err != nil {
t.Fatalf("Error unmarshalling the statz json: %v", err)
if msg, err := nc.Request(serverStatsPingReqSubj, []byte(`{MALFORMEDJSON`), time.Second/4); err != nil {
t.Fatalf("Error: %v", err)
} else {
resp := make(map[string]map[string]interface{})
if err := json.Unmarshal(msg.Data, &resp); err != nil {
t.Fatalf("Error unmarshalling the response json: %v", err)
}
if resp["error"]["code"].(float64) != http.StatusBadRequest {
t.Fatal("bad error code")
}
}
}
@@ -1566,6 +1632,17 @@ func TestServerEventsPingMonitorz(t *testing.T) {
[]string{"now", "outbound_gateways", "inbound_gateways"}},
{"LEAFZ", &LeafzOptions{Subscriptions: true}, &Leafz{},
[]string{"now", "leafs"}},
{"ROUTEZ", json.RawMessage(`{"cluster":""}`), &Routez{},
[]string{"now", "routes"}},
{"ROUTEZ", json.RawMessage(`{"name":""}`), &Routez{},
[]string{"now", "routes"}},
{"ROUTEZ", json.RawMessage(`{"cluster":"TEST CLUSTER 22"}`), &Routez{},
[]string{"now", "routes"}},
{"ROUTEZ", json.RawMessage(`{"cluster":"CLUSTER"}`), &Routez{},
[]string{"now", "routes"}},
{"ROUTEZ", json.RawMessage(`{"cluster":"TEST CLUSTER 22", "subscriptions":true}`), &Routez{},
[]string{"now", "routes"}},
}
for i, test := range tests {
@@ -1595,10 +1672,10 @@ func TestServerEventsPingMonitorz(t *testing.T) {
}
serverName := ""
if response1["server"]["name"] == "A" {
serverName = "B"
} else if response1["server"]["name"] == "B" {
serverName = "A"
if response1["server"]["name"] == "A_SRV" {
serverName = "B_SRV"
} else if response1["server"]["name"] == "B_SRV" {
serverName = "A_SRV"
} else {
t.Fatalf("Error finding server in %s", string(msg.Data))
}

View File

@@ -1047,6 +1047,7 @@ type JetStreamVarz struct {
// ClusterOptsVarz contains monitoring cluster information
type ClusterOptsVarz struct {
Name string `json:"name,omitempty"`
Host string `json:"addr,omitempty"`
Port int `json:"cluster_port,omitempty"`
AuthTimeout float64 `json:"auth_timeout,omitempty"`
@@ -1197,6 +1198,7 @@ func (s *Server) createVarz(pcpu float64, rss int64) *Varz {
HTTPBasePath: opts.HTTPBasePath,
HTTPSPort: opts.HTTPSPort,
Cluster: ClusterOptsVarz{
Name: info.Cluster,
Host: c.Host,
Port: c.Port,
AuthTimeout: c.AuthTimeout,

View File

@@ -2415,6 +2415,7 @@ func TestMonitorCluster(t *testing.T) {
defer s.Shutdown()
expected := ClusterOptsVarz{
"A",
opts.Cluster.Host,
opts.Cluster.Port,
opts.Cluster.AuthTimeout,
@@ -2434,11 +2435,12 @@ func TestMonitorCluster(t *testing.T) {
// Having this here to make sure that if fields are added in ClusterOptsVarz,
// we make sure to update this test (compiler will report an error if we don't)
_ = ClusterOptsVarz{"", 0, 0, nil}
_ = ClusterOptsVarz{"", "", 0, 0, nil}
// Alter the fields to make sure that we have a proper deep copy
// of what may be stored in the server. Anything we change here
// should not affect the next returned value.
v.Cluster.Name = "wrong"
v.Cluster.Host = "wrong"
v.Cluster.Port = 0
v.Cluster.AuthTimeout = 0

View File

@@ -427,9 +427,17 @@ func (s *Server) ClusterName() string {
// setClusterName will update the cluster name for this server.
// The reset signal is deliberately sent AFTER releasing s.mu: the goroutine
// draining resetCh may itself be blocked waiting on the lock, so signaling
// while holding it could deadlock.
func (s *Server) setClusterName(name string) {
	s.mu.Lock()
	var resetCh chan struct{}
	// Only signal when events are running (s.sys != nil) and the name
	// actually changed.
	if s.sys != nil && s.info.Cluster != name {
		// can't hold the lock as go routine reading it may be waiting for lock as well
		resetCh = s.sys.resetCh
	}
	s.info.Cluster = name
	s.routeInfo.Cluster = name
	s.mu.Unlock()
	if resetCh != nil {
		// Unguarded send — NOTE(review): assumes the system events goroutine
		// is alive and receiving on resetCh; confirm shutdown ordering.
		resetCh <- struct{}{}
	}
	s.Noticef("Cluster name updated to %s", name)
}