mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
[FIXED] Allow user filtering on connz for other user types like nkeys etc. (#4156)
Signed-off-by: Derek Collison <derek@nats.io> Resolves #4149
This commit is contained in:
@@ -5323,20 +5323,28 @@ func (c *client) doTLSHandshake(typ string, solicit bool, url *url.URL, tlsConfi
|
||||
return false, err
|
||||
}
|
||||
|
||||
// getRAwAuthUser returns the raw auth user for the client.
|
||||
// getRawAuthUserLock returns the raw auth user for the client.
|
||||
// Will acquire the client lock.
|
||||
func (c *client) getRawAuthUserLock() string {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.getRawAuthUser()
|
||||
}
|
||||
|
||||
// getRawAuthUser returns the raw auth user for the client.
|
||||
// Lock should be held.
|
||||
func (c *client) getRawAuthUser() string {
|
||||
switch {
|
||||
case c.opts.Nkey != "":
|
||||
case c.opts.Nkey != _EMPTY_:
|
||||
return c.opts.Nkey
|
||||
case c.opts.Username != "":
|
||||
case c.opts.Username != _EMPTY_:
|
||||
return c.opts.Username
|
||||
case c.opts.JWT != "":
|
||||
case c.opts.JWT != _EMPTY_:
|
||||
return c.pubKey
|
||||
case c.opts.Token != "":
|
||||
case c.opts.Token != _EMPTY_:
|
||||
return c.opts.Token
|
||||
default:
|
||||
return ""
|
||||
return _EMPTY_
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5344,11 +5352,11 @@ func (c *client) getRawAuthUser() string {
|
||||
// Lock should be held.
|
||||
func (c *client) getAuthUser() string {
|
||||
switch {
|
||||
case c.opts.Nkey != "":
|
||||
case c.opts.Nkey != _EMPTY_:
|
||||
return fmt.Sprintf("Nkey %q", c.opts.Nkey)
|
||||
case c.opts.Username != "":
|
||||
case c.opts.Username != _EMPTY_:
|
||||
return fmt.Sprintf("User %q", c.opts.Username)
|
||||
case c.opts.JWT != "":
|
||||
case c.opts.JWT != _EMPTY_:
|
||||
return fmt.Sprintf("JWT User %q", c.pubKey)
|
||||
default:
|
||||
return `User "N/A"`
|
||||
|
||||
@@ -201,9 +201,6 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
|
||||
|
||||
// Auth specifics.
|
||||
auth = opts.Username
|
||||
if !auth && (user != _EMPTY_ || acc != _EMPTY_) {
|
||||
return nil, fmt.Errorf("filter by user or account only allowed with auth option")
|
||||
}
|
||||
user = opts.User
|
||||
acc = opts.Account
|
||||
mqttCID = opts.MQTTClient
|
||||
@@ -273,7 +270,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
|
||||
}
|
||||
|
||||
// Walk the open client list with server lock held.
|
||||
s.mu.Lock()
|
||||
s.mu.RLock()
|
||||
// Default to all client unless filled in above.
|
||||
if clist == nil {
|
||||
clist = s.clients
|
||||
@@ -300,9 +297,10 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
|
||||
if acc != _EMPTY_ && len(closedClients) > 0 {
|
||||
var ccc []*closedClient
|
||||
for _, cc := range closedClients {
|
||||
if cc.acc == acc {
|
||||
ccc = append(ccc, cc)
|
||||
if cc.acc != acc {
|
||||
continue
|
||||
}
|
||||
ccc = append(ccc, cc)
|
||||
}
|
||||
c.Total -= (len(closedClients) - len(ccc))
|
||||
closedClients = ccc
|
||||
@@ -357,7 +355,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
|
||||
continue
|
||||
}
|
||||
// Do user filtering second
|
||||
if user != _EMPTY_ && client.opts.Username != user {
|
||||
if user != _EMPTY_ && client.getRawAuthUserLock() != user {
|
||||
continue
|
||||
}
|
||||
// Do mqtt client ID filtering next
|
||||
@@ -368,7 +366,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
s.mu.RUnlock()
|
||||
|
||||
// Filter by subject now if needed. We do this outside of server lock.
|
||||
if filter != _EMPTY_ {
|
||||
|
||||
@@ -4600,3 +4600,89 @@ func TestMonitorWebsocket(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMonitorConnzOperatorModeFilterByUser(t *testing.T) {
|
||||
accKp, accPub := createKey(t)
|
||||
accClaim := jwt.NewAccountClaims(accPub)
|
||||
accJwt := encodeClaim(t, accClaim, accPub)
|
||||
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: 127.0.0.1:-1
|
||||
http: 127.0.0.1:-1
|
||||
operator = %s
|
||||
resolver = MEMORY
|
||||
resolver_preload = {
|
||||
%s : %s
|
||||
}
|
||||
`, ojwt, accPub, accJwt)))
|
||||
|
||||
s, _ := RunServerWithConfig(conf)
|
||||
defer s.Shutdown()
|
||||
|
||||
createUser := func() (string, string) {
|
||||
ukp, _ := nkeys.CreateUser()
|
||||
seed, _ := ukp.Seed()
|
||||
upub, _ := ukp.PublicKey()
|
||||
uclaim := newJWTTestUserClaims()
|
||||
uclaim.Subject = upub
|
||||
ujwt, err := uclaim.Encode(accKp)
|
||||
require_NoError(t, err)
|
||||
return upub, genCredsFile(t, ujwt, seed)
|
||||
}
|
||||
|
||||
// Now create 2 users.
|
||||
aUser, aCreds := createUser()
|
||||
bUser, bCreds := createUser()
|
||||
|
||||
var users []*nats.Conn
|
||||
|
||||
// Create 2 for A
|
||||
for i := 0; i < 2; i++ {
|
||||
nc, err := nats.Connect(s.ClientURL(), nats.UserCredentials(aCreds))
|
||||
require_NoError(t, err)
|
||||
defer nc.Close()
|
||||
users = append(users, nc)
|
||||
}
|
||||
// Create 5 for B
|
||||
for i := 0; i < 5; i++ {
|
||||
nc, err := nats.Connect(s.ClientURL(), nats.UserCredentials(bCreds))
|
||||
require_NoError(t, err)
|
||||
defer nc.Close()
|
||||
users = append(users, nc)
|
||||
}
|
||||
|
||||
// Test A
|
||||
connz := pollConz(t, s, 1, _EMPTY_, &ConnzOptions{User: aUser, Username: true})
|
||||
require_True(t, connz.NumConns == 2)
|
||||
for _, ci := range connz.Conns {
|
||||
require_True(t, ci.AuthorizedUser == aUser)
|
||||
}
|
||||
// Test B
|
||||
connz = pollConz(t, s, 1, _EMPTY_, &ConnzOptions{User: bUser, Username: true})
|
||||
require_True(t, connz.NumConns == 5)
|
||||
for _, ci := range connz.Conns {
|
||||
require_True(t, ci.AuthorizedUser == bUser)
|
||||
}
|
||||
|
||||
// Make sure URL access is the same.
|
||||
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
|
||||
urlFull := url + fmt.Sprintf("connz?auth=true&user=%s", aUser)
|
||||
connz = pollConz(t, s, 0, urlFull, nil)
|
||||
require_True(t, connz.NumConns == 2)
|
||||
for _, ci := range connz.Conns {
|
||||
require_True(t, ci.AuthorizedUser == aUser)
|
||||
}
|
||||
|
||||
// Now test closed filtering as well.
|
||||
for _, nc := range users {
|
||||
nc.Close()
|
||||
}
|
||||
// Let them process and be moved to closed ring buffer in server.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
connz = pollConz(t, s, 1, _EMPTY_, &ConnzOptions{User: aUser, Username: true, State: ConnClosed})
|
||||
require_True(t, connz.NumConns == 2)
|
||||
for _, ci := range connz.Conns {
|
||||
require_True(t, ci.AuthorizedUser == aUser)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2767,7 +2767,7 @@ func (s *Server) saveClosedClient(c *client, nc net.Conn, reason ClosedState) {
|
||||
}
|
||||
}
|
||||
// Hold user as well.
|
||||
cc.user = c.opts.Username
|
||||
cc.user = c.getRawAuthUser()
|
||||
// Hold account name if not the global account.
|
||||
if c.acc != nil && c.acc.Name != globalAccountName {
|
||||
cc.acc = c.acc.Name
|
||||
|
||||
Reference in New Issue
Block a user