Merge pull request #2996 from nats-io/ws_mqtt_in_varz

[ADDED] Monitoring: MQTT and Websocket blocks in `/varz` endpoint
This commit is contained in:
Ivan Kozlovic
2022-04-04 10:53:19 -06:00
committed by GitHub
2 changed files with 171 additions and 0 deletions

View File

@@ -1140,6 +1140,8 @@ type Varz struct {
Cluster ClusterOptsVarz `json:"cluster,omitempty"`
Gateway GatewayOptsVarz `json:"gateway,omitempty"`
LeafNode LeafNodeOptsVarz `json:"leaf,omitempty"`
MQTT MQTTOptsVarz `json:"mqtt,omitempty"`
Websocket WebsocketOptsVarz `json:"websocket,omitempty"`
JetStream JetStreamVarz `json:"jetstream,omitempty"`
TLSTimeout float64 `json:"tls_timeout"`
WriteDeadline time.Duration `json:"write_deadline"`
@@ -1236,6 +1238,37 @@ type RemoteLeafOptsVarz struct {
Deny *DenyRules `json:"deny,omitempty"`
}
// MQTTOptsVarz contains monitoring MQTT information
type MQTTOptsVarz struct {
Host string `json:"host,omitempty"`
Port int `json:"port,omitempty"`
NoAuthUser string `json:"no_auth_user,omitempty"`
AuthTimeout float64 `json:"auth_timeout,omitempty"`
TLSMap bool `json:"tls_map,omitempty"`
TLSTimeout float64 `json:"tls_timeout,omitempty"`
TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"`
JsDomain string `json:"js_domain,omitempty"`
AckWait time.Duration `json:"ack_wait,omitempty"`
MaxAckPending uint16 `json:"max_ack_pending,omitempty"`
}
// WebsocketOptsVarz contains monitoring websocket information
type WebsocketOptsVarz struct {
Host string `json:"host,omitempty"`
Port int `json:"port,omitempty"`
Advertise string `json:"advertise,omitempty"`
NoAuthUser string `json:"no_auth_user,omitempty"`
JWTCookie string `json:"jwt_cookie,omitempty"`
HandshakeTimeout time.Duration `json:"handshake_timeout,omitempty"`
AuthTimeout float64 `json:"auth_timeout,omitempty"`
NoTLS bool `json:"no_tls,omitempty"`
TLSMap bool `json:"tls_map,omitempty"`
TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"`
SameOrigin bool `json:"same_origin,omitempty"`
AllowedOrigins []string `json:"allowed_origins,omitempty"`
Compression bool `json:"compression,omitempty"`
}
// VarzOptions are the options passed to Varz().
// Currently, there are no options defined.
type VarzOptions struct{}
@@ -1379,6 +1412,8 @@ func (s *Server) createVarz(pcpu float64, rss int64) *Varz {
c := &opts.Cluster
gw := &opts.Gateway
ln := &opts.LeafNode
mqtt := &opts.MQTT
ws := &opts.Websocket
clustTlsReq := c.TLSConfig != nil
gatewayTlsReq := gw.TLSConfig != nil
leafTlsReq := ln.TLSConfig != nil
@@ -1428,6 +1463,31 @@ func (s *Server) createVarz(pcpu float64, rss int64) *Varz {
TLSVerify: leafTlsVerify,
Remotes: []RemoteLeafOptsVarz{},
},
MQTT: MQTTOptsVarz{
Host: mqtt.Host,
Port: mqtt.Port,
NoAuthUser: mqtt.NoAuthUser,
AuthTimeout: mqtt.AuthTimeout,
TLSMap: mqtt.TLSMap,
TLSTimeout: mqtt.TLSTimeout,
JsDomain: mqtt.JsDomain,
AckWait: mqtt.AckWait,
MaxAckPending: mqtt.MaxAckPending,
},
Websocket: WebsocketOptsVarz{
Host: ws.Host,
Port: ws.Port,
Advertise: ws.Advertise,
NoAuthUser: ws.NoAuthUser,
JWTCookie: ws.JWTCookie,
AuthTimeout: ws.AuthTimeout,
NoTLS: ws.NoTLS,
TLSMap: ws.TLSMap,
SameOrigin: ws.SameOrigin,
AllowedOrigins: copyStrings(ws.AllowedOrigins),
Compression: ws.Compression,
HandshakeTimeout: ws.HandshakeTimeout,
},
Start: s.start,
MaxSubs: opts.MaxSubs,
Cores: numCores,
@@ -1515,6 +1575,19 @@ func (s *Server) updateVarzConfigReloadableFields(v *Varz) {
if s.sys != nil && s.sys.account != nil {
v.SystemAccount = s.sys.account.GetName()
}
v.MQTT.TLSPinnedCerts = getPinnedCertsAsSlice(opts.MQTT.TLSPinnedCerts)
v.Websocket.TLSPinnedCerts = getPinnedCertsAsSlice(opts.Websocket.TLSPinnedCerts)
}
func getPinnedCertsAsSlice(certs PinnedCertSet) []string {
if len(certs) == 0 {
return nil
}
res := make([]string, 0, len(certs))
for cn := range certs {
res = append(res, cn)
}
return res
}
// Updates the runtime Varz fields, that is, fields that change during

View File

@@ -4351,3 +4351,101 @@ func TestMonitorReloadTLSConfig(t *testing.T) {
t.Fatalf("Error: %v", err)
}
}
func TestMonitorMQTT(t *testing.T) {
o := DefaultOptions()
o.HTTPHost = "127.0.0.1"
o.HTTPPort = -1
o.ServerName = "mqtt_server"
o.Users = []*User{{Username: "someuser"}}
pinnedCerts := make(PinnedCertSet)
pinnedCerts["7f83b1657ff1fc53b92dc18148a1d65dfc2d4b1fa3d677284addd200126d9069"] = struct{}{}
o.MQTT = MQTTOpts{
Host: "127.0.0.1",
Port: -1,
NoAuthUser: "someuser",
JsDomain: "js",
AuthTimeout: 2.0,
TLSMap: true,
TLSTimeout: 3.0,
TLSPinnedCerts: pinnedCerts,
AckWait: 4 * time.Second,
MaxAckPending: 256,
}
s := RunServer(o)
defer s.Shutdown()
expected := &MQTTOptsVarz{
Host: "127.0.0.1",
Port: o.MQTT.Port,
NoAuthUser: "someuser",
JsDomain: "js",
AuthTimeout: 2.0,
TLSMap: true,
TLSTimeout: 3.0,
TLSPinnedCerts: []string{"7f83b1657ff1fc53b92dc18148a1d65dfc2d4b1fa3d677284addd200126d9069"},
AckWait: 4 * time.Second,
MaxAckPending: 256,
}
url := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
v := pollVarz(t, s, mode, url, nil)
vm := &v.MQTT
if !reflect.DeepEqual(vm, expected) {
t.Fatalf("Expected\n%+v\nGot:\n%+v", expected, vm)
}
}
}
func TestMonitorWebsocket(t *testing.T) {
o := DefaultOptions()
o.HTTPHost = "127.0.0.1"
o.HTTPPort = -1
kp, _ := nkeys.FromSeed(oSeed)
pub, _ := kp.PublicKey()
o.TrustedKeys = []string{pub}
o.Users = []*User{{Username: "someuser"}}
pinnedCerts := make(PinnedCertSet)
pinnedCerts["7f83b1657ff1fc53b92dc18148a1d65dfc2d4b1fa3d677284addd200126d9069"] = struct{}{}
o.Websocket = WebsocketOpts{
Host: "127.0.0.1",
Port: -1,
Advertise: "somehost:8080",
NoAuthUser: "someuser",
JWTCookie: "somecookiename",
AuthTimeout: 2.0,
NoTLS: true,
TLSMap: true,
TLSPinnedCerts: pinnedCerts,
SameOrigin: true,
AllowedOrigins: []string{"origin1", "origin2"},
Compression: true,
HandshakeTimeout: 4 * time.Second,
}
s := RunServer(o)
defer s.Shutdown()
expected := &WebsocketOptsVarz{
Host: "127.0.0.1",
Port: o.Websocket.Port,
Advertise: "somehost:8080",
NoAuthUser: "someuser",
JWTCookie: "somecookiename",
AuthTimeout: 2.0,
NoTLS: true,
TLSMap: true,
TLSPinnedCerts: []string{"7f83b1657ff1fc53b92dc18148a1d65dfc2d4b1fa3d677284addd200126d9069"},
SameOrigin: true,
AllowedOrigins: []string{"origin1", "origin2"},
Compression: true,
HandshakeTimeout: 4 * time.Second,
}
url := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port)
for mode := 0; mode < 2; mode++ {
v := pollVarz(t, s, mode, url, nil)
vw := &v.Websocket
if !reflect.DeepEqual(vw, expected) {
t.Fatalf("Expected\n%+v\nGot:\n%+v", expected, vw)
}
}
}