mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
[adding] kind and client_type to account connect/disconnect events (#2351)
* [adding] kind and client_type to client info. specifically account connect/disconnect events Kind is Client/Leafnode but can take the value of Router/Gateway/JetStream/Account/System in the future. When kind is Client, then client_type is set to mqtt/websocket/nats This fixes #2291 Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
@@ -671,7 +671,7 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) boo
|
||||
acc.mu.RLock()
|
||||
c.Debugf("Authenticated JWT: %s %q (claim-name: %q, claim-tags: %q) "+
|
||||
"signed with %q by Account %q (claim-name: %q, claim-tags: %q) signed with %q",
|
||||
c.typeString(), juc.Subject, juc.Name, juc.Tags, juc.Issuer, issuer, acc.nameTag, acc.tags, acc.Issuer)
|
||||
c.kindString(), juc.Subject, juc.Name, juc.Tags, juc.Issuer, issuer, acc.nameTag, acc.tags, acc.Issuer)
|
||||
acc.mu.RUnlock()
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -474,6 +474,20 @@ func (c *client) clientType() int {
|
||||
}
|
||||
}
|
||||
|
||||
var clientTypeStringMap = map[int]string{
|
||||
NON_CLIENT: _EMPTY_,
|
||||
NATS: "nats",
|
||||
WS: "websocket",
|
||||
MQTT: "mqtt",
|
||||
}
|
||||
|
||||
func (c *client) clientTypeString() string {
|
||||
if typeStringVal, ok := clientTypeStringMap[c.clientType()]; ok {
|
||||
return typeStringVal
|
||||
}
|
||||
return _EMPTY_
|
||||
}
|
||||
|
||||
// This is the main subscription struct that indicates
|
||||
// interest in published messages.
|
||||
// FIXME(dlc) - This is getting bloated for normal subs, need
|
||||
@@ -1493,11 +1507,11 @@ func (c *client) markConnAsClosed(reason ClosedState) {
|
||||
// we use Noticef on create, so use that too for delete.
|
||||
if c.srv != nil {
|
||||
if c.kind == LEAF {
|
||||
c.Noticef("%s connection closed: %s account: %s", c.typeString(), reason, c.acc.traceLabel())
|
||||
c.Noticef("%s connection closed: %s account: %s", c.kindString(), reason, c.acc.traceLabel())
|
||||
} else if c.kind == ROUTER || c.kind == GATEWAY {
|
||||
c.Noticef("%s connection closed: %s", c.typeString(), reason)
|
||||
c.Noticef("%s connection closed: %s", c.kindString(), reason)
|
||||
} else { // Client, System, Jetstream, and Account connections.
|
||||
c.Debugf("%s connection closed: %s", c.typeString(), reason)
|
||||
c.Debugf("%s connection closed: %s", c.kindString(), reason)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4331,7 +4345,7 @@ func (c *client) processPingTimer() {
|
||||
return
|
||||
}
|
||||
|
||||
c.Debugf("%s Ping Timer", c.typeString())
|
||||
c.Debugf("%s Ping Timer", c.kindString())
|
||||
|
||||
var sendPing bool
|
||||
|
||||
@@ -4467,7 +4481,7 @@ func (c *client) flushAndClose(minimalFlush bool) {
|
||||
}
|
||||
}
|
||||
|
||||
var typeStringMap = map[int]string{
|
||||
var kindStringMap = map[int]string{
|
||||
CLIENT: "Client",
|
||||
ROUTER: "Router",
|
||||
GATEWAY: "Gateway",
|
||||
@@ -4477,11 +4491,10 @@ var typeStringMap = map[int]string{
|
||||
SYSTEM: "System",
|
||||
}
|
||||
|
||||
func (c *client) typeString() string {
|
||||
if typeStringVal, ok := typeStringMap[c.kind]; ok {
|
||||
return typeStringVal
|
||||
func (c *client) kindString() string {
|
||||
if kindStringVal, ok := kindStringMap[c.kind]; ok {
|
||||
return kindStringVal
|
||||
}
|
||||
|
||||
return "Unknown Type"
|
||||
}
|
||||
|
||||
@@ -4943,6 +4956,8 @@ func (c *client) getClientInfo(detailed bool) *ClientInfo {
|
||||
ci.IssuerKey = issuerForClient(c)
|
||||
ci.NameTag = c.nameTag
|
||||
ci.Tags = c.tags
|
||||
ci.Kind = c.kindString()
|
||||
ci.ClientType = c.clientTypeString()
|
||||
}
|
||||
c.mu.Unlock()
|
||||
return &ci
|
||||
|
||||
@@ -893,7 +893,7 @@ func TestTypeString(t *testing.T) {
|
||||
}
|
||||
for _, cs := range cases {
|
||||
c := &client{kind: cs.intType}
|
||||
typeStringVal := c.typeString()
|
||||
typeStringVal := c.kindString()
|
||||
|
||||
if typeStringVal != cs.stringType {
|
||||
t.Fatalf("Expected typeString value %q, but instead received %q", cs.stringType, typeStringVal)
|
||||
|
||||
126
server/events.go
126
server/events.go
@@ -53,8 +53,8 @@ const (
|
||||
serverStatsSubj = "$SYS.SERVER.%s.STATSZ"
|
||||
serverDirectReqSubj = "$SYS.REQ.SERVER.%s.%s"
|
||||
serverPingReqSubj = "$SYS.REQ.SERVER.PING.%s"
|
||||
serverStatsPingReqSubj = "$SYS.REQ.SERVER.PING" // use $SYS.REQ.SERVER.PING.STATSZ instead
|
||||
leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT"
|
||||
serverStatsPingReqSubj = "$SYS.REQ.SERVER.PING" // use $SYS.REQ.SERVER.PING.STATSZ instead
|
||||
leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT" // for internal use only
|
||||
remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s"
|
||||
inboxRespSubj = "$SYS._INBOX.%s.%s"
|
||||
|
||||
@@ -165,23 +165,25 @@ type ServerInfo struct {
|
||||
|
||||
// ClientInfo is detailed information about the client forming a connection.
|
||||
type ClientInfo struct {
|
||||
Start *time.Time `json:"start,omitempty"`
|
||||
Host string `json:"host,omitempty"`
|
||||
ID uint64 `json:"id,omitempty"`
|
||||
Account string `json:"acc"`
|
||||
Service string `json:"svc,omitempty"`
|
||||
User string `json:"user,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Lang string `json:"lang,omitempty"`
|
||||
Version string `json:"ver,omitempty"`
|
||||
RTT time.Duration `json:"rtt,omitempty"`
|
||||
Server string `json:"server,omitempty"`
|
||||
Cluster string `json:"cluster,omitempty"`
|
||||
Stop *time.Time `json:"stop,omitempty"`
|
||||
Jwt string `json:"jwt,omitempty"`
|
||||
IssuerKey string `json:"issuer_key,omitempty"`
|
||||
NameTag string `json:"name_tag,omitempty"`
|
||||
Tags jwt.TagList `json:"tags,omitempty"`
|
||||
Start *time.Time `json:"start,omitempty"`
|
||||
Host string `json:"host,omitempty"`
|
||||
ID uint64 `json:"id,omitempty"`
|
||||
Account string `json:"acc"`
|
||||
Service string `json:"svc,omitempty"`
|
||||
User string `json:"user,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Lang string `json:"lang,omitempty"`
|
||||
Version string `json:"ver,omitempty"`
|
||||
RTT time.Duration `json:"rtt,omitempty"`
|
||||
Server string `json:"server,omitempty"`
|
||||
Cluster string `json:"cluster,omitempty"`
|
||||
Stop *time.Time `json:"stop,omitempty"`
|
||||
Jwt string `json:"jwt,omitempty"`
|
||||
IssuerKey string `json:"issuer_key,omitempty"`
|
||||
NameTag string `json:"name_tag,omitempty"`
|
||||
Tags jwt.TagList `json:"tags,omitempty"`
|
||||
Kind string `json:"kind,omitempty"`
|
||||
ClientType string `json:"client_type,omitempty"`
|
||||
}
|
||||
|
||||
// ServerStats hold various statistics that we will periodically send out.
|
||||
@@ -1440,18 +1442,20 @@ func (s *Server) accountConnectEvent(c *client) {
|
||||
Time: time.Now().UTC(),
|
||||
},
|
||||
Client: ClientInfo{
|
||||
Start: &c.start,
|
||||
Host: c.host,
|
||||
ID: c.cid,
|
||||
Account: accForClient(c),
|
||||
User: c.getRawAuthUser(),
|
||||
Name: c.opts.Name,
|
||||
Lang: c.opts.Lang,
|
||||
Version: c.opts.Version,
|
||||
Jwt: c.opts.JWT,
|
||||
IssuerKey: issuerForClient(c),
|
||||
Tags: c.tags,
|
||||
NameTag: c.nameTag,
|
||||
Start: &c.start,
|
||||
Host: c.host,
|
||||
ID: c.cid,
|
||||
Account: accForClient(c),
|
||||
User: c.getRawAuthUser(),
|
||||
Name: c.opts.Name,
|
||||
Lang: c.opts.Lang,
|
||||
Version: c.opts.Version,
|
||||
Jwt: c.opts.JWT,
|
||||
IssuerKey: issuerForClient(c),
|
||||
Tags: c.tags,
|
||||
NameTag: c.nameTag,
|
||||
Kind: c.kindString(),
|
||||
ClientType: c.clientTypeString(),
|
||||
},
|
||||
}
|
||||
c.mu.Unlock()
|
||||
@@ -1487,20 +1491,22 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
|
||||
Time: now,
|
||||
},
|
||||
Client: ClientInfo{
|
||||
Start: &c.start,
|
||||
Stop: &now,
|
||||
Host: c.host,
|
||||
ID: c.cid,
|
||||
Account: accForClient(c),
|
||||
User: c.getRawAuthUser(),
|
||||
Name: c.opts.Name,
|
||||
Lang: c.opts.Lang,
|
||||
Version: c.opts.Version,
|
||||
RTT: c.getRTT(),
|
||||
Jwt: c.opts.JWT,
|
||||
IssuerKey: issuerForClient(c),
|
||||
Tags: c.tags,
|
||||
NameTag: c.nameTag,
|
||||
Start: &c.start,
|
||||
Stop: &now,
|
||||
Host: c.host,
|
||||
ID: c.cid,
|
||||
Account: accForClient(c),
|
||||
User: c.getRawAuthUser(),
|
||||
Name: c.opts.Name,
|
||||
Lang: c.opts.Lang,
|
||||
Version: c.opts.Version,
|
||||
RTT: c.getRTT(),
|
||||
Jwt: c.opts.JWT,
|
||||
IssuerKey: issuerForClient(c),
|
||||
Tags: c.tags,
|
||||
NameTag: c.nameTag,
|
||||
Kind: c.kindString(),
|
||||
ClientType: c.clientTypeString(),
|
||||
},
|
||||
Sent: DataStats{
|
||||
Msgs: atomic.LoadInt64(&c.inMsgs),
|
||||
@@ -1536,20 +1542,22 @@ func (s *Server) sendAuthErrorEvent(c *client) {
|
||||
Time: now,
|
||||
},
|
||||
Client: ClientInfo{
|
||||
Start: &c.start,
|
||||
Stop: &now,
|
||||
Host: c.host,
|
||||
ID: c.cid,
|
||||
Account: accForClient(c),
|
||||
User: c.getRawAuthUser(),
|
||||
Name: c.opts.Name,
|
||||
Lang: c.opts.Lang,
|
||||
Version: c.opts.Version,
|
||||
RTT: c.getRTT(),
|
||||
Jwt: c.opts.JWT,
|
||||
IssuerKey: issuerForClient(c),
|
||||
Tags: c.tags,
|
||||
NameTag: c.nameTag,
|
||||
Start: &c.start,
|
||||
Stop: &now,
|
||||
Host: c.host,
|
||||
ID: c.cid,
|
||||
Account: accForClient(c),
|
||||
User: c.getRawAuthUser(),
|
||||
Name: c.opts.Name,
|
||||
Lang: c.opts.Lang,
|
||||
Version: c.opts.Version,
|
||||
RTT: c.getRTT(),
|
||||
Jwt: c.opts.JWT,
|
||||
IssuerKey: issuerForClient(c),
|
||||
Tags: c.tags,
|
||||
NameTag: c.nameTag,
|
||||
Kind: c.kindString(),
|
||||
ClientType: c.clientTypeString(),
|
||||
},
|
||||
Sent: DataStats{
|
||||
Msgs: c.inMsgs,
|
||||
|
||||
@@ -317,7 +317,7 @@ func (srv *Server) NewOCSPMonitor(config *tlsConfigKind) (*tls.Config, *OCSPMoni
|
||||
certFile string
|
||||
caFile string
|
||||
)
|
||||
if kind == typeStringMap[CLIENT] {
|
||||
if kind == kindStringMap[CLIENT] {
|
||||
tcOpts = opts.tlsConfigOpts
|
||||
if opts.TLSCert != _EMPTY_ {
|
||||
certFile = opts.TLSCert
|
||||
@@ -412,7 +412,7 @@ func (srv *Server) NewOCSPMonitor(config *tlsConfigKind) (*tls.Config, *OCSPMoni
|
||||
|
||||
// Check whether need to verify staples from a client connection depending on the type.
|
||||
switch kind {
|
||||
case typeStringMap[ROUTER], typeStringMap[GATEWAY], typeStringMap[LEAF]:
|
||||
case kindStringMap[ROUTER], kindStringMap[GATEWAY], kindStringMap[LEAF]:
|
||||
tc.VerifyConnection = func(s tls.ConnectionState) error {
|
||||
oresp := s.OCSPResponse
|
||||
if oresp == nil {
|
||||
@@ -491,7 +491,7 @@ func (s *Server) configureOCSP() []*tlsConfigKind {
|
||||
if config := sopts.TLSConfig; config != nil {
|
||||
opts := sopts.tlsConfigOpts
|
||||
o := &tlsConfigKind{
|
||||
kind: typeStringMap[CLIENT],
|
||||
kind: kindStringMap[CLIENT],
|
||||
tlsConfig: config,
|
||||
tlsOpts: opts,
|
||||
apply: func(tc *tls.Config) { sopts.TLSConfig = tc },
|
||||
@@ -501,7 +501,7 @@ func (s *Server) configureOCSP() []*tlsConfigKind {
|
||||
if config := sopts.Cluster.TLSConfig; config != nil {
|
||||
opts := sopts.Cluster.tlsConfigOpts
|
||||
o := &tlsConfigKind{
|
||||
kind: typeStringMap[ROUTER],
|
||||
kind: kindStringMap[ROUTER],
|
||||
tlsConfig: config,
|
||||
tlsOpts: opts,
|
||||
apply: func(tc *tls.Config) { sopts.Cluster.TLSConfig = tc },
|
||||
@@ -511,7 +511,7 @@ func (s *Server) configureOCSP() []*tlsConfigKind {
|
||||
if config := sopts.LeafNode.TLSConfig; config != nil {
|
||||
opts := sopts.LeafNode.tlsConfigOpts
|
||||
o := &tlsConfigKind{
|
||||
kind: typeStringMap[LEAF],
|
||||
kind: kindStringMap[LEAF],
|
||||
tlsConfig: config,
|
||||
tlsOpts: opts,
|
||||
apply: func(tc *tls.Config) {
|
||||
@@ -531,7 +531,7 @@ func (s *Server) configureOCSP() []*tlsConfigKind {
|
||||
opts := remote.tlsConfigOpts
|
||||
if config := remote.TLSConfig; config != nil {
|
||||
o := &tlsConfigKind{
|
||||
kind: typeStringMap[LEAF],
|
||||
kind: kindStringMap[LEAF],
|
||||
tlsConfig: config,
|
||||
tlsOpts: opts,
|
||||
apply: func(tc *tls.Config) {
|
||||
@@ -548,7 +548,7 @@ func (s *Server) configureOCSP() []*tlsConfigKind {
|
||||
if config := sopts.Gateway.TLSConfig; config != nil {
|
||||
opts := sopts.Gateway.tlsConfigOpts
|
||||
o := &tlsConfigKind{
|
||||
kind: typeStringMap[GATEWAY],
|
||||
kind: kindStringMap[GATEWAY],
|
||||
tlsConfig: config,
|
||||
tlsOpts: opts,
|
||||
apply: func(tc *tls.Config) { sopts.Gateway.TLSConfig = tc },
|
||||
@@ -559,7 +559,7 @@ func (s *Server) configureOCSP() []*tlsConfigKind {
|
||||
opts := remote.tlsConfigOpts
|
||||
if config := remote.TLSConfig; config != nil {
|
||||
o := &tlsConfigKind{
|
||||
kind: typeStringMap[GATEWAY],
|
||||
kind: kindStringMap[GATEWAY],
|
||||
tlsConfig: config,
|
||||
tlsOpts: opts,
|
||||
apply: func(tc *tls.Config) {
|
||||
|
||||
@@ -1190,7 +1190,7 @@ authErr:
|
||||
parseErr:
|
||||
c.sendErr("Unknown Protocol Operation")
|
||||
snip := protoSnippet(i, PROTO_SNIPPET_SIZE, buf)
|
||||
err := fmt.Errorf("%s parser ERROR, state=%d, i=%d: proto='%s...'", c.typeString(), c.state, i, snip)
|
||||
err := fmt.Errorf("%s parser ERROR, state=%d, i=%d: proto='%s...'", c.kindString(), c.state, i, snip)
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user