mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Merge pull request #844 from nats-io/sysacc
Internal clients aren't weighed against limits
This commit is contained in:
@@ -41,21 +41,22 @@ type rme struct {
|
||||
// Account are subject namespace definitions. By default no messages are shared between accounts.
|
||||
// You can share via exports and imports of streams and services.
|
||||
type Account struct {
|
||||
Name string
|
||||
Nkey string
|
||||
Issuer string
|
||||
claimJWT string
|
||||
updated time.Time
|
||||
mu sync.RWMutex
|
||||
sl *Sublist
|
||||
etmr *time.Timer
|
||||
ctmr *time.Timer
|
||||
strack map[string]int
|
||||
nrclients int
|
||||
clients map[*client]*client
|
||||
rm map[string]*rme
|
||||
imports importMap
|
||||
exports exportMap
|
||||
Name string
|
||||
Nkey string
|
||||
Issuer string
|
||||
claimJWT string
|
||||
updated time.Time
|
||||
mu sync.RWMutex
|
||||
sl *Sublist
|
||||
etmr *time.Timer
|
||||
ctmr *time.Timer
|
||||
strack map[string]int
|
||||
nrclients int
|
||||
sysclients int
|
||||
clients map[*client]*client
|
||||
rm map[string]*rme
|
||||
imports importMap
|
||||
exports exportMap
|
||||
limits
|
||||
nae int
|
||||
pruning bool
|
||||
@@ -143,7 +144,12 @@ func (a *Account) NumConnections() int {
|
||||
func (a *Account) NumLocalConnections() int {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
return len(a.clients)
|
||||
return a.numLocalConnections()
|
||||
}
|
||||
|
||||
// Do not account for the system accounts.
|
||||
func (a *Account) numLocalConnections() int {
|
||||
return len(a.clients) - a.sysclients
|
||||
}
|
||||
|
||||
// MaxClientsReached returns if we have reached our limit for number of connections.
|
||||
@@ -155,7 +161,7 @@ func (a *Account) MaxTotalConnectionsReached() bool {
|
||||
|
||||
func (a *Account) maxTotalConnectionsReached() bool {
|
||||
if a.mconns != jwt.NoLimit {
|
||||
return len(a.clients)+a.nrclients >= a.mconns
|
||||
return len(a.clients)-a.sysclients+a.nrclients >= a.mconns
|
||||
}
|
||||
return false
|
||||
}
|
||||
@@ -191,6 +197,9 @@ func (a *Account) addClient(c *client) int {
|
||||
if a.clients != nil {
|
||||
a.clients[c] = c
|
||||
}
|
||||
if c.kind == SYSTEM {
|
||||
a.sysclients++
|
||||
}
|
||||
a.mu.Unlock()
|
||||
if c != nil && c.srv != nil && a != c.srv.gacc {
|
||||
c.srv.accConnsUpdate(a)
|
||||
@@ -203,6 +212,9 @@ func (a *Account) removeClient(c *client) int {
|
||||
a.mu.Lock()
|
||||
n := len(a.clients)
|
||||
delete(a.clients, c)
|
||||
if c.kind == SYSTEM {
|
||||
a.sysclients--
|
||||
}
|
||||
a.mu.Unlock()
|
||||
if c != nil && c.srv != nil && a != c.srv.gacc {
|
||||
c.srv.accConnsUpdate(a)
|
||||
|
||||
@@ -426,7 +426,7 @@ func (c *client) registerWithAccount(acc *Account) error {
|
||||
}
|
||||
}
|
||||
// Check if we have a max connections violation
|
||||
if acc.MaxTotalConnectionsReached() {
|
||||
if c.kind == CLIENT && acc.MaxTotalConnectionsReached() {
|
||||
return ErrTooManyAccountConnections
|
||||
}
|
||||
|
||||
@@ -457,7 +457,7 @@ func (c *client) subsAtLimit() bool {
|
||||
// Lock is held on entry.
|
||||
// FIXME(dlc) - Should server be able to override here?
|
||||
func (c *client) applyAccountLimits() {
|
||||
if c.acc == nil {
|
||||
if c.acc == nil || c.kind != CLIENT {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -40,7 +40,7 @@ var (
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.0.0-beta.1"
|
||||
VERSION = "2.0.0-beta.2"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
@@ -637,7 +637,7 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
|
||||
// Build event with account name and number of local clients.
|
||||
m := accNumConns{
|
||||
Account: a.Name,
|
||||
Conns: len(a.clients),
|
||||
Conns: a.numLocalConnections(),
|
||||
}
|
||||
a.mu.Unlock()
|
||||
|
||||
@@ -645,7 +645,7 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
|
||||
|
||||
// Set timer to fire again unless we are at zero.
|
||||
a.mu.Lock()
|
||||
if len(a.clients) == 0 {
|
||||
if a.numLocalConnections() == 0 {
|
||||
clearTimer(&a.etmr)
|
||||
} else {
|
||||
// Check to see if we have an HB running and update.
|
||||
|
||||
@@ -556,22 +556,34 @@ func TestSystemAccountSystemConnectionLimitsHonored(t *testing.T) {
|
||||
sysAccB := sb.SystemAccount()
|
||||
sb.updateAccountWithClaimJWT(sysAccB, ajwt)
|
||||
|
||||
// Check system here first, with no external it should be zero.
|
||||
sacc := sa.SystemAccount()
|
||||
if nlc := sacc.NumLocalConnections(); nlc != 0 {
|
||||
t.Fatalf("Expected no local connections, got %d", nlc)
|
||||
}
|
||||
|
||||
urlA := fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port)
|
||||
urlB := fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port)
|
||||
|
||||
// Create a user on each server. Break on first failure.
|
||||
tc := 0
|
||||
for {
|
||||
nca1, err := nats.Connect(urlA, createUserCreds(t, sa, sakp))
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
defer nca1.Close()
|
||||
tc++
|
||||
|
||||
ncb1, err := nats.Connect(urlB, createUserCreds(t, sb, sakp))
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
defer ncb1.Close()
|
||||
tc++
|
||||
}
|
||||
if tc != 10 {
|
||||
t.Fatalf("Expected to get 10 external connections, got %d", tc)
|
||||
}
|
||||
|
||||
checkFor(t, 1*time.Second, 50*time.Millisecond, func() error {
|
||||
@@ -581,7 +593,6 @@ func TestSystemAccountSystemConnectionLimitsHonored(t *testing.T) {
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// Test that the remote accounting works when a server is started some time later.
|
||||
@@ -1030,7 +1041,7 @@ func TestServerEventStatsZ(t *testing.T) {
|
||||
if m.Stats.ActiveAccounts != 2 {
|
||||
t.Fatalf("Did not match active accounts of 2, got %d", m.Stats.ActiveAccounts)
|
||||
}
|
||||
if m.Stats.Sent.Msgs != 2 {
|
||||
if m.Stats.Sent.Msgs != 1 {
|
||||
t.Fatalf("Did not match sent msgs of 1, got %d", m.Stats.Sent.Msgs)
|
||||
}
|
||||
if m.Stats.Received.Msgs != 1 {
|
||||
|
||||
@@ -23,13 +23,17 @@ const (
|
||||
nonceLen = 15 // base64.RawURLEncoding.EncodedLen(nonceRawLen)
|
||||
)
|
||||
|
||||
// nonceRequired tells us if we should send a nonce.
|
||||
// Assumes server lock is held
|
||||
func (s *Server) nonceRequired() bool {
|
||||
s.optsMu.RLock()
|
||||
defer s.optsMu.RUnlock()
|
||||
// NonceRequired tells us if we should send a nonce.
|
||||
func (s *Server) NonceRequired() bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.nonceRequired()
|
||||
}
|
||||
|
||||
return len(s.opts.Nkeys) > 0 || len(s.opts.TrustedKeys) > 0
|
||||
// nonceRequired tells us if we should send a nonce.
|
||||
// Lock should be held on entry.
|
||||
func (s *Server) nonceRequired() bool {
|
||||
return len(s.nkeys) > 0 || len(s.trustedKeys) > 0
|
||||
}
|
||||
|
||||
// Generate a nonce for INFO challenge.
|
||||
|
||||
@@ -240,6 +240,7 @@ func TestOperatorSigningKeys(t *testing.T) {
|
||||
|
||||
func TestOperatorMemResolverPreload(t *testing.T) {
|
||||
s, opts := RunServerWithConfig("./configs/resolver_preload.conf")
|
||||
defer s.Shutdown()
|
||||
|
||||
// Make sure we can look up the account.
|
||||
acc, _ := s.LookupAccount("ADM2CIIL3RWXBA6T2HW3FODNCQQOUJEHHQD6FKCPVAMHDNTTSMO73ROX")
|
||||
@@ -254,3 +255,20 @@ func TestOperatorMemResolverPreload(t *testing.T) {
|
||||
t.Fatalf("System account does not match, wanted %q, got %q", opts.SystemAccount, sacc.Name)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOperatorConfigReloadDoesntKillNonce(t *testing.T) {
|
||||
s, _ := runOperatorServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
if !s.NonceRequired() {
|
||||
t.Fatalf("Error nonce should be required")
|
||||
}
|
||||
|
||||
if err := s.Reload(); err != nil {
|
||||
t.Fatalf("Error on reload: %v", err)
|
||||
}
|
||||
|
||||
if !s.NonceRequired() {
|
||||
t.Fatalf("Error nonce should still be required after reload")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user