diff --git a/server/monitor.go b/server/monitor.go index 7f1f7940..01918ab9 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1140,6 +1140,8 @@ type Varz struct { Cluster ClusterOptsVarz `json:"cluster,omitempty"` Gateway GatewayOptsVarz `json:"gateway,omitempty"` LeafNode LeafNodeOptsVarz `json:"leaf,omitempty"` + MQTT MQTTOptsVarz `json:"mqtt,omitempty"` + Websocket WebsocketOptsVarz `json:"websocket,omitempty"` JetStream JetStreamVarz `json:"jetstream,omitempty"` TLSTimeout float64 `json:"tls_timeout"` WriteDeadline time.Duration `json:"write_deadline"` @@ -1236,6 +1238,37 @@ type RemoteLeafOptsVarz struct { Deny *DenyRules `json:"deny,omitempty"` } +// MQTTOptsVarz contains monitoring MQTT information +type MQTTOptsVarz struct { + Host string `json:"host,omitempty"` + Port int `json:"port,omitempty"` + NoAuthUser string `json:"no_auth_user,omitempty"` + AuthTimeout float64 `json:"auth_timeout,omitempty"` + TLSMap bool `json:"tls_map,omitempty"` + TLSTimeout float64 `json:"tls_timeout,omitempty"` + TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"` + JsDomain string `json:"js_domain,omitempty"` + AckWait time.Duration `json:"ack_wait,omitempty"` + MaxAckPending uint16 `json:"max_ack_pending,omitempty"` +} + +// WebsocketOptsVarz contains monitoring websocket information +type WebsocketOptsVarz struct { + Host string `json:"host,omitempty"` + Port int `json:"port,omitempty"` + Advertise string `json:"advertise,omitempty"` + NoAuthUser string `json:"no_auth_user,omitempty"` + JWTCookie string `json:"jwt_cookie,omitempty"` + HandshakeTimeout time.Duration `json:"handshake_timeout,omitempty"` + AuthTimeout float64 `json:"auth_timeout,omitempty"` + NoTLS bool `json:"no_tls,omitempty"` + TLSMap bool `json:"tls_map,omitempty"` + TLSPinnedCerts []string `json:"tls_pinned_certs,omitempty"` + SameOrigin bool `json:"same_origin,omitempty"` + AllowedOrigins []string `json:"allowed_origins,omitempty"` + Compression bool `json:"compression,omitempty"` +} + // VarzOptions are the options passed to Varz(). // Currently, there are no options defined. type VarzOptions struct{} @@ -1379,6 +1412,8 @@ func (s *Server) createVarz(pcpu float64, rss int64) *Varz { c := &opts.Cluster gw := &opts.Gateway ln := &opts.LeafNode + mqtt := &opts.MQTT + ws := &opts.Websocket clustTlsReq := c.TLSConfig != nil gatewayTlsReq := gw.TLSConfig != nil leafTlsReq := ln.TLSConfig != nil @@ -1428,6 +1463,31 @@ func (s *Server) createVarz(pcpu float64, rss int64) *Varz { TLSVerify: leafTlsVerify, Remotes: []RemoteLeafOptsVarz{}, }, + MQTT: MQTTOptsVarz{ + Host: mqtt.Host, + Port: mqtt.Port, + NoAuthUser: mqtt.NoAuthUser, + AuthTimeout: mqtt.AuthTimeout, + TLSMap: mqtt.TLSMap, + TLSTimeout: mqtt.TLSTimeout, + JsDomain: mqtt.JsDomain, + AckWait: mqtt.AckWait, + MaxAckPending: mqtt.MaxAckPending, + }, + Websocket: WebsocketOptsVarz{ + Host: ws.Host, + Port: ws.Port, + Advertise: ws.Advertise, + NoAuthUser: ws.NoAuthUser, + JWTCookie: ws.JWTCookie, + AuthTimeout: ws.AuthTimeout, + NoTLS: ws.NoTLS, + TLSMap: ws.TLSMap, + SameOrigin: ws.SameOrigin, + AllowedOrigins: copyStrings(ws.AllowedOrigins), + Compression: ws.Compression, + HandshakeTimeout: ws.HandshakeTimeout, + }, Start: s.start, MaxSubs: opts.MaxSubs, Cores: numCores, @@ -1515,6 +1575,19 @@ func (s *Server) updateVarzConfigReloadableFields(v *Varz) { if s.sys != nil && s.sys.account != nil { v.SystemAccount = s.sys.account.GetName() } + v.MQTT.TLSPinnedCerts = getPinnedCertsAsSlice(opts.MQTT.TLSPinnedCerts) + v.Websocket.TLSPinnedCerts = getPinnedCertsAsSlice(opts.Websocket.TLSPinnedCerts) +} + +func getPinnedCertsAsSlice(certs PinnedCertSet) []string { + if len(certs) == 0 { + return nil + } + res := make([]string, 0, len(certs)) + for cn := range certs { + res = append(res, cn) + } + return res } // Updates the runtime Varz fields, that is, fields that change during diff --git a/server/monitor_test.go b/server/monitor_test.go index 7f970776..7b49813b 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -4351,3 +4351,101 @@ func TestMonitorReloadTLSConfig(t *testing.T) { t.Fatalf("Error: %v", err) } } + +func TestMonitorMQTT(t *testing.T) { + o := DefaultOptions() + o.HTTPHost = "127.0.0.1" + o.HTTPPort = -1 + o.ServerName = "mqtt_server" + o.Users = []*User{{Username: "someuser"}} + pinnedCerts := make(PinnedCertSet) + pinnedCerts["7f83b1657ff1fc53b92dc18148a1d65dfc2d4b1fa3d677284addd200126d9069"] = struct{}{} + o.MQTT = MQTTOpts{ + Host: "127.0.0.1", + Port: -1, + NoAuthUser: "someuser", + JsDomain: "js", + AuthTimeout: 2.0, + TLSMap: true, + TLSTimeout: 3.0, + TLSPinnedCerts: pinnedCerts, + AckWait: 4 * time.Second, + MaxAckPending: 256, + } + s := RunServer(o) + defer s.Shutdown() + + expected := &MQTTOptsVarz{ + Host: "127.0.0.1", + Port: o.MQTT.Port, + NoAuthUser: "someuser", + JsDomain: "js", + AuthTimeout: 2.0, + TLSMap: true, + TLSTimeout: 3.0, + TLSPinnedCerts: []string{"7f83b1657ff1fc53b92dc18148a1d65dfc2d4b1fa3d677284addd200126d9069"}, + AckWait: 4 * time.Second, + MaxAckPending: 256, + } + url := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port) + for mode := 0; mode < 2; mode++ { + v := pollVarz(t, s, mode, url, nil) + vm := &v.MQTT + if !reflect.DeepEqual(vm, expected) { + t.Fatalf("Expected\n%+v\nGot:\n%+v", expected, vm) + } + } +} + +func TestMonitorWebsocket(t *testing.T) { + o := DefaultOptions() + o.HTTPHost = "127.0.0.1" + o.HTTPPort = -1 + kp, _ := nkeys.FromSeed(oSeed) + pub, _ := kp.PublicKey() + o.TrustedKeys = []string{pub} + o.Users = []*User{{Username: "someuser"}} + pinnedCerts := make(PinnedCertSet) + pinnedCerts["7f83b1657ff1fc53b92dc18148a1d65dfc2d4b1fa3d677284addd200126d9069"] = struct{}{} + o.Websocket = WebsocketOpts{ + Host: "127.0.0.1", + Port: -1, + Advertise: "somehost:8080", + NoAuthUser: "someuser", + JWTCookie: "somecookiename", + AuthTimeout: 2.0, + NoTLS: true, + TLSMap: true, + TLSPinnedCerts: pinnedCerts, + SameOrigin: true, + AllowedOrigins: []string{"origin1", "origin2"}, + Compression: true, + HandshakeTimeout: 4 * time.Second, + } + s := RunServer(o) + defer s.Shutdown() + + expected := &WebsocketOptsVarz{ + Host: "127.0.0.1", + Port: o.Websocket.Port, + Advertise: "somehost:8080", + NoAuthUser: "someuser", + JWTCookie: "somecookiename", + AuthTimeout: 2.0, + NoTLS: true, + TLSMap: true, + TLSPinnedCerts: []string{"7f83b1657ff1fc53b92dc18148a1d65dfc2d4b1fa3d677284addd200126d9069"}, + SameOrigin: true, + AllowedOrigins: []string{"origin1", "origin2"}, + Compression: true, + HandshakeTimeout: 4 * time.Second, + } + url := fmt.Sprintf("http://127.0.0.1:%d/varz", s.MonitorAddr().Port) + for mode := 0; mode < 2; mode++ { + v := pollVarz(t, s, mode, url, nil) + vw := &v.Websocket + if !reflect.DeepEqual(vw, expected) { + t.Fatalf("Expected\n%+v\nGot:\n%+v", expected, vw) + } + } +}