Merge branch 'main' into dev

This commit is contained in:
Derek Collison
2023-05-12 21:04:51 -07:00
5 changed files with 170 additions and 18 deletions

View File

@@ -5471,7 +5471,15 @@ func (c *client) doTLSHandshake(typ string, solicit bool, url *url.URL, tlsConfi
return false, err
}
// getRAwAuthUser returns the raw auth user for the client.
// getRawAuthUserLock returns the raw auth user for the client.
// Will acquire the client lock.
func (c *client) getRawAuthUserLock() string {
c.mu.Lock()
defer c.mu.Unlock()
return c.getRawAuthUser()
}
// getRawAuthUser returns the raw auth user for the client.
// Lock should be held.
func (c *client) getRawAuthUser() string {
switch {

View File

@@ -139,6 +139,9 @@ type ConnInfo struct {
NameTag string `json:"name_tag,omitempty"`
Tags jwt.TagList `json:"tags,omitempty"`
MQTTClient string `json:"mqtt_client,omitempty"` // This is the MQTT client id
// Internal
rtt int64 // For fast sorting
}
// TLSPeerCert contains basic information about a TLS peer certificate
@@ -192,9 +195,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
if opts != nil {
// If no sort option given or sort is by uptime, then sort by cid
if opts.Sort == _EMPTY_ {
sortOpt = ByCid
} else {
if opts.Sort != _EMPTY_ {
sortOpt = opts.Sort
if !sortOpt.IsValid() {
return nil, fmt.Errorf("invalid sorting option: %s", sortOpt)
@@ -203,9 +204,6 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
// Auth specifics.
auth = opts.Username
if !auth && (user != _EMPTY_ || acc != _EMPTY_) {
return nil, fmt.Errorf("filter by user or account only allowed with auth option")
}
user = opts.User
acc = opts.Account
mqttCID = opts.MQTTClient
@@ -275,7 +273,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
}
// Walk the open client list with server lock held.
s.mu.Lock()
s.mu.RLock()
// Default to all client unless filled in above.
if clist == nil {
clist = s.clients
@@ -302,9 +300,10 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
if acc != _EMPTY_ && len(closedClients) > 0 {
var ccc []*closedClient
for _, cc := range closedClients {
if cc.acc == acc {
ccc = append(ccc, cc)
if cc.acc != acc {
continue
}
ccc = append(ccc, cc)
}
c.Total -= (len(closedClients) - len(ccc))
closedClients = ccc
@@ -359,7 +358,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
continue
}
// Do user filtering second
if user != _EMPTY_ && client.opts.Username != user {
if user != _EMPTY_ && client.getRawAuthUserLock() != user {
continue
}
// Do mqtt client ID filtering next
@@ -370,7 +369,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
}
}
}
s.mu.Unlock()
s.mu.RUnlock()
// Filter by subject now if needed. We do this outside of server lock.
if filter != _EMPTY_ {
@@ -502,6 +501,8 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
sort.Sort(sort.Reverse(byStop{pconns}))
case ByReason:
sort.Sort(byReason{pconns})
case ByRTT:
sort.Sort(sort.Reverse(byRTT{pconns}))
}
minoff := c.Offset
@@ -531,6 +532,10 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
// Fills in the ConnInfo from the client.
// client should be locked.
func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time, auth bool) {
// For fast sort if required.
rtt := client.getRTT()
ci.rtt = int64(rtt)
ci.Cid = client.cid
ci.MQTTClient = client.getMQTTClientID()
ci.Kind = client.kindString()
@@ -539,7 +544,7 @@ func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time, auth bool)
ci.LastActivity = client.last
ci.Uptime = myUptime(now.Sub(client.start))
ci.Idle = myUptime(now.Sub(client.last))
ci.RTT = client.getRTT().String()
ci.RTT = rtt.String()
ci.OutMsgs = client.outMsgs
ci.OutBytes = client.outBytes
ci.NumSubs = uint32(len(client.subs))
@@ -588,7 +593,7 @@ func (c *client) getRTT() time.Duration {
if c.rtt == 0 {
// If a real client, go ahead and send ping now to get a value
// for RTT. For tests and telnet, or if client is closing, etc skip.
if c.opts.Lang != "" {
if c.opts.Lang != _EMPTY_ {
c.sendRTTPingLocked()
}
return 0

View File

@@ -45,7 +45,7 @@ const (
ByUptime SortOpt = "uptime" // By the amount of time connections exist
ByStop SortOpt = "stop" // By the stop time for a closed connection
ByReason SortOpt = "reason" // By the reason for a closed connection
ByRTT SortOpt = "rtt" // By the round trip time
)
// Individual sort options provide the Less for sort.Interface. Len and Swap are on cList.
@@ -139,10 +139,15 @@ func (l byReason) Less(i, j int) bool {
return l.ConnInfos[i].Reason < l.ConnInfos[j].Reason
}
// RTT - Default is descending
type byRTT struct{ ConnInfos }
func (l byRTT) Less(i, j int) bool { return l.ConnInfos[i].rtt < l.ConnInfos[j].rtt }
// IsValid determines if a sort option is valid
func (s SortOpt) IsValid() bool {
switch s {
case "", ByCid, ByStart, BySubs, ByPending, ByOutMsgs, ByInMsgs, ByOutBytes, ByInBytes, ByLast, ByIdle, ByUptime, ByStop, ByReason:
case _EMPTY_, ByCid, ByStart, BySubs, ByPending, ByOutMsgs, ByInMsgs, ByOutBytes, ByInBytes, ByLast, ByIdle, ByUptime, ByStop, ByReason, ByRTT:
return true
default:
return false

View File

@@ -1,4 +1,4 @@
// Copyright 2013-2022 The NATS Authors
// Copyright 2013-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -4851,3 +4851,137 @@ func TestMonitorRoutePerAccount(t *testing.T) {
}
}
}
func TestMonitorConnzOperatorModeFilterByUser(t *testing.T) {
accKp, accPub := createKey(t)
accClaim := jwt.NewAccountClaims(accPub)
accJwt := encodeClaim(t, accClaim, accPub)
conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
http: 127.0.0.1:-1
operator = %s
resolver = MEMORY
resolver_preload = {
%s : %s
}
`, ojwt, accPub, accJwt)))
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()
createUser := func() (string, string) {
ukp, _ := nkeys.CreateUser()
seed, _ := ukp.Seed()
upub, _ := ukp.PublicKey()
uclaim := newJWTTestUserClaims()
uclaim.Subject = upub
ujwt, err := uclaim.Encode(accKp)
require_NoError(t, err)
return upub, genCredsFile(t, ujwt, seed)
}
// Now create 2 users.
aUser, aCreds := createUser()
bUser, bCreds := createUser()
var users []*nats.Conn
// Create 2 for A
for i := 0; i < 2; i++ {
nc, err := nats.Connect(s.ClientURL(), nats.UserCredentials(aCreds))
require_NoError(t, err)
defer nc.Close()
users = append(users, nc)
}
// Create 5 for B
for i := 0; i < 5; i++ {
nc, err := nats.Connect(s.ClientURL(), nats.UserCredentials(bCreds))
require_NoError(t, err)
defer nc.Close()
users = append(users, nc)
}
// Test A
connz := pollConz(t, s, 1, _EMPTY_, &ConnzOptions{User: aUser, Username: true})
require_True(t, connz.NumConns == 2)
for _, ci := range connz.Conns {
require_True(t, ci.AuthorizedUser == aUser)
}
// Test B
connz = pollConz(t, s, 1, _EMPTY_, &ConnzOptions{User: bUser, Username: true})
require_True(t, connz.NumConns == 5)
for _, ci := range connz.Conns {
require_True(t, ci.AuthorizedUser == bUser)
}
// Make sure URL access is the same.
url := fmt.Sprintf("http://127.0.0.1:%d/", s.MonitorAddr().Port)
urlFull := url + fmt.Sprintf("connz?auth=true&user=%s", aUser)
connz = pollConz(t, s, 0, urlFull, nil)
require_True(t, connz.NumConns == 2)
for _, ci := range connz.Conns {
require_True(t, ci.AuthorizedUser == aUser)
}
// Now test closed filtering as well.
for _, nc := range users {
nc.Close()
}
// Let them process and be moved to closed ring buffer in server.
time.Sleep(100 * time.Millisecond)
connz = pollConz(t, s, 1, _EMPTY_, &ConnzOptions{User: aUser, Username: true, State: ConnClosed})
require_True(t, connz.NumConns == 2)
for _, ci := range connz.Conns {
require_True(t, ci.AuthorizedUser == aUser)
}
}
func TestMonitorConnzSortByRTT(t *testing.T) {
s := runMonitorServer()
defer s.Shutdown()
for i := 0; i < 10; i++ {
nc, err := nats.Connect(s.ClientURL())
require_NoError(t, err)
defer nc.Close()
}
connz := pollConz(t, s, 1, _EMPTY_, &ConnzOptions{Sort: ByRTT})
require_True(t, connz.NumConns == 10)
var rtt int64
for _, ci := range connz.Conns {
if rtt == 0 {
rtt = ci.rtt
} else {
if ci.rtt > rtt {
t.Fatalf("RTT not in descending order: %v vs %v",
time.Duration(rtt), time.Duration(ci.rtt))
}
rtt = ci.rtt
}
}
// Make sure url works as well.
url := fmt.Sprintf("http://127.0.0.1:%d/connz?sort=rtt", s.MonitorAddr().Port)
connz = pollConz(t, s, 0, url, nil)
require_True(t, connz.NumConns == 10)
rtt = 0
for _, ci := range connz.Conns {
crttd, err := time.ParseDuration(ci.RTT)
require_NoError(t, err)
crtt := int64(crttd)
if rtt == 0 {
rtt = crtt
} else {
if crtt > rtt {
t.Fatalf("RTT not in descending order: %v vs %v",
time.Duration(rtt), time.Duration(crtt))
}
rtt = ci.rtt
}
}
}

View File

@@ -3076,7 +3076,7 @@ func (s *Server) saveClosedClient(c *client, nc net.Conn, reason ClosedState) {
}
}
// Hold user as well.
cc.user = c.opts.Username
cc.user = c.getRawAuthUser()
// Hold account name if not the global account.
if c.acc != nil && c.acc.Name != globalAccountName {
cc.acc = c.acc.Name