mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
Merge pull request #971 from nats-io/leaf_limits
Leafnode account based connections limits
This commit is contained in:
@@ -43,9 +43,11 @@ type Account struct {
|
||||
sl *Sublist
|
||||
etmr *time.Timer
|
||||
ctmr *time.Timer
|
||||
strack map[string]int32
|
||||
strack map[string]sconns
|
||||
nrclients int32
|
||||
sysclients int32
|
||||
nleafs int32
|
||||
nrleafs int32
|
||||
clients map[*client]*client
|
||||
rm map[string]int32
|
||||
imports importMap
|
||||
@@ -63,10 +65,17 @@ type limits struct {
|
||||
mpay int32
|
||||
msubs int32
|
||||
mconns int32
|
||||
mleafs int32
|
||||
maxnae int32
|
||||
maxaettl time.Duration
|
||||
}
|
||||
|
||||
// Used to track remote clients and leafnodes per remote server.
|
||||
type sconns struct {
|
||||
conns int32
|
||||
leafs int32
|
||||
}
|
||||
|
||||
// Import stream mapping struct
|
||||
type streamImport struct {
|
||||
acc *Account
|
||||
@@ -110,7 +119,7 @@ func NewAccount(name string) *Account {
|
||||
a := &Account{
|
||||
Name: name,
|
||||
sl: NewSublist(),
|
||||
limits: limits{-1, -1, -1, 0, 0},
|
||||
limits: limits{-1, -1, -1, -1, 0, 0},
|
||||
}
|
||||
return a
|
||||
}
|
||||
@@ -146,10 +155,14 @@ func (a *Account) NumLocalConnections() int {
|
||||
|
||||
// Do not account for the system accounts.
|
||||
func (a *Account) numLocalConnections() int {
|
||||
return len(a.clients) - int(a.sysclients)
|
||||
return len(a.clients) - int(a.sysclients) - int(a.nleafs)
|
||||
}
|
||||
|
||||
// MaxClientsReached returns if we have reached our limit for number of connections.
|
||||
func (a *Account) numLocalLeafNodes() int {
|
||||
return int(a.nleafs)
|
||||
}
|
||||
|
||||
// MaxTotalConnectionsReached returns if we have reached our limit for number of connections.
|
||||
func (a *Account) MaxTotalConnectionsReached() bool {
|
||||
a.mu.RLock()
|
||||
mtc := a.maxTotalConnectionsReached()
|
||||
@@ -168,8 +181,52 @@ func (a *Account) maxTotalConnectionsReached() bool {
|
||||
// wide for total number of active connections.
|
||||
func (a *Account) MaxActiveConnections() int {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
return int(a.mconns)
|
||||
mconns := int(a.mconns)
|
||||
a.mu.RUnlock()
|
||||
return mconns
|
||||
}
|
||||
|
||||
// MaxTotalLeafNodesReached() returns if we have reached our limit for number of leafnodes.
|
||||
func (a *Account) MaxTotalLeafNodesReached() bool {
|
||||
a.mu.RLock()
|
||||
mtc := a.maxTotalLeafNodesReached()
|
||||
a.mu.RUnlock()
|
||||
return mtc
|
||||
}
|
||||
|
||||
func (a *Account) maxTotalLeafNodesReached() bool {
|
||||
if a.mleafs != jwt.NoLimit {
|
||||
return a.nleafs+a.nrleafs >= a.mleafs
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// NumLeafNodes returns the active number of local and remote
|
||||
// leaf node connections.
|
||||
func (a *Account) NumLeafNodes() int {
|
||||
a.mu.RLock()
|
||||
nln := int(a.nleafs + a.nrleafs)
|
||||
a.mu.RUnlock()
|
||||
return nln
|
||||
}
|
||||
|
||||
// NumRemoteLeafNodes returns the active number of remote
|
||||
// leaf node connections.
|
||||
func (a *Account) NumRemoteLeafNodes() int {
|
||||
a.mu.RLock()
|
||||
nrn := int(a.nrleafs)
|
||||
a.mu.RUnlock()
|
||||
return nrn
|
||||
}
|
||||
|
||||
// MaxActiveLeafnodes return the set limit for the account system
|
||||
// wide for total number of leavenode connections.
|
||||
// NOTE: these are tracked separately.
|
||||
func (a *Account) MaxActiveLeafNodes() int {
|
||||
a.mu.RLock()
|
||||
mleafs := int(a.mleafs)
|
||||
a.mu.RUnlock()
|
||||
return mleafs
|
||||
}
|
||||
|
||||
// RoutedSubs returns how many subjects we would send across a route when first
|
||||
@@ -187,7 +244,7 @@ func (a *Account) TotalSubs() int {
|
||||
return int(a.sl.Count())
|
||||
}
|
||||
|
||||
// addClient keeps our accounting of local active clients updated.
|
||||
// addClient keeps our accounting of local active clients or leafnodes updated.
|
||||
// Returns previous total.
|
||||
func (a *Account) addClient(c *client) int {
|
||||
a.mu.Lock()
|
||||
@@ -195,11 +252,16 @@ func (a *Account) addClient(c *client) int {
|
||||
if a.clients != nil {
|
||||
a.clients[c] = c
|
||||
}
|
||||
if c.kind == SYSTEM {
|
||||
a.sysclients++
|
||||
added := n != len(a.clients)
|
||||
if added {
|
||||
if c.kind == SYSTEM {
|
||||
a.sysclients++
|
||||
} else if c.kind == LEAF {
|
||||
a.nleafs++
|
||||
}
|
||||
}
|
||||
a.mu.Unlock()
|
||||
if c != nil && c.srv != nil && a != c.srv.gacc {
|
||||
if c != nil && c.srv != nil && a != c.srv.gacc && added {
|
||||
c.srv.accConnsUpdate(a)
|
||||
}
|
||||
return n
|
||||
@@ -210,11 +272,16 @@ func (a *Account) removeClient(c *client) int {
|
||||
a.mu.Lock()
|
||||
n := len(a.clients)
|
||||
delete(a.clients, c)
|
||||
if c.kind == SYSTEM {
|
||||
a.sysclients--
|
||||
removed := n != len(a.clients)
|
||||
if removed {
|
||||
if c.kind == SYSTEM {
|
||||
a.sysclients--
|
||||
} else if c.kind == LEAF {
|
||||
a.nleafs--
|
||||
}
|
||||
}
|
||||
a.mu.Unlock()
|
||||
if c != nil && c.srv != nil && a != c.srv.gacc {
|
||||
if c != nil && c.srv != nil && a != c.srv.gacc && removed {
|
||||
c.srv.accConnsUpdate(a)
|
||||
}
|
||||
return n
|
||||
@@ -1006,6 +1073,7 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
a.msubs = int32(ac.Limits.Subs)
|
||||
a.mpay = int32(ac.Limits.Payload)
|
||||
a.mconns = int32(ac.Limits.Conn)
|
||||
a.mleafs = int32(ac.Limits.LeafNodeConn)
|
||||
a.mu.Unlock()
|
||||
|
||||
clients := gatherClients()
|
||||
|
||||
@@ -634,10 +634,9 @@ func (s *Server) isLeafNodeAuthorized(c *client) bool {
|
||||
}
|
||||
|
||||
nkey := buildInternalNkeyUser(juc, acc)
|
||||
c.RegisterNkeyUser(nkey)
|
||||
|
||||
// Generate a connect event if we have a system account.
|
||||
// FIXME(dlc) - Make one for leafnodes if we track active connections.
|
||||
if err := c.RegisterNkeyUser(nkey); err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if we need to set an auth timer if the user jwt expires.
|
||||
c.checkExpiration(juc.Claims())
|
||||
|
||||
@@ -444,21 +444,26 @@ func (c *client) registerWithAccount(acc *Account) error {
|
||||
c.srv.decActiveAccounts()
|
||||
}
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
kind := c.kind
|
||||
srv := c.srv
|
||||
c.acc = acc
|
||||
c.applyAccountLimits()
|
||||
c.mu.Unlock()
|
||||
|
||||
// Check if we have a max connections violation
|
||||
if c.kind == CLIENT && acc.MaxTotalConnectionsReached() {
|
||||
if kind == CLIENT && acc.MaxTotalConnectionsReached() {
|
||||
return ErrTooManyAccountConnections
|
||||
} else if kind == LEAF && acc.MaxTotalLeafNodesReached() {
|
||||
return ErrTooManyAccountConnections
|
||||
}
|
||||
|
||||
// Add in new one.
|
||||
if prev := acc.addClient(c); prev == 0 && c.srv != nil {
|
||||
c.srv.incActiveAccounts()
|
||||
if prev := acc.addClient(c); prev == 0 && srv != nil {
|
||||
srv.incActiveAccounts()
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
c.acc = acc
|
||||
c.applyAccountLimits()
|
||||
c.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -471,7 +476,7 @@ func (c *client) subsAtLimit() bool {
|
||||
// Lock is held on entry.
|
||||
// FIXME(dlc) - Should server be able to override here?
|
||||
func (c *client) applyAccountLimits() {
|
||||
if c.acc == nil || c.kind != CLIENT {
|
||||
if c.acc == nil || (c.kind != CLIENT && c.kind != LEAF) {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -534,12 +539,12 @@ func (c *client) RegisterUser(user *User) {
|
||||
// RegisterNkey allows auth to call back into a new nkey
|
||||
// client with the authenticated user. This is used to map
|
||||
// any permissions into the client and setup accounts.
|
||||
func (c *client) RegisterNkeyUser(user *NkeyUser) {
|
||||
func (c *client) RegisterNkeyUser(user *NkeyUser) error {
|
||||
// Register with proper account and sublist.
|
||||
if user.Account != nil {
|
||||
if err := c.registerWithAccount(user.Account); err != nil {
|
||||
c.reportErrRegisterAccount(user.Account, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -552,10 +557,10 @@ func (c *client) RegisterNkeyUser(user *NkeyUser) {
|
||||
// Reset perms to nil in case client previously had them.
|
||||
c.perms = nil
|
||||
c.mperms = nil
|
||||
return
|
||||
} else {
|
||||
c.setPermissions(user.Permissions)
|
||||
}
|
||||
|
||||
c.setPermissions(user.Permissions)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Initializes client.perms structure.
|
||||
@@ -1123,6 +1128,7 @@ func (c *client) processConnect(arg []byte) error {
|
||||
lang := c.opts.Lang
|
||||
account := c.opts.Account
|
||||
accountNew := c.opts.AccountNew
|
||||
ujwt := c.opts.JWT
|
||||
c.mu.Unlock()
|
||||
|
||||
if srv != nil {
|
||||
@@ -1139,11 +1145,20 @@ func (c *client) processConnect(arg []byte) error {
|
||||
|
||||
// Check for Auth
|
||||
if ok := srv.checkAuthentication(c); !ok {
|
||||
// We may fail here because we reached max limits on an account.
|
||||
if ujwt != "" {
|
||||
c.mu.Lock()
|
||||
acc := c.acc
|
||||
c.mu.Unlock()
|
||||
if acc != nil && acc != srv.gacc {
|
||||
return ErrTooManyAccountConnections
|
||||
}
|
||||
}
|
||||
c.authViolation()
|
||||
return ErrAuthentication
|
||||
}
|
||||
|
||||
// Check for Account designation
|
||||
// Check for Account designation, this section should be only used when there is not a jwt.
|
||||
if account != "" {
|
||||
var acc *Account
|
||||
var wasNew bool
|
||||
@@ -1152,7 +1167,7 @@ func (c *client) processConnect(arg []byte) error {
|
||||
acc, err = srv.LookupAccount(account)
|
||||
if err != nil {
|
||||
c.Errorf(err.Error())
|
||||
c.sendErr("Account Not Found")
|
||||
c.sendErr(ErrMissingAccount.Error())
|
||||
return err
|
||||
} else if accountNew && acc != nil {
|
||||
c.sendErrAndErr(ErrAccountExists.Error())
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/gnatsd/server/pse"
|
||||
"github.com/nats-io/jwt"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -91,6 +92,7 @@ type AccountNumConns struct {
|
||||
Server ServerInfo `json:"server"`
|
||||
Account string `json:"acc"`
|
||||
Conns int `json:"conns"`
|
||||
LeafNodes int `json:"leafnodes"`
|
||||
TotalConns int `json:"total_conns"`
|
||||
}
|
||||
|
||||
@@ -482,7 +484,8 @@ func (s *Server) processRemoteServerShutdown(sid string) {
|
||||
a.mu.Lock()
|
||||
prev := a.strack[sid]
|
||||
delete(a.strack, sid)
|
||||
a.nrclients -= prev
|
||||
a.nrclients -= prev.conns
|
||||
a.nrleafs -= prev.leafs
|
||||
a.mu.Unlock()
|
||||
return true
|
||||
})
|
||||
@@ -625,11 +628,18 @@ func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg
|
||||
s.sys.client.Errorf("Error unmarshalling account connection event message: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// See if we have the account registered, if not drop it.
|
||||
acc, _ := s.lookupAccount(m.Account)
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// check again here if we have been shutdown.
|
||||
if !s.running || !s.eventsEnabled() {
|
||||
return
|
||||
}
|
||||
|
||||
// Double check that this is not us, should never happen, so error if it does.
|
||||
if m.Server.ID == s.info.ID {
|
||||
s.sys.client.Errorf("Processing our own account connection event message: ignored")
|
||||
@@ -642,12 +652,13 @@ func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg
|
||||
// If we are here we have interest in tracking this account. Update our accounting.
|
||||
acc.mu.Lock()
|
||||
if acc.strack == nil {
|
||||
acc.strack = make(map[string]int32)
|
||||
acc.strack = make(map[string]sconns)
|
||||
}
|
||||
// This does not depend on receiving all updates since each one is idempotent.
|
||||
prev := acc.strack[m.Server.ID]
|
||||
acc.strack[m.Server.ID] = int32(m.Conns)
|
||||
acc.nrclients += int32(m.Conns) - prev
|
||||
acc.strack[m.Server.ID] = sconns{conns: int32(m.Conns), leafs: int32(m.LeafNodes)}
|
||||
acc.nrclients += int32(m.Conns) - prev.conns
|
||||
acc.nrleafs += int32(m.LeafNodes) - prev.leafs
|
||||
acc.mu.Unlock()
|
||||
|
||||
s.updateRemoteServer(&m.Server)
|
||||
@@ -675,8 +686,7 @@ func (s *Server) enableAccountTracking(a *Account) {
|
||||
// Lock should NOT be held on entry.
|
||||
func (s *Server) sendLeafNodeConnect(a *Account) {
|
||||
s.mu.Lock()
|
||||
// If we do not have any gateways defined this should also be a no-op.
|
||||
// FIXME(dlc) - if we do accounting for operator limits might have to send regardless.
|
||||
// If we are not in operator mode, or do not have any gateways defined, this should also be a no-op.
|
||||
if a == nil || !s.eventsEnabled() || !s.gateway.enabled {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
@@ -699,21 +709,22 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj string) {
|
||||
if !s.eventsEnabled() || a == nil || a == s.gacc {
|
||||
return
|
||||
}
|
||||
a.mu.Lock()
|
||||
a.mu.RLock()
|
||||
|
||||
// If no limits set, don't update, no need to.
|
||||
if a.mconns == 0 {
|
||||
a.mu.Unlock()
|
||||
if a.mconns == jwt.NoLimit && a.mleafs == jwt.NoLimit {
|
||||
a.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
|
||||
// Build event with account name and number of local clients.
|
||||
// Build event with account name and number of local clients and leafnodes.
|
||||
m := AccountNumConns{
|
||||
Account: a.Name,
|
||||
Conns: a.numLocalConnections(),
|
||||
TotalConns: len(s.clients),
|
||||
LeafNodes: a.numLocalLeafNodes(),
|
||||
TotalConns: a.numLocalConnections() + a.numLocalLeafNodes(),
|
||||
}
|
||||
a.mu.Unlock()
|
||||
a.mu.RUnlock()
|
||||
|
||||
s.sendInternalMsg(subj, _EMPTY_, &m.Server, &m)
|
||||
|
||||
|
||||
@@ -1204,8 +1204,6 @@ func TestJWTAccountLimitsMaxPayloadButServerOverrides(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: For now this is single server, will change to adapt for network wide.
|
||||
// TODO(dlc) - Make cluster/gateway aware.
|
||||
func TestJWTAccountLimitsMaxConns(t *testing.T) {
|
||||
s := opTrustBasicSetup()
|
||||
defer s.Shutdown()
|
||||
|
||||
@@ -728,7 +728,6 @@ type leafConnectInfo struct {
|
||||
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
|
||||
// Way to detect clients that incorrectly connect to the route listen
|
||||
// port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
|
||||
|
||||
if lang != "" {
|
||||
c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
|
||||
c.closeConnection(WrongPort)
|
||||
|
||||
@@ -534,8 +534,9 @@ func ProcessCommandLineArgs(cmd *flag.FlagSet) (showVersion bool, showHelp bool,
|
||||
// Protected check on running state
|
||||
func (s *Server) isRunning() bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.running
|
||||
running := s.running
|
||||
s.mu.Unlock()
|
||||
return running
|
||||
}
|
||||
|
||||
func (s *Server) logPid() error {
|
||||
|
||||
@@ -1541,3 +1541,260 @@ func TestLeafNodeAdvertise(t *testing.T) {
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestLeafNodeConnectionLimitsSingleServer(t *testing.T) {
|
||||
s, opts, conf := runLeafNodeOperatorServer(t)
|
||||
defer os.Remove(conf)
|
||||
defer s.Shutdown()
|
||||
|
||||
// Setup account and a user that will be used by the remote leaf node server.
|
||||
// createAccount automatically registers with resolver etc..
|
||||
acc, akp := createAccount(t, s)
|
||||
|
||||
// Now update with limits for lead node connections.
|
||||
const maxleafs = 2
|
||||
|
||||
apub, _ := akp.PublicKey()
|
||||
nac := jwt.NewAccountClaims(apub)
|
||||
nac.Limits.LeafNodeConn = maxleafs
|
||||
s.UpdateAccountClaims(acc, nac)
|
||||
|
||||
// Make sure we have the limits updated in acc.
|
||||
if mleafs := acc.MaxActiveLeafNodes(); mleafs != maxleafs {
|
||||
t.Fatalf("Expected to have max leafnodes of %d, got %d", maxleafs, mleafs)
|
||||
}
|
||||
|
||||
// Create the user credentials for the leadnode connection.
|
||||
kp, _ := nkeys.CreateUser()
|
||||
pub, _ := kp.PublicKey()
|
||||
nuc := jwt.NewUserClaims(pub)
|
||||
ujwt, err := nuc.Encode(akp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating user JWT: %v", err)
|
||||
}
|
||||
seed, _ := kp.Seed()
|
||||
mycreds := genCredsFile(t, ujwt, seed)
|
||||
defer os.Remove(mycreds)
|
||||
|
||||
checkLFCount := func(n int) {
|
||||
t.Helper()
|
||||
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
||||
if nln := s.NumLeafNodes(); nln != n {
|
||||
return fmt.Errorf("Number of leaf nodes is %d", nln)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
sl, _, lnconf := runSolicitWithCredentials(t, opts, mycreds)
|
||||
defer os.Remove(lnconf)
|
||||
defer sl.Shutdown()
|
||||
checkLFCount(1)
|
||||
|
||||
// Make sure we are accounting properly here.
|
||||
if nln := acc.NumLeafNodes(); nln != 1 {
|
||||
t.Fatalf("Expected 1 leaf node, got %d", nln)
|
||||
}
|
||||
// clients and leafnodes counted together.
|
||||
if nc := acc.NumConnections(); nc != 1 {
|
||||
t.Fatalf("Expected 1 for total connections, got %d", nc)
|
||||
}
|
||||
|
||||
s2, _, lnconf2 := runSolicitWithCredentials(t, opts, mycreds)
|
||||
defer os.Remove(lnconf2)
|
||||
defer s2.Shutdown()
|
||||
checkLFCount(2)
|
||||
|
||||
// Make sure we are accounting properly here.
|
||||
if nln := acc.NumLeafNodes(); nln != 2 {
|
||||
t.Fatalf("Expected 2 leaf nodes, got %d", nln)
|
||||
}
|
||||
// clients and leafnodes counted together.
|
||||
if nc := acc.NumConnections(); nc != 2 {
|
||||
t.Fatalf("Expected 2 total connections, got %d", nc)
|
||||
}
|
||||
s2.Shutdown()
|
||||
checkLFCount(1)
|
||||
|
||||
// Make sure we are accounting properly here.
|
||||
if nln := acc.NumLeafNodes(); nln != 1 {
|
||||
t.Fatalf("Expected 1 leaf node, got %d", nln)
|
||||
}
|
||||
// clients and leafnodes counted together.
|
||||
if nc := acc.NumConnections(); nc != 1 {
|
||||
t.Fatalf("Expected 1 for total connections, got %d", nc)
|
||||
}
|
||||
|
||||
// Now add back the second one as #3.
|
||||
s3, _, lnconf3 := runSolicitWithCredentials(t, opts, mycreds)
|
||||
defer os.Remove(lnconf3)
|
||||
defer s3.Shutdown()
|
||||
checkLFCount(2)
|
||||
|
||||
if nln := acc.NumLeafNodes(); nln != 2 {
|
||||
t.Fatalf("Expected 2 leaf nodes, got %d", nln)
|
||||
}
|
||||
|
||||
// Once we are here we should not be able to create anymore. Limit == 2.
|
||||
s4, _, lnconf4 := runSolicitWithCredentials(t, opts, mycreds)
|
||||
defer os.Remove(lnconf4)
|
||||
defer s4.Shutdown()
|
||||
|
||||
if nln := acc.NumLeafNodes(); nln != 2 {
|
||||
fmt.Printf("Acc is %q\n", acc.Name)
|
||||
t.Fatalf("Expected 2 leaf nodes, got %d", nln)
|
||||
}
|
||||
|
||||
// Make sure s4 has 0 still.
|
||||
if nln := s4.NumLeafNodes(); nln != 0 {
|
||||
t.Fatalf("Expected no leafnodes accounted for in s4, got %d", nln)
|
||||
}
|
||||
|
||||
// Make sure this is still 2.
|
||||
checkLFCount(2)
|
||||
}
|
||||
|
||||
func TestLeafNodeConnectionLimitsCluster(t *testing.T) {
|
||||
content := `
|
||||
port: -1
|
||||
operator = "./configs/nkeys/op.jwt"
|
||||
system_account = "AD2VB6C25DQPEUUQ7KJBUFX2J4ZNVBPOHSCBISC7VFZXVWXZA7VASQZG"
|
||||
resolver = MEMORY
|
||||
cluster {
|
||||
port: -1
|
||||
}
|
||||
leafnodes {
|
||||
listen: "127.0.0.1:-1"
|
||||
}
|
||||
resolver_preload = {
|
||||
AD2VB6C25DQPEUUQ7KJBUFX2J4ZNVBPOHSCBISC7VFZXVWXZA7VASQZG : "eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5In0.eyJqdGkiOiJDSzU1UERKSUlTWU5QWkhLSUpMVURVVTdJT1dINlM3UkE0RUc2TTVGVUQzUEdGQ1RWWlJRIiwiaWF0IjoxNTQzOTU4NjU4LCJpc3MiOiJPQ0FUMzNNVFZVMlZVT0lNR05HVU5YSjY2QUgyUkxTREFGM01VQkNZQVk1UU1JTDY1TlFNNlhRRyIsInN1YiI6IkFEMlZCNkMyNURRUEVVVVE3S0pCVUZYMko0Wk5WQlBPSFNDQklTQzdWRlpYVldYWkE3VkFTUVpHIiwidHlwZSI6ImFjY291bnQiLCJuYXRzIjp7ImxpbWl0cyI6e319fQ.7m1fysYUsBw15Lj88YmYoHxOI4HlOzu6qgP8Zg-1q9mQXUURijuDGVZrtb7gFYRlo-nG9xZyd2ZTRpMA-b0xCQ"
|
||||
}
|
||||
`
|
||||
conf := createConfFile(t, []byte(content))
|
||||
defer os.Remove(conf)
|
||||
s1, s1Opts := RunServerWithConfig(conf)
|
||||
defer s1.Shutdown()
|
||||
|
||||
content = fmt.Sprintf(`
|
||||
port: -1
|
||||
operator = "./configs/nkeys/op.jwt"
|
||||
system_account = "AD2VB6C25DQPEUUQ7KJBUFX2J4ZNVBPOHSCBISC7VFZXVWXZA7VASQZG"
|
||||
resolver = MEMORY
|
||||
cluster {
|
||||
port: -1
|
||||
routes: ["nats://%s:%d"]
|
||||
}
|
||||
leafnodes {
|
||||
listen: "127.0.0.1:-1"
|
||||
}
|
||||
resolver_preload = {
|
||||
AD2VB6C25DQPEUUQ7KJBUFX2J4ZNVBPOHSCBISC7VFZXVWXZA7VASQZG : "eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5In0.eyJqdGkiOiJDSzU1UERKSUlTWU5QWkhLSUpMVURVVTdJT1dINlM3UkE0RUc2TTVGVUQzUEdGQ1RWWlJRIiwiaWF0IjoxNTQzOTU4NjU4LCJpc3MiOiJPQ0FUMzNNVFZVMlZVT0lNR05HVU5YSjY2QUgyUkxTREFGM01VQkNZQVk1UU1JTDY1TlFNNlhRRyIsInN1YiI6IkFEMlZCNkMyNURRUEVVVVE3S0pCVUZYMko0Wk5WQlBPSFNDQklTQzdWRlpYVldYWkE3VkFTUVpHIiwidHlwZSI6ImFjY291bnQiLCJuYXRzIjp7ImxpbWl0cyI6e319fQ.7m1fysYUsBw15Lj88YmYoHxOI4HlOzu6qgP8Zg-1q9mQXUURijuDGVZrtb7gFYRlo-nG9xZyd2ZTRpMA-b0xCQ"
|
||||
}
|
||||
`, s1Opts.Cluster.Host, s1Opts.Cluster.Port)
|
||||
conf = createConfFile(t, []byte(content))
|
||||
s2, s2Opts := RunServerWithConfig(conf)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Setup the two accounts for this server.
|
||||
okp, _ := nkeys.FromSeed(oSeed)
|
||||
|
||||
// Setup account and a user that will be used by the remote leaf node server.
|
||||
// createAccount automatically registers with resolver etc..
|
||||
acc, akp := createAccount(t, s1)
|
||||
|
||||
// Now update with limits for lead node connections.
|
||||
const maxleafs = 10
|
||||
|
||||
apub, _ := akp.PublicKey()
|
||||
nac := jwt.NewAccountClaims(apub)
|
||||
nac.Limits.LeafNodeConn = maxleafs
|
||||
|
||||
ajwt, err := nac.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
if err := s1.AccountResolver().Store(apub, ajwt); err != nil {
|
||||
t.Fatalf("Account Resolver returned an error: %v", err)
|
||||
}
|
||||
s1.UpdateAccountClaims(acc, nac)
|
||||
|
||||
if err := s2.AccountResolver().Store(apub, ajwt); err != nil {
|
||||
t.Fatalf("Account Resolver returned an error: %v", err)
|
||||
}
|
||||
// Make sure that account object registered in S2 is not acc2
|
||||
acc2, err := s2.LookupAccount(acc.Name)
|
||||
if err != nil || acc == acc2 {
|
||||
t.Fatalf("Lookup account error: %v - accounts are same: %v", err, acc == acc2)
|
||||
}
|
||||
|
||||
// Create the user credentials for the leadnode connection.
|
||||
kp, _ := nkeys.CreateUser()
|
||||
pub, _ := kp.PublicKey()
|
||||
nuc := jwt.NewUserClaims(pub)
|
||||
ujwt, err := nuc.Encode(akp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating user JWT: %v", err)
|
||||
}
|
||||
seed, _ := kp.Seed()
|
||||
mycreds := genCredsFile(t, ujwt, seed)
|
||||
defer os.Remove(mycreds)
|
||||
|
||||
loop := maxleafs / 2
|
||||
|
||||
// Now create maxleafs/2 leaf node servers on each operator server.
|
||||
for i := 0; i < loop; i++ {
|
||||
sl1, _, lnconf1 := runSolicitWithCredentials(t, s1Opts, mycreds)
|
||||
defer os.Remove(lnconf1)
|
||||
defer sl1.Shutdown()
|
||||
|
||||
sl2, _, lnconf2 := runSolicitWithCredentials(t, s2Opts, mycreds)
|
||||
defer os.Remove(lnconf2)
|
||||
defer sl2.Shutdown()
|
||||
}
|
||||
|
||||
checkLFCount := func(s *server.Server, n int) {
|
||||
t.Helper()
|
||||
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
||||
if nln := s.NumLeafNodes(); nln != n {
|
||||
return fmt.Errorf("Number of leaf nodes is %d", nln)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
checkLFCount(s1, loop)
|
||||
checkLFCount(s2, loop)
|
||||
|
||||
// Now check that we have the remotes registered. This will prove we are sending
|
||||
// and processing the leaf node connect events properly etc.
|
||||
checkAccRemoteLFCount := func(acc *server.Account, n int) {
|
||||
t.Helper()
|
||||
checkFor(t, time.Second, 10*time.Millisecond, func() error {
|
||||
if nrln := acc.NumRemoteLeafNodes(); nrln != n {
|
||||
return fmt.Errorf("Number of remote leaf nodes is %d", nrln)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
checkAccRemoteLFCount(acc, loop)
|
||||
checkAccRemoteLFCount(acc2, loop)
|
||||
|
||||
// Now that we are here we should not be allowed anymore leaf nodes.
|
||||
l, _, lnconf := runSolicitWithCredentials(t, s1Opts, mycreds)
|
||||
defer os.Remove(lnconf)
|
||||
defer l.Shutdown()
|
||||
|
||||
if nln := acc.NumLeafNodes(); nln != maxleafs {
|
||||
t.Fatalf("Expected %d leaf nodes, got %d", maxleafs, nln)
|
||||
}
|
||||
// Should still be at loop size.
|
||||
checkLFCount(s1, loop)
|
||||
|
||||
l, _, lnconf = runSolicitWithCredentials(t, s2Opts, mycreds)
|
||||
defer os.Remove(lnconf)
|
||||
defer l.Shutdown()
|
||||
if nln := acc2.NumLeafNodes(); nln != maxleafs {
|
||||
t.Fatalf("Expected %d leaf nodes, got %d", maxleafs, nln)
|
||||
}
|
||||
// Should still be at loop size.
|
||||
checkLFCount(s2, loop)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user