diff --git a/server/events.go b/server/events.go index dbb94cfb..5c19e507 100644 --- a/server/events.go +++ b/server/events.go @@ -242,7 +242,7 @@ RESET: servername := s.info.Name seqp := &s.sys.seq js := s.js != nil - var cluster string + cluster := s.info.Cluster if s.gateway.enabled { cluster = s.getGatewayName() } @@ -832,14 +832,61 @@ func (s *Server) leafNodeConnected(sub *subscription, _ *client, subject, reply } } +// Common filter options for system requests STATSZ VARZ SUBSZ CONNZ ROUTEZ GATEWAYZ LEAFZ +type ZFilterOptions struct { + Name string `json:"name"` + Cluster string `json:"cluster"` + Host string `json:"host"` +} + +// returns true if the request does NOT apply to this server and can be ignored. +// DO NOT hold the server lock when +func (s *Server) filterRequest(msg []byte) (bool, error) { + if len(msg) == 0 { + return false, nil + } + var fOpts ZFilterOptions + err := json.Unmarshal(msg, &fOpts) + if err != nil { + return false, err + } + if fOpts.Name != "" && !strings.Contains(s.info.Name, fOpts.Name) { + return true, nil + } + if fOpts.Host != "" && !strings.Contains(s.info.Host, fOpts.Host) { + return true, nil + } + if fOpts.Cluster != "" { + s.mu.Lock() + cluster := s.info.Cluster + s.mu.Unlock() + if !strings.Contains(cluster, fOpts.Cluster) { + return true, nil + } + } + return false, nil +} + // statszReq is a request for us to respond with current statz. func (s *Server) statszReq(sub *subscription, _ *client, subject, reply string, msg []byte) { - s.mu.Lock() - defer s.mu.Unlock() - if !s.eventsEnabled() || reply == _EMPTY_ { + if !s.EventsEnabled() || reply == _EMPTY_ { return } + if ignore, err := s.filterRequest(msg); err != nil { + server := &ServerInfo{} + response := map[string]interface{}{"server": server} + response["error"] = map[string]interface{}{ + "code": http.StatusBadRequest, + "description": err.Error(), + } + s.sendInternalMsgLocked(reply, _EMPTY_, server, response) + return + } else if ignore { + return + } + s.mu.Lock() s.sendStatsz(reply) + s.mu.Unlock() } func (s *Server) zReq(reply string, msg []byte, optz interface{}, respf func() (interface{}, error)) { @@ -851,7 +898,12 @@ func (s *Server) zReq(reply string, msg []byte, optz interface{}, respf func() ( var err error status := 0 if len(msg) != 0 { - err = json.Unmarshal(msg, optz) + filter := false + if filter, err = s.filterRequest(msg); filter { + return + } else if err == nil { + err = json.Unmarshal(msg, optz) + } status = http.StatusBadRequest // status is only included on error, so record how far execution got } if err == nil { diff --git a/server/events_test.go b/server/events_test.go index 774b2f1b..0c72474a 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -100,7 +100,7 @@ func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkey optsA.TrustedKeys = []string{pub} optsA.AccountResolver = mr optsA.SystemAccount = apub - optsA.ServerName = "A" + optsA.ServerName = "A_SRV" // Add in dummy gateway optsA.Gateway.Name = "TEST CLUSTER 22" optsA.Gateway.Host = "127.0.0.1" @@ -110,7 +110,7 @@ func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options, nkey sa := RunServer(optsA) optsB := nextServerOpts(optsA) - optsB.ServerName = "B" + optsB.ServerName = "B_SRV" optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port)) sb := RunServer(optsB) @@ -1452,7 +1452,7 @@ func TestServerEventsStatsZ(t *testing.T) { if lr := len(m3.Stats.Routes); lr != 1 { t.Fatalf("Expected a route, but got %d", lr) } - if sr := m3.Stats.Routes[0]; sr.Name != "B" { + if sr := m3.Stats.Routes[0]; sr.Name != "B_SRV" { t.Fatalf("Expected server A's route to B to have Name set to %q, got %q", "B", sr.Name) } @@ -1470,8 +1470,8 @@ func TestServerEventsStatsZ(t *testing.T) { if lr := len(m.Stats.Routes); lr != 1 { t.Fatalf("Expected a route, but got %d", lr) } - if sr := m.Stats.Routes[0]; sr.Name != "A" { - t.Fatalf("Expected server B's route to A to have Name set to %q, got %q", "A", sr.Name) + if sr := m.Stats.Routes[0]; sr.Name != "A_SRV" { + t.Fatalf("Expected server B's route to A to have Name set to %q, got %q", "A_SRV", sr.Name) } } @@ -1487,28 +1487,94 @@ func TestServerEventsPingStatsZ(t *testing.T) { } defer nc.Close() - reply := nc.NewRespInbox() - sub, _ := nc.SubscribeSync(reply) + requestTbl := []string{ + `{"cluster":"TEST"}`, + `{"cluster":"CLUSTER"}`, + `{"name":"SRV"}`, + `{"name":"_"}`, + fmt.Sprintf(`{"host":"%s"}`, optsB.Host), + fmt.Sprintf(`{"host":"%s", "cluster":"CLUSTER", "name":"SRV"}`, optsB.Host), + } - nc.PublishRequest(serverStatsPingReqSubj, reply, nil) + for _, msg := range requestTbl { + t.Run(msg, func(t *testing.T) { + reply := nc.NewRespInbox() + sub, _ := nc.SubscribeSync(reply) - // Make sure its a statsz - m := ServerStatsMsg{} + nc.PublishRequest(serverStatsPingReqSubj, reply, []byte(msg)) + + // Make sure its a statsz + m := ServerStatsMsg{} + + // Receive both manually. + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + if err := json.Unmarshal(msg.Data, &m); err != nil { + t.Fatalf("Error unmarshalling the statz json: %v", err) + } + msg, err = sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + if err := json.Unmarshal(msg.Data, &m); err != nil { + t.Fatalf("Error unmarshalling the statz json: %v", err) + } + }) + } +} + +func TestServerEventsPingStatsZFilter(t *testing.T) { + sa, _, sb, optsB, akp := runTrustedCluster(t) + defer sa.Shutdown() + defer sb.Shutdown() + + url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port) + nc, err := nats.Connect(url, createUserCreds(t, sb, akp)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() + + requestTbl := []string{ + `{"cluster":"DOESNOTEXIST"}`, + `{"host":"DOESNOTEXIST"}`, + `{"name":"DOESNOTEXIST"}`, + } + for _, msg := range requestTbl { + t.Run(msg, func(t *testing.T) { + // Receive both manually. + if _, err := nc.Request(serverStatsPingReqSubj, []byte(msg), time.Second/4); err != nats.ErrTimeout { + t.Fatalf("Error, expected timeout: %v", err) + } + }) + } +} + +func TestServerEventsPingStatsZFailFilter(t *testing.T) { + sa, _, sb, optsB, akp := runTrustedCluster(t) + defer sa.Shutdown() + defer sb.Shutdown() + + url := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port) + nc, err := nats.Connect(url, createUserCreds(t, sb, akp)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nc.Close() // Receive both manually. - msg, err := sub.NextMsg(time.Second) - if err != nil { - t.Fatalf("Error receiving msg: %v", err) - } - if err := json.Unmarshal(msg.Data, &m); err != nil { - t.Fatalf("Error unmarshalling the statz json: %v", err) - } - msg, err = sub.NextMsg(time.Second) - if err != nil { - t.Fatalf("Error receiving msg: %v", err) - } - if err := json.Unmarshal(msg.Data, &m); err != nil { - t.Fatalf("Error unmarshalling the statz json: %v", err) + if msg, err := nc.Request(serverStatsPingReqSubj, []byte(`{MALFORMEDJSON`), time.Second/4); err != nil { + t.Fatalf("Error: %v", err) + } else { + resp := make(map[string]map[string]interface{}) + if err := json.Unmarshal(msg.Data, &resp); err != nil { + t.Fatalf("Error unmarshalling the response json: %v", err) + } + if resp["error"]["code"].(float64) != http.StatusBadRequest { + t.Fatal("bad error code") + } } } @@ -1566,6 +1632,17 @@ func TestServerEventsPingMonitorz(t *testing.T) { []string{"now", "outbound_gateways", "inbound_gateways"}}, {"LEAFZ", &LeafzOptions{Subscriptions: true}, &Leafz{}, []string{"now", "leafs"}}, + + {"ROUTEZ", json.RawMessage(`{"cluster":""}`), &Routez{}, + []string{"now", "routes"}}, + {"ROUTEZ", json.RawMessage(`{"name":""}`), &Routez{}, + []string{"now", "routes"}}, + {"ROUTEZ", json.RawMessage(`{"cluster":"TEST CLUSTER 22"}`), &Routez{}, + []string{"now", "routes"}}, + {"ROUTEZ", json.RawMessage(`{"cluster":"CLUSTER"}`), &Routez{}, + []string{"now", "routes"}}, + {"ROUTEZ", json.RawMessage(`{"cluster":"TEST CLUSTER 22", "subscriptions":true}`), &Routez{}, + []string{"now", "routes"}}, } for i, test := range tests { @@ -1595,10 +1672,10 @@ func TestServerEventsPingMonitorz(t *testing.T) { } serverName := "" - if response1["server"]["name"] == "A" { - serverName = "B" - } else if response1["server"]["name"] == "B" { - serverName = "A" + if response1["server"]["name"] == "A_SRV" { + serverName = "B_SRV" + } else if response1["server"]["name"] == "B_SRV" { + serverName = "A_SRV" } else { t.Fatalf("Error finding server in %s", string(msg.Data)) } diff --git a/server/monitor.go b/server/monitor.go index 81b4c2e1..c4345ab5 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1047,6 +1047,7 @@ type JetStreamVarz struct { // ClusterOptsVarz contains monitoring cluster information type ClusterOptsVarz struct { + Name string `json:"name,omitempty"` Host string `json:"addr,omitempty"` Port int `json:"cluster_port,omitempty"` AuthTimeout float64 `json:"auth_timeout,omitempty"` @@ -1197,6 +1198,7 @@ func (s *Server) createVarz(pcpu float64, rss int64) *Varz { HTTPBasePath: opts.HTTPBasePath, HTTPSPort: opts.HTTPSPort, Cluster: ClusterOptsVarz{ + Name: info.Cluster, Host: c.Host, Port: c.Port, AuthTimeout: c.AuthTimeout, diff --git a/server/monitor_test.go b/server/monitor_test.go index 6d0cc10b..2a5555c0 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -2415,6 +2415,7 @@ func TestMonitorCluster(t *testing.T) { defer s.Shutdown() expected := ClusterOptsVarz{ + "A", opts.Cluster.Host, opts.Cluster.Port, opts.Cluster.AuthTimeout, @@ -2434,11 +2435,12 @@ func TestMonitorCluster(t *testing.T) { // Having this here to make sure that if fields are added in ClusterOptsVarz, // we make sure to update this test (compiler will report an error if we don't) - _ = ClusterOptsVarz{"", 0, 0, nil} + _ = ClusterOptsVarz{"", "", 0, 0, nil} // Alter the fields to make sure that we have a proper deep copy // of what may be stored in the server. Anything we change here // should not affect the next returned value. + v.Cluster.Name = "wrong" v.Cluster.Host = "wrong" v.Cluster.Port = 0 v.Cluster.AuthTimeout = 0 diff --git a/server/server.go b/server/server.go index c7ff96b8..e22514e0 100644 --- a/server/server.go +++ b/server/server.go @@ -427,9 +427,17 @@ func (s *Server) ClusterName() string { // setClusterName will update the cluster name for this server. func (s *Server) setClusterName(name string) { s.mu.Lock() + var resetCh chan struct{} + if s.sys != nil && s.info.Cluster != name { + // can't hold the lock as go routine reading it may be waiting for lock as well + resetCh = s.sys.resetCh + } s.info.Cluster = name s.routeInfo.Cluster = name s.mu.Unlock() + if resetCh != nil { + resetCh <- struct{}{} + } s.Noticef("Cluster name updated to %s", name) }