mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Add in user info requests to have connected users get info for bound account and permissions.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1567,9 +1567,14 @@ func (a *Account) checkStreamImportsForCycles(to string, visited map[string]bool
|
||||
// SetServiceImportSharing will allow sharing of information about requests with the export account.
|
||||
// Used for service latency tracking at the moment.
|
||||
func (a *Account) SetServiceImportSharing(destination *Account, to string, allow bool) error {
|
||||
return a.setServiceImportSharing(destination, to, true, allow)
|
||||
}
|
||||
|
||||
// setServiceImportSharing will allow sharing of information about requests with the export account.
|
||||
func (a *Account) setServiceImportSharing(destination *Account, to string, check, allow bool) error {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
if a.isClaimAccount() {
|
||||
if check && a.isClaimAccount() {
|
||||
return fmt.Errorf("claim based accounts can not be updated directly")
|
||||
}
|
||||
for _, si := range a.imports.services {
|
||||
|
||||
@@ -960,6 +960,61 @@ func (c *client) setPermissions(perms *Permissions) {
|
||||
}
|
||||
}
|
||||
|
||||
// Build public permissions from internal ones.
|
||||
// Used for user info requests.
|
||||
func (c *client) publicPermissions() *Permissions {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.perms == nil {
|
||||
return nil
|
||||
}
|
||||
perms := &Permissions{
|
||||
Publish: &SubjectPermission{},
|
||||
Subscribe: &SubjectPermission{},
|
||||
}
|
||||
|
||||
_subs := [32]*subscription{}
|
||||
|
||||
// Publish
|
||||
if c.perms.pub.allow != nil {
|
||||
subs := _subs[:0]
|
||||
c.perms.pub.allow.All(&subs)
|
||||
for _, sub := range subs {
|
||||
perms.Publish.Allow = append(perms.Publish.Allow, string(sub.subject))
|
||||
}
|
||||
}
|
||||
if c.perms.pub.deny != nil {
|
||||
subs := _subs[:0]
|
||||
c.perms.pub.deny.All(&subs)
|
||||
for _, sub := range subs {
|
||||
perms.Publish.Deny = append(perms.Publish.Deny, string(sub.subject))
|
||||
}
|
||||
}
|
||||
// Subsribe
|
||||
if c.perms.sub.allow != nil {
|
||||
subs := _subs[:0]
|
||||
c.perms.sub.allow.All(&subs)
|
||||
for _, sub := range subs {
|
||||
perms.Subscribe.Allow = append(perms.Subscribe.Allow, string(sub.subject))
|
||||
}
|
||||
}
|
||||
if c.perms.sub.deny != nil {
|
||||
subs := _subs[:0]
|
||||
c.perms.sub.deny.All(&subs)
|
||||
for _, sub := range subs {
|
||||
perms.Subscribe.Deny = append(perms.Subscribe.Deny, string(sub.subject))
|
||||
}
|
||||
}
|
||||
// Responses.
|
||||
if c.perms.resp != nil {
|
||||
rp := *c.perms.resp
|
||||
perms.Response = &rp
|
||||
}
|
||||
|
||||
return perms
|
||||
}
|
||||
|
||||
type denyType int
|
||||
|
||||
const (
|
||||
@@ -5302,16 +5357,16 @@ func (c *client) doTLSHandshake(typ string, solicit bool, url *url.URL, tlsConfi
|
||||
// Lock should be held.
|
||||
func (c *client) getRawAuthUser() string {
|
||||
switch {
|
||||
case c.opts.Nkey != "":
|
||||
case c.opts.Nkey != _EMPTY_:
|
||||
return c.opts.Nkey
|
||||
case c.opts.Username != "":
|
||||
case c.opts.Username != _EMPTY_:
|
||||
return c.opts.Username
|
||||
case c.opts.JWT != "":
|
||||
case c.opts.JWT != _EMPTY_:
|
||||
return c.pubKey
|
||||
case c.opts.Token != "":
|
||||
case c.opts.Token != _EMPTY_:
|
||||
return c.opts.Token
|
||||
default:
|
||||
return ""
|
||||
return _EMPTY_
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5319,12 +5374,14 @@ func (c *client) getRawAuthUser() string {
|
||||
// Lock should be held.
|
||||
func (c *client) getAuthUser() string {
|
||||
switch {
|
||||
case c.opts.Nkey != "":
|
||||
case c.opts.Nkey != _EMPTY_:
|
||||
return fmt.Sprintf("Nkey %q", c.opts.Nkey)
|
||||
case c.opts.Username != "":
|
||||
case c.opts.Username != _EMPTY_:
|
||||
return fmt.Sprintf("User %q", c.opts.Username)
|
||||
case c.opts.JWT != "":
|
||||
case c.opts.JWT != _EMPTY_:
|
||||
return fmt.Sprintf("JWT User %q", c.pubKey)
|
||||
case c.opts.Token != _EMPTY_:
|
||||
return fmt.Sprintf("Token %q", c.opts.Token)
|
||||
default:
|
||||
return `User "N/A"`
|
||||
}
|
||||
|
||||
@@ -2605,3 +2605,79 @@ func TestClientAuthRequiredNoAuthUser(t *testing.T) {
|
||||
t.Fatalf("Expected AuthRequired to be false due to 'no_auth_user'")
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientUserInfoReq(t *testing.T) {
|
||||
conf := createConfFile(t, []byte(`
|
||||
listen: 127.0.0.1:-1
|
||||
PERMS = {
|
||||
publish = { allow: "$SYS.REQ.>", deny: "$SYS.REQ.ACCOUNT.>" }
|
||||
subscribe = "_INBOX.>"
|
||||
allow_responses: true
|
||||
}
|
||||
accounts: {
|
||||
A: { users: [ { user: dlc, password: pass, permissions: $PERMS } ] }
|
||||
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
|
||||
}
|
||||
no_auth_user: dlc
|
||||
`))
|
||||
defer removeFile(t, conf)
|
||||
|
||||
s, _ := RunServerWithConfig(conf)
|
||||
defer s.Shutdown()
|
||||
|
||||
nc, err := nats.Connect(s.ClientURL())
|
||||
require_NoError(t, err)
|
||||
defer nc.Close()
|
||||
|
||||
resp, err := nc.Request("$SYS.REQ.USER.INFO", nil, time.Second)
|
||||
require_NoError(t, err)
|
||||
|
||||
response := ServerAPIResponse{Data: &UserInfo{}}
|
||||
err = json.Unmarshal(resp.Data, &response)
|
||||
require_NoError(t, err)
|
||||
|
||||
userInfo := response.Data.(*UserInfo)
|
||||
|
||||
dlc := &UserInfo{
|
||||
UserID: "dlc",
|
||||
Account: "A",
|
||||
Permissions: &Permissions{
|
||||
Publish: &SubjectPermission{
|
||||
Allow: []string{"$SYS.REQ.>"},
|
||||
Deny: []string{"$SYS.REQ.ACCOUNT.>"},
|
||||
},
|
||||
Subscribe: &SubjectPermission{
|
||||
Allow: []string{"_INBOX.>"},
|
||||
},
|
||||
Response: &ResponsePermission{
|
||||
MaxMsgs: DEFAULT_ALLOW_RESPONSE_MAX_MSGS,
|
||||
Expires: DEFAULT_ALLOW_RESPONSE_EXPIRATION,
|
||||
},
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(dlc, userInfo) {
|
||||
t.Fatalf("User info for %q did not match", "dlc")
|
||||
}
|
||||
|
||||
// Make sure system users work ok too.
|
||||
nc, err = nats.Connect(s.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
|
||||
require_NoError(t, err)
|
||||
defer nc.Close()
|
||||
|
||||
resp, err = nc.Request("$SYS.REQ.USER.INFO", nil, time.Second)
|
||||
require_NoError(t, err)
|
||||
|
||||
response = ServerAPIResponse{Data: &UserInfo{}}
|
||||
err = json.Unmarshal(resp.Data, &response)
|
||||
require_NoError(t, err)
|
||||
|
||||
userInfo = response.Data.(*UserInfo)
|
||||
|
||||
admin := &UserInfo{
|
||||
UserID: "admin",
|
||||
Account: "$SYS",
|
||||
}
|
||||
if !reflect.DeepEqual(admin, userInfo) {
|
||||
t.Fatalf("User info for %q did not match", "admin")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,6 +62,10 @@ const (
|
||||
remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s"
|
||||
inboxRespSubj = "$SYS._INBOX.%s.%s"
|
||||
|
||||
// Used to return information to a user on bound account and user permissions.
|
||||
userDirectInfoSubj = "$SYS.REQ.USER.INFO"
|
||||
userDirectReqSubj = "$SYS.REQ.USER.%s.INFO"
|
||||
|
||||
// FIXME(dlc) - Should account scope, even with wc for now, but later on
|
||||
// we can then shard as needed.
|
||||
accNumSubsReqSubj = "$SYS.REQ.ACCOUNT.NSUBS"
|
||||
@@ -1030,6 +1034,13 @@ func (s *Server) initEventTracking() {
|
||||
}
|
||||
}
|
||||
|
||||
// User info.
|
||||
// TODO(dlc) - Can be internal and not forwarded since bound server for the client connection
|
||||
// is only one that will answer. This breaks tests since we still forward on remote server connect.
|
||||
if _, err := s.sysSubscribe(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil {
|
||||
s.Errorf("Error setting up internal tracking: %v", err)
|
||||
}
|
||||
|
||||
// For now only the STATZ subject has an account specific ping equivalent.
|
||||
if _, err := s.sysSubscribe(fmt.Sprintf(accPingReqSubj, "STATZ"),
|
||||
func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
|
||||
@@ -1064,6 +1075,38 @@ func (s *Server) initEventTracking() {
|
||||
}
|
||||
}
|
||||
|
||||
// UserInfo returns basic information to a user about bound account and user permissions.
|
||||
// For account information they will need to ping that separately, and this allows security
|
||||
// controls on each subsystem if desired, e.g. account info, jetstream account info, etc.
|
||||
type UserInfo struct {
|
||||
UserID string `json:"user"`
|
||||
Account string `json:"account"`
|
||||
Permissions *Permissions `json:"permissions,omitempty"`
|
||||
}
|
||||
|
||||
// Process a user info request.
|
||||
func (s *Server) userInfoReq(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
|
||||
if !s.EventsEnabled() || reply == _EMPTY_ {
|
||||
return
|
||||
}
|
||||
|
||||
response := &ServerAPIResponse{Server: &ServerInfo{}}
|
||||
|
||||
ci, _, _, _, err := s.getRequestInfo(c, msg)
|
||||
if err != nil {
|
||||
response.Error = &ApiError{Code: http.StatusBadRequest}
|
||||
s.sendInternalResponse(reply, response)
|
||||
return
|
||||
}
|
||||
|
||||
response.Data = &UserInfo{
|
||||
UserID: ci.User,
|
||||
Account: ci.Account,
|
||||
Permissions: c.publicPermissions(),
|
||||
}
|
||||
s.sendInternalResponse(reply, response)
|
||||
}
|
||||
|
||||
// register existing accounts with any system exports.
|
||||
func (s *Server) registerSystemImportsForExisting() {
|
||||
var accounts []*Account
|
||||
@@ -1117,6 +1160,20 @@ func (s *Server) addSystemAccountExports(sacc *Account) {
|
||||
}
|
||||
}
|
||||
|
||||
// User info export.
|
||||
userInfoSubj := fmt.Sprintf(userDirectReqSubj, "*")
|
||||
if !sacc.hasServiceExportMatching(userInfoSubj) {
|
||||
if err := sacc.AddServiceExport(userInfoSubj, nil); err != nil {
|
||||
s.Errorf("Error adding system service export for %q: %v", userInfoSubj, err)
|
||||
}
|
||||
mappedSubj := fmt.Sprintf(userDirectReqSubj, sacc.GetName())
|
||||
if err := sacc.AddServiceImport(sacc, userDirectInfoSubj, mappedSubj); err != nil {
|
||||
s.Errorf("Error setting up system service import %s: %v", mappedSubj, err)
|
||||
}
|
||||
// Make sure to share details.
|
||||
sacc.setServiceImportSharing(sacc, mappedSubj, false, true)
|
||||
}
|
||||
|
||||
// Register any accounts that existed prior.
|
||||
s.registerSystemImportsForExisting()
|
||||
|
||||
@@ -1721,6 +1778,12 @@ func (s *Server) registerSystemImports(a *Account) {
|
||||
importSrvc(fmt.Sprintf(accPingReqSubj, "CONNZ"), mappedConnzSubj)
|
||||
importSrvc(fmt.Sprintf(serverPingReqSubj, "CONNZ"), mappedConnzSubj)
|
||||
importSrvc(fmt.Sprintf(accPingReqSubj, "STATZ"), fmt.Sprintf(accDirectReqSubj, a.Name, "STATZ"))
|
||||
|
||||
// This is for user's looking up their own info.
|
||||
mappedSubject := fmt.Sprintf(userDirectReqSubj, a.Name)
|
||||
importSrvc(userDirectInfoSubj, mappedSubject)
|
||||
// Make sure to share details.
|
||||
a.setServiceImportSharing(sacc, mappedSubject, false, true)
|
||||
}
|
||||
|
||||
// Setup tracking for this account. This allows us to track global account activity.
|
||||
|
||||
Reference in New Issue
Block a user