mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Add in account scoped auth error event. If external auth, supply reason from the callout service.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
// Copyright 2012-2022 The NATS Authors
|
||||
// Copyright 2012-2023 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -581,12 +581,20 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au
|
||||
|
||||
// Check if we have auth callouts enabled at the server level or in the bound account.
|
||||
defer func() {
|
||||
// Default reason
|
||||
reason := AuthenticationViolation.String()
|
||||
// No-op
|
||||
if juc == nil && opts.AuthCallout == nil {
|
||||
if !authorized {
|
||||
s.sendAccountAuthErrorEvent(c, c.acc, reason)
|
||||
}
|
||||
return
|
||||
}
|
||||
// We have a juc defined here, check account.
|
||||
if juc != nil && !acc.hasExternalAuth() {
|
||||
if !authorized {
|
||||
s.sendAccountAuthErrorEvent(c, c.acc, reason)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -608,7 +616,7 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au
|
||||
// If we are here we have an auth callout defined and we have failed auth so far
|
||||
// so we will callout to our auth backend for processing.
|
||||
if !skip {
|
||||
authorized = s.processClientOrLeafCallout(c, opts)
|
||||
authorized, reason = s.processClientOrLeafCallout(c, opts)
|
||||
}
|
||||
// Check if we are authorized and in the auth callout account, and if so add in deny publish permissions for the auth subject.
|
||||
if authorized {
|
||||
@@ -623,6 +631,14 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au
|
||||
c.mergeDenyPermissions(pub, []string{AuthCalloutSubject})
|
||||
}
|
||||
c.mu.Unlock()
|
||||
} else {
|
||||
// If we are here we failed external authorization.
|
||||
// Send an account scoped event. Server config mode acc will be nil,
|
||||
// so lookup the auth callout assigned account, that is where this will be sent.
|
||||
if acc == nil {
|
||||
acc, _ = s.lookupAccount(opts.AuthCallout.Account)
|
||||
}
|
||||
s.sendAccountAuthErrorEvent(c, acc, reason)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -1106,7 +1122,7 @@ func checkClientTLSCertSubject(c *client, fn tlsMapAuthFn) bool {
|
||||
// https://github.com/golang/go/issues/12342
|
||||
dn, err := ldap.FromRawCertSubject(cert.RawSubject)
|
||||
if err == nil {
|
||||
if match, ok := fn("", dn, false); ok {
|
||||
if match, ok := fn(_EMPTY_, dn, false); ok {
|
||||
c.Debugf("Using DistinguishedNameMatch for auth [%q]", match)
|
||||
return true
|
||||
}
|
||||
@@ -1189,22 +1205,22 @@ func (s *Server) isRouterAuthorized(c *client) bool {
|
||||
|
||||
if opts.Cluster.TLSMap || opts.Cluster.TLSCheckKnownURLs {
|
||||
return checkClientTLSCertSubject(c, func(user string, _ *ldap.DN, isDNSAltName bool) (string, bool) {
|
||||
if user == "" {
|
||||
return "", false
|
||||
if user == _EMPTY_ {
|
||||
return _EMPTY_, false
|
||||
}
|
||||
if opts.Cluster.TLSCheckKnownURLs && isDNSAltName {
|
||||
if dnsAltNameMatches(dnsAltNameLabels(user), opts.Routes) {
|
||||
return "", true
|
||||
return _EMPTY_, true
|
||||
}
|
||||
}
|
||||
if opts.Cluster.TLSMap && opts.Cluster.Username == user {
|
||||
return "", true
|
||||
return _EMPTY_, true
|
||||
}
|
||||
return "", false
|
||||
return _EMPTY_, false
|
||||
})
|
||||
}
|
||||
|
||||
if opts.Cluster.Username == "" {
|
||||
if opts.Cluster.Username == _EMPTY_ {
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -1225,25 +1241,25 @@ func (s *Server) isGatewayAuthorized(c *client) bool {
|
||||
// Check whether TLS map is enabled, otherwise use single user/pass.
|
||||
if opts.Gateway.TLSMap || opts.Gateway.TLSCheckKnownURLs {
|
||||
return checkClientTLSCertSubject(c, func(user string, _ *ldap.DN, isDNSAltName bool) (string, bool) {
|
||||
if user == "" {
|
||||
return "", false
|
||||
if user == _EMPTY_ {
|
||||
return _EMPTY_, false
|
||||
}
|
||||
if opts.Gateway.TLSCheckKnownURLs && isDNSAltName {
|
||||
labels := dnsAltNameLabels(user)
|
||||
for _, gw := range opts.Gateway.Gateways {
|
||||
if gw != nil && dnsAltNameMatches(labels, gw.URLs) {
|
||||
return "", true
|
||||
return _EMPTY_, true
|
||||
}
|
||||
}
|
||||
}
|
||||
if opts.Gateway.TLSMap && opts.Gateway.Username == user {
|
||||
return "", true
|
||||
return _EMPTY_, true
|
||||
}
|
||||
return "", false
|
||||
return _EMPTY_, false
|
||||
})
|
||||
}
|
||||
|
||||
if opts.Gateway.Username == "" {
|
||||
if opts.Gateway.Username == _EMPTY_ {
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2022 The NATS Authors
|
||||
// Copyright 2022-2023 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/jwt/v2"
|
||||
@@ -30,7 +31,7 @@ const (
|
||||
)
|
||||
|
||||
// Process a callout on this client's behalf.
|
||||
func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorized bool) {
|
||||
func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorized bool, errStr string) {
|
||||
isOperatorMode := len(opts.TrustedKeys) > 0
|
||||
|
||||
var acc *Account
|
||||
@@ -39,8 +40,9 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize
|
||||
var err error
|
||||
acc, err = s.LookupAccount(aname)
|
||||
if err != nil {
|
||||
s.Warnf("No valid account %q for auth callout request: %v", aname, err)
|
||||
return false
|
||||
errStr = fmt.Sprintf("No valid account %q for auth callout request: %v", aname, err)
|
||||
s.Warnf(errStr)
|
||||
return false, errStr
|
||||
}
|
||||
} else {
|
||||
acc = c.acc
|
||||
@@ -67,15 +69,14 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize
|
||||
pub, _ := ukp.PublicKey()
|
||||
|
||||
reply := s.newRespInbox()
|
||||
respCh := make(chan bool, 1)
|
||||
respCh := make(chan string, 1)
|
||||
|
||||
processReply := func(_ *subscription, rc *client, racc *Account, subject, reply string, rmsg []byte) {
|
||||
_, msg := rc.msgParts(rmsg)
|
||||
// This signals not authorized.
|
||||
// Since this is an account subscription will always have "\r\n".
|
||||
if len(msg) <= LEN_CR_LF {
|
||||
s.Warnf("Auth callout violation: %q on account %q", "no reason supplied", racc.Name)
|
||||
respCh <- false
|
||||
respCh <- fmt.Sprintf("Auth callout violation: %q on account %q", "no reason supplied", racc.Name)
|
||||
return
|
||||
}
|
||||
// Strip trailing CRLF.
|
||||
@@ -86,25 +87,24 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize
|
||||
var err error
|
||||
msg, err = xkp.Open(msg, pubAccXKey)
|
||||
if err != nil {
|
||||
s.Warnf("Error decrypting auth callout response on account %q: %v", racc.Name, err)
|
||||
respCh <- false
|
||||
respCh <- fmt.Sprintf("Error decrypting auth callout response on account %q: %v", racc.Name, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
arc, err := jwt.DecodeAuthorizationResponseClaims(string(msg))
|
||||
if err != nil {
|
||||
s.Warnf("Error decoding auth callout response on account %q: %v", racc.Name, err)
|
||||
respCh <- false
|
||||
respCh <- fmt.Sprintf("Error decoding auth callout response on account %q: %v", racc.Name, err)
|
||||
return
|
||||
}
|
||||
|
||||
// FIXME(dlc) - push error through here.
|
||||
if arc.Error != nil || arc.User == nil {
|
||||
if arc.Error != nil {
|
||||
s.Warnf("Auth callout violation: %q on account %q", arc.Error.Description, racc.Name)
|
||||
respCh <- fmt.Sprintf("Auth callout violation: %q on account %q", arc.Error.Description, racc.Name)
|
||||
} else {
|
||||
respCh <- fmt.Sprintf("Auth callout violation: no user returned on account %q", racc.Name)
|
||||
}
|
||||
respCh <- false
|
||||
return
|
||||
}
|
||||
|
||||
@@ -119,42 +119,40 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize
|
||||
// By default issuer needs to match server config or the requesting account in operator mode.
|
||||
if arc.Issuer != issuer {
|
||||
if !isOperatorMode {
|
||||
s.Warnf("Wrong issuer for auth callout response on account %q, expected %q got %q", racc.Name, issuer, arc.Issuer)
|
||||
respCh <- false
|
||||
respCh <- fmt.Sprintf("Wrong issuer for auth callout response on account %q, expected %q got %q", racc.Name, issuer, arc.Issuer)
|
||||
return
|
||||
} else if !acc.isAllowedAcount(arc.Issuer) {
|
||||
s.Warnf("Account %q not permitted as valid account option for auth callout on %q for account %q", arc.Issuer, issuer, racc.Name)
|
||||
respCh <- false
|
||||
respCh <- fmt.Sprintf("Account %q not permitted as valid account option for auth callout for account %q",
|
||||
arc.Issuer, racc.Name)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Require the response to have pinned the audience to this server.
|
||||
if arc.Audience != s.info.ID {
|
||||
s.Warnf("Wrong server audience received for auth callout response on account %q, expected %q got %q", racc.Name, s.info.ID, arc.Audience)
|
||||
respCh <- false
|
||||
respCh <- fmt.Sprintf("Wrong server audience received for auth callout response on account %q, expected %q got %q",
|
||||
racc.Name, s.info.ID, arc.Audience)
|
||||
return
|
||||
}
|
||||
|
||||
juc := arc.User
|
||||
// Make sure that the user is what we requested.
|
||||
if juc.Subject != pub {
|
||||
s.Warnf("Expected authorized user of %q but got %q on account %q", pub, juc.Subject, racc.Name)
|
||||
respCh <- false
|
||||
respCh <- fmt.Sprintf("Expected authorized user of %q but got %q on account %q", pub, juc.Subject, racc.Name)
|
||||
return
|
||||
}
|
||||
|
||||
allowNow, validFor := validateTimes(juc)
|
||||
if !allowNow {
|
||||
c.Errorf("Outside connect times")
|
||||
respCh <- false
|
||||
respCh <- fmt.Sprintf("Authorized user on account %q outside of valid connect times", racc.Name)
|
||||
return
|
||||
}
|
||||
allowedConnTypes, err := convertAllowedConnectionTypes(juc.AllowedConnectionTypes)
|
||||
if err != nil {
|
||||
c.Debugf("%v", err)
|
||||
if len(allowedConnTypes) == 0 {
|
||||
respCh <- false
|
||||
respCh <- fmt.Sprintf("Authorized user on account %q using invalid connection type", racc.Name)
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -164,14 +162,12 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize
|
||||
if aname := juc.Audience; aname != _EMPTY_ {
|
||||
targetAcc, err = s.LookupAccount(aname)
|
||||
if err != nil {
|
||||
s.Warnf("No valid account %q for auth callout response on account %q: %v", aname, racc.Name, err)
|
||||
respCh <- false
|
||||
respCh <- fmt.Sprintf("No valid account %q for auth callout response on account %q: %v", aname, racc.Name, err)
|
||||
return
|
||||
}
|
||||
// In operator mode make sure this account matches the issuer.
|
||||
if isOperatorMode && aname != arc.Issuer {
|
||||
s.Warnf("Account %q does not match issuer %q", aname, juc.Issuer)
|
||||
respCh <- false
|
||||
respCh <- fmt.Sprintf("Account %q does not match issuer %q", aname, juc.Issuer)
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -179,8 +175,7 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize
|
||||
// Build internal user and bind to the targeted account.
|
||||
nkuser := buildInternalNkeyUser(juc, allowedConnTypes, targetAcc)
|
||||
if err := c.RegisterNkeyUser(nkuser); err != nil {
|
||||
s.Warnf("Could not register auth callout user: %v", err)
|
||||
respCh <- false
|
||||
respCh <- fmt.Sprintf("Could not register auth callout user: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -198,13 +193,14 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize
|
||||
// Check if we need to set an auth timer if the user jwt expires.
|
||||
c.setExpiration(juc.Claims(), validFor)
|
||||
|
||||
respCh <- true
|
||||
respCh <- _EMPTY_
|
||||
}
|
||||
|
||||
sub, err := acc.subscribeInternal(reply, processReply)
|
||||
if err != nil {
|
||||
s.Warnf("Error setting up reply subscription for auth request: %v", err)
|
||||
return false
|
||||
errStr = fmt.Sprintf("Error setting up reply subscription for auth request: %v", err)
|
||||
s.Warnf(errStr)
|
||||
return false, errStr
|
||||
}
|
||||
defer acc.unsubscribeInternal(sub)
|
||||
|
||||
@@ -283,8 +279,9 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize
|
||||
|
||||
b, err := claim.Encode(s.kp)
|
||||
if err != nil {
|
||||
s.Warnf("Error encoding auth request claim on account %q: %v", acc.Name, err)
|
||||
return false
|
||||
errStr = fmt.Sprintf("Error encoding auth request claim on account %q: %v", acc.Name, err)
|
||||
s.Warnf(errStr)
|
||||
return false, errStr
|
||||
}
|
||||
req := []byte(b)
|
||||
var hdr map[string]string
|
||||
@@ -293,8 +290,9 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize
|
||||
if xkp != nil {
|
||||
req, err = xkp.Seal([]byte(req), pubAccXKey)
|
||||
if err != nil {
|
||||
s.Warnf("Error encrypting auth request claim on account %q: %v", acc.Name, err)
|
||||
return false
|
||||
errStr = fmt.Sprintf("Error encrypting auth request claim on account %q: %v", acc.Name, err)
|
||||
s.Warnf(errStr)
|
||||
return false, errStr
|
||||
}
|
||||
hdr = map[string]string{AuthRequestXKeyHeader: xkey}
|
||||
}
|
||||
@@ -303,12 +301,16 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize
|
||||
s.sendInternalAccountMsgWithReply(acc, AuthCalloutSubject, reply, hdr, req, false)
|
||||
|
||||
select {
|
||||
case authorized = <-respCh:
|
||||
case errStr = <-respCh:
|
||||
if authorized = errStr == _EMPTY_; !authorized {
|
||||
s.Warnf(errStr)
|
||||
}
|
||||
case <-time.After(authTimeout):
|
||||
s.Debugf("Authorization callout response not received in time on account %q", acc.Name)
|
||||
errStr = fmt.Sprintf("Authorization callout response not received in time on account %q", acc.Name)
|
||||
s.Debugf(errStr)
|
||||
}
|
||||
|
||||
return authorized
|
||||
return authorized, errStr
|
||||
}
|
||||
|
||||
// Fill in client information for the request.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright 2022 The NATS Authors
|
||||
// Copyright 2022-2023 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -926,3 +926,75 @@ func TestAuthCalloutAuthUserFailDoesNotInvokeCallout(t *testing.T) {
|
||||
t.Fatalf("Expected callout to not be called")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAuthCalloutAuthErrEvents(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
listen: "127.0.0.1:-1"
|
||||
server_name: A
|
||||
accounts {
|
||||
AUTH { users [ {user: "auth", password: "pwd"} ] }
|
||||
FOO {}
|
||||
BAR {}
|
||||
}
|
||||
authorization {
|
||||
auth_callout {
|
||||
issuer: "ABJHLOVMPA4CI6R5KLNGOB4GSLNIY7IOUPAJC4YFNDLQVIOBYQGUWVLA"
|
||||
account: AUTH
|
||||
auth_users: [ auth ]
|
||||
}
|
||||
}
|
||||
`))
|
||||
defer removeFile(t, conf)
|
||||
s, _ := RunServerWithConfig(conf)
|
||||
defer s.Shutdown()
|
||||
|
||||
nc, err := nats.Connect(s.ClientURL(), nats.UserInfo("auth", "pwd"))
|
||||
require_NoError(t, err)
|
||||
defer nc.Close()
|
||||
|
||||
// This is where the event fires, in this account.
|
||||
sub, err := nc.SubscribeSync(authErrorAccountEventSubj)
|
||||
require_NoError(t, err)
|
||||
|
||||
_, err = nc.Subscribe(AuthCalloutSubject, func(m *nats.Msg) {
|
||||
user, si, _, opts, _ := decodeAuthRequest(t, m.Data)
|
||||
// Allow dlc user and map to the BAZ account.
|
||||
if opts.Username == "dlc" && opts.Password == "zzz" {
|
||||
ujwt := createAuthUser(t, si.ID, user, _EMPTY_, "FOO", nil, 0, nil)
|
||||
m.Respond([]byte(ujwt))
|
||||
} else if opts.Username == "dlc" {
|
||||
errResp := createErrResponse(t, user, si.ID, "WRONG PASSWORD", nil)
|
||||
m.Respond([]byte(errResp))
|
||||
} else {
|
||||
errResp := createErrResponse(t, user, si.ID, "BAD CREDS", nil)
|
||||
m.Respond([]byte(errResp))
|
||||
}
|
||||
})
|
||||
require_NoError(t, err)
|
||||
|
||||
// This one will use callout since not defined in server config.
|
||||
nc, err = nats.Connect(s.ClientURL(), nats.UserInfo("dlc", "zzz"))
|
||||
require_NoError(t, err)
|
||||
nc.Close()
|
||||
checkSubsPending(t, sub, 0)
|
||||
|
||||
checkAuthErrEvent := func(user, pass, reason string) {
|
||||
_, err = nats.Connect(s.ClientURL(), nats.UserInfo(user, pass))
|
||||
require_Error(t, err)
|
||||
|
||||
m, err := sub.NextMsg(time.Second)
|
||||
require_NoError(t, err)
|
||||
|
||||
var dm DisconnectEventMsg
|
||||
err = json.Unmarshal(m.Data, &dm)
|
||||
require_NoError(t, err)
|
||||
|
||||
if !strings.Contains(dm.Reason, reason) {
|
||||
t.Fatalf("Expected %q reason, but got %q", reason, dm.Reason)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
checkAuthErrEvent("dlc", "xxx", "WRONG PASSWORD")
|
||||
checkAuthErrEvent("rip", "abc", "BAD CREDS")
|
||||
}
|
||||
|
||||
104
server/events.go
104
server/events.go
@@ -1,4 +1,4 @@
|
||||
// Copyright 2018-2022 The NATS Authors
|
||||
// Copyright 2018-2023 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
@@ -47,20 +47,21 @@ const (
|
||||
accPingReqSubj = "$SYS.REQ.ACCOUNT.PING.%s" // atm. only used for STATZ and CONNZ import from system account
|
||||
// kept for backward compatibility when using http resolver
|
||||
// this overlaps with the names for events but you'd have to have the operator private key in order to succeed.
|
||||
accUpdateEventSubjOld = "$SYS.ACCOUNT.%s.CLAIMS.UPDATE"
|
||||
accUpdateEventSubjNew = "$SYS.REQ.ACCOUNT.%s.CLAIMS.UPDATE"
|
||||
connsRespSubj = "$SYS._INBOX_.%s"
|
||||
accConnsEventSubjNew = "$SYS.ACCOUNT.%s.SERVER.CONNS"
|
||||
accConnsEventSubjOld = "$SYS.SERVER.ACCOUNT.%s.CONNS" // kept for backward compatibility
|
||||
shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN"
|
||||
authErrorEventSubj = "$SYS.SERVER.%s.CLIENT.AUTH.ERR"
|
||||
serverStatsSubj = "$SYS.SERVER.%s.STATSZ"
|
||||
serverDirectReqSubj = "$SYS.REQ.SERVER.%s.%s"
|
||||
serverPingReqSubj = "$SYS.REQ.SERVER.PING.%s"
|
||||
serverStatsPingReqSubj = "$SYS.REQ.SERVER.PING" // use $SYS.REQ.SERVER.PING.STATSZ instead
|
||||
leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT" // for internal use only
|
||||
remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s"
|
||||
inboxRespSubj = "$SYS._INBOX.%s.%s"
|
||||
accUpdateEventSubjOld = "$SYS.ACCOUNT.%s.CLAIMS.UPDATE"
|
||||
accUpdateEventSubjNew = "$SYS.REQ.ACCOUNT.%s.CLAIMS.UPDATE"
|
||||
connsRespSubj = "$SYS._INBOX_.%s"
|
||||
accConnsEventSubjNew = "$SYS.ACCOUNT.%s.SERVER.CONNS"
|
||||
accConnsEventSubjOld = "$SYS.SERVER.ACCOUNT.%s.CONNS" // kept for backward compatibility
|
||||
shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN"
|
||||
authErrorEventSubj = "$SYS.SERVER.%s.CLIENT.AUTH.ERR"
|
||||
authErrorAccountEventSubj = "$SYS.ACCOUNT.CLIENT.AUTH.ERR"
|
||||
serverStatsSubj = "$SYS.SERVER.%s.STATSZ"
|
||||
serverDirectReqSubj = "$SYS.REQ.SERVER.%s.%s"
|
||||
serverPingReqSubj = "$SYS.REQ.SERVER.PING.%s"
|
||||
serverStatsPingReqSubj = "$SYS.REQ.SERVER.PING" // use $SYS.REQ.SERVER.PING.STATSZ instead
|
||||
leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT" // for internal use only
|
||||
remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s"
|
||||
inboxRespSubj = "$SYS._INBOX.%s.%s"
|
||||
|
||||
// Used to return information to a user on bound account and user permissions.
|
||||
userDirectInfoSubj = "$SYS.REQ.USER.INFO"
|
||||
@@ -511,6 +512,23 @@ func (s *Server) sendInternalAccountMsgWithReply(a *Account, subject, reply stri
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send system style message to an account scope.
|
||||
func (s *Server) sendInternalAccountSysMsg(a *Account, subj string, si *ServerInfo, msg interface{}) {
|
||||
s.mu.RLock()
|
||||
if s.sys == nil || s.sys.sendq == nil || a == nil {
|
||||
s.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
sendq := s.sys.sendq
|
||||
s.mu.RUnlock()
|
||||
|
||||
a.mu.Lock()
|
||||
c := a.internalClient()
|
||||
a.mu.Unlock()
|
||||
|
||||
sendq.push(newPubMsg(c, subj, _EMPTY_, si, nil, msg, noCompression, false, false))
|
||||
}
|
||||
|
||||
// This will queue up a message to be sent.
|
||||
// Lock should not be held.
|
||||
func (s *Server) sendInternalMsgLocked(subj, rply string, si *ServerInfo, msg interface{}) {
|
||||
@@ -2016,6 +2034,7 @@ func (s *Server) accountDisconnectEvent(c *client, now time.Time, reason string)
|
||||
s.sendInternalMsgLocked(subj, _EMPTY_, &m.Server, &m)
|
||||
}
|
||||
|
||||
// This is the system level event sent to the system account for operators.
|
||||
func (s *Server) sendAuthErrorEvent(c *client) {
|
||||
s.mu.Lock()
|
||||
if !s.eventsEnabled() {
|
||||
@@ -2070,6 +2089,61 @@ func (s *Server) sendAuthErrorEvent(c *client) {
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// This is the account level event sent to the origin account for account owners.
|
||||
func (s *Server) sendAccountAuthErrorEvent(c *client, acc *Account, reason string) {
|
||||
if acc == nil {
|
||||
return
|
||||
}
|
||||
s.mu.Lock()
|
||||
if !s.eventsEnabled() {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
eid := s.nextEventID()
|
||||
s.mu.Unlock()
|
||||
|
||||
now := time.Now().UTC()
|
||||
c.mu.Lock()
|
||||
m := DisconnectEventMsg{
|
||||
TypedEvent: TypedEvent{
|
||||
Type: DisconnectEventMsgType,
|
||||
ID: eid,
|
||||
Time: now,
|
||||
},
|
||||
Client: ClientInfo{
|
||||
Start: &c.start,
|
||||
Stop: &now,
|
||||
Host: c.host,
|
||||
ID: c.cid,
|
||||
Account: acc.Name,
|
||||
User: c.getRawAuthUser(),
|
||||
Name: c.opts.Name,
|
||||
Lang: c.opts.Lang,
|
||||
Version: c.opts.Version,
|
||||
RTT: c.getRTT(),
|
||||
Jwt: c.opts.JWT,
|
||||
IssuerKey: issuerForClient(c),
|
||||
Tags: c.tags,
|
||||
NameTag: c.nameTag,
|
||||
Kind: c.kindString(),
|
||||
ClientType: c.clientTypeString(),
|
||||
MQTTClient: c.getMQTTClientID(),
|
||||
},
|
||||
Sent: DataStats{
|
||||
Msgs: c.inMsgs,
|
||||
Bytes: c.inBytes,
|
||||
},
|
||||
Received: DataStats{
|
||||
Msgs: c.outMsgs,
|
||||
Bytes: c.outBytes,
|
||||
},
|
||||
Reason: reason,
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
s.sendInternalAccountSysMsg(acc, authErrorAccountEventSubj, &m.Server, &m)
|
||||
}
|
||||
|
||||
// Internal message callback.
|
||||
// If the msg is needed past the callback it is required to be copied.
|
||||
// rmsg contains header and the message. use client.msgParts(rmsg) to split them apart
|
||||
|
||||
Reference in New Issue
Block a user