mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Single server limits
Implemented single server account claim limits for subscriptions and active connections and message payload. Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -52,11 +52,19 @@ type Account struct {
|
||||
rm map[string]*rme
|
||||
imports importMap
|
||||
exports exportMap
|
||||
nae int
|
||||
limits
|
||||
nae int
|
||||
pruning bool
|
||||
expired bool
|
||||
}
|
||||
|
||||
// Account based limits.
|
||||
type limits struct {
|
||||
mpay int32
|
||||
msubs int
|
||||
mconns int
|
||||
maxnae int
|
||||
maxaettl time.Duration
|
||||
pruning bool
|
||||
expired bool
|
||||
}
|
||||
|
||||
// Import stream mapping struct
|
||||
@@ -104,6 +112,16 @@ func (a *Account) NumClients() int {
|
||||
return len(a.clients)
|
||||
}
|
||||
|
||||
// MaxClientsReached returns if we have reached our limit for number of connections.
|
||||
func (a *Account) MaxClientsReached() bool {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
if a.mconns != 0 {
|
||||
return len(a.clients) >= a.mconns
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// RoutedSubs returns how many subjects we would send across a route when first
|
||||
// connected or expressing interest. Local client subs.
|
||||
func (a *Account) RoutedSubs() int {
|
||||
@@ -693,6 +711,16 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
a.exports = exportMap{}
|
||||
a.imports = importMap{}
|
||||
|
||||
gatherClients := func() []*client {
|
||||
a.mu.RLock()
|
||||
clients := make([]*client, 0, len(a.clients))
|
||||
for _, c := range a.clients {
|
||||
clients = append(clients, c)
|
||||
}
|
||||
a.mu.RUnlock()
|
||||
return clients
|
||||
}
|
||||
|
||||
for _, e := range ac.Exports {
|
||||
switch e.Type {
|
||||
case jwt.Stream:
|
||||
@@ -723,13 +751,7 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
// Now let's apply any needed changes from import/export changes.
|
||||
if !a.checkStreamImportsEqual(old) {
|
||||
awcsti := map[string]struct{}{a.Name: struct{}{}}
|
||||
a.mu.RLock()
|
||||
clients := make([]*client, 0, len(a.clients))
|
||||
for _, c := range a.clients {
|
||||
clients = append(clients, c)
|
||||
}
|
||||
a.mu.RUnlock()
|
||||
for _, c := range clients {
|
||||
for _, c := range gatherClients() {
|
||||
c.processSubsOnConfigReload(awcsti)
|
||||
}
|
||||
}
|
||||
@@ -759,7 +781,19 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME(dlc) - Limits etc..
|
||||
// Now do limits if they are present.
|
||||
a.msubs = int(ac.Limits.Subs)
|
||||
a.mpay = int32(ac.Limits.Payload)
|
||||
a.mconns = int(ac.Limits.Conn)
|
||||
for i, c := range gatherClients() {
|
||||
if a.mconns > 0 && i >= a.mconns {
|
||||
c.maxAccountConnExceeded()
|
||||
continue
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.applyAccountLimits()
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// Helper to build an internal account structure from a jwt.AccountClaims.
|
||||
|
||||
@@ -124,9 +124,11 @@ const (
|
||||
ProtocolViolation
|
||||
BadClientProtocolVersion
|
||||
WrongPort
|
||||
MaxAccountConnectionsExceeded
|
||||
MaxConnectionsExceeded
|
||||
MaxPayloadExceeded
|
||||
MaxControlLineExceeded
|
||||
MaxSubscriptionsExceeded
|
||||
DuplicateRoute
|
||||
RouteRemoved
|
||||
ServerShutdown
|
||||
@@ -136,7 +138,7 @@ const (
|
||||
type client struct {
|
||||
// Here first because of use of atomics, and memory alignment.
|
||||
stats
|
||||
mpay int64
|
||||
mpay int32
|
||||
msubs int
|
||||
mu sync.Mutex
|
||||
typ int
|
||||
@@ -359,6 +361,16 @@ func (c *client) initClient() {
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to report errors.
|
||||
func (c *client) reporteErrRegisterAccount(acc *Account, err error) {
|
||||
if err == ErrTooManyAccountConnections {
|
||||
c.maxAccountConnExceeded()
|
||||
return
|
||||
}
|
||||
c.Errorf("Problem registering with account [%s]", acc.Name)
|
||||
c.sendErr("Failed Account Registration")
|
||||
}
|
||||
|
||||
// RegisterWithAccount will register the given user with a specific
|
||||
// account. This will change the subject namespace.
|
||||
func (c *client) registerWithAccount(acc *Account) error {
|
||||
@@ -371,16 +383,62 @@ func (c *client) registerWithAccount(acc *Account) error {
|
||||
c.srv.decActiveAccounts()
|
||||
}
|
||||
}
|
||||
// Check if we have a max connections violation
|
||||
if acc.MaxClientsReached() {
|
||||
return ErrTooManyAccountConnections
|
||||
}
|
||||
|
||||
// Add in new one.
|
||||
if prev := acc.addClient(c); prev == 0 && c.srv != nil {
|
||||
c.srv.incActiveAccounts()
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
c.acc = acc
|
||||
c.applyAccountLimits()
|
||||
c.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Apply account limits
|
||||
// Lock is held on entry.
|
||||
// FIXME(dlc) - Should server be able to override here?
|
||||
func (c *client) applyAccountLimits() {
|
||||
if c.acc == nil {
|
||||
return
|
||||
}
|
||||
// Set here, check for more details below. Only set if non-zero.
|
||||
if c.acc.msubs > 0 {
|
||||
c.msubs = c.acc.msubs
|
||||
}
|
||||
if c.acc.mpay > 0 {
|
||||
c.mpay = c.acc.mpay
|
||||
}
|
||||
|
||||
opts := c.srv.getOpts()
|
||||
|
||||
// We check here if the server has an option set that is lower than the account limit.
|
||||
if c.mpay != 0 && opts.MaxPayload != 0 && int32(opts.MaxPayload) < c.acc.mpay {
|
||||
c.Errorf("Max Payload set to %d from server config which overrides %d from account claims", opts.MaxPayload, c.acc.mpay)
|
||||
c.mpay = int32(opts.MaxPayload)
|
||||
}
|
||||
|
||||
// We check here if the server has an option set that is lower than the account limit.
|
||||
if c.msubs != 0 && opts.MaxSubs != 0 && opts.MaxSubs < c.acc.msubs {
|
||||
c.Errorf("Max Subscriptions set to %d from server config which overrides %d from account claims", opts.MaxSubs, c.acc.msubs)
|
||||
c.msubs = opts.MaxSubs
|
||||
}
|
||||
|
||||
if c.msubs > 0 && len(c.subs) > c.msubs {
|
||||
go func() {
|
||||
c.maxSubsExceeded()
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
c.closeConnection(MaxSubscriptionsExceeded)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterUser allows auth to call back into a new client
|
||||
// with the authenticated user. This is used to map
|
||||
// any permissions into the client and setup accounts.
|
||||
@@ -388,8 +446,7 @@ func (c *client) RegisterUser(user *User) {
|
||||
// Register with proper account and sublist.
|
||||
if user.Account != nil {
|
||||
if err := c.registerWithAccount(user.Account); err != nil {
|
||||
c.Errorf("Problem registering with account [%s]", user.Account.Name)
|
||||
c.sendErr("Failed Account Registration")
|
||||
c.reporteErrRegisterAccount(user.Account, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -415,8 +472,7 @@ func (c *client) RegisterNkeyUser(user *NkeyUser) {
|
||||
// Register with proper account and sublist.
|
||||
if user.Account != nil {
|
||||
if err := c.registerWithAccount(user.Account); err != nil {
|
||||
c.Errorf("Problem registering with account [%s]", user.Account.Name)
|
||||
c.sendErr("Failed Account Registration")
|
||||
c.reporteErrRegisterAccount(user.Account, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -954,8 +1010,7 @@ func (c *client) processConnect(arg []byte) error {
|
||||
}
|
||||
// If we are here we can register ourselves with the new account.
|
||||
if err := c.registerWithAccount(acc); err != nil {
|
||||
c.Errorf("Problem registering with account [%s]", account)
|
||||
c.sendErr("Failed Account Registration")
|
||||
c.reporteErrRegisterAccount(acc, err)
|
||||
return ErrBadAccount
|
||||
}
|
||||
} else if c.acc == nil {
|
||||
@@ -1047,6 +1102,11 @@ func (c *client) authViolation() {
|
||||
c.closeConnection(AuthenticationViolation)
|
||||
}
|
||||
|
||||
func (c *client) maxAccountConnExceeded() {
|
||||
c.sendErrAndErr(ErrTooManyAccountConnections.Error())
|
||||
c.closeConnection(MaxAccountConnectionsExceeded)
|
||||
}
|
||||
|
||||
func (c *client) maxConnExceeded() {
|
||||
c.sendErrAndErr(ErrTooManyConnections.Error())
|
||||
c.closeConnection(MaxConnectionsExceeded)
|
||||
@@ -1056,7 +1116,7 @@ func (c *client) maxSubsExceeded() {
|
||||
c.sendErrAndErr(ErrTooManySubs.Error())
|
||||
}
|
||||
|
||||
func (c *client) maxPayloadViolation(sz int, max int64) {
|
||||
func (c *client) maxPayloadViolation(sz int, max int32) {
|
||||
c.Errorf("%s: %d vs %d", ErrMaxPayload.Error(), sz, max)
|
||||
c.sendErr("Maximum Payload Violation")
|
||||
c.closeConnection(MaxPayloadExceeded)
|
||||
@@ -1304,8 +1364,8 @@ func (c *client) processPub(trace bool, arg []byte) error {
|
||||
if c.pa.size < 0 {
|
||||
return fmt.Errorf("processPub Bad or Missing Size: '%s'", arg)
|
||||
}
|
||||
maxPayload := atomic.LoadInt64(&c.mpay)
|
||||
if maxPayload > 0 && int64(c.pa.size) > maxPayload {
|
||||
maxPayload := atomic.LoadInt32(&c.mpay)
|
||||
if maxPayload > 0 && int32(c.pa.size) > maxPayload {
|
||||
c.maxPayloadViolation(c.pa.size, maxPayload)
|
||||
return ErrMaxPayload
|
||||
}
|
||||
|
||||
@@ -47,6 +47,10 @@ var (
|
||||
// server has been reached.
|
||||
ErrTooManyConnections = errors.New("Maximum Connections Exceeded")
|
||||
|
||||
// ErrTooManyAccountConnections signals that an acount has reached its maximum number of active
|
||||
// connections.
|
||||
ErrTooManyAccountConnections = errors.New("Maximum Account Active Connections Exceeded")
|
||||
|
||||
// ErrTooManySubs signals a client that the maximum number of subscriptions per connection
|
||||
// has been reached.
|
||||
ErrTooManySubs = errors.New("Maximum Subscriptions Exceeded")
|
||||
|
||||
@@ -515,7 +515,7 @@ func TestJWTAccountRenewFromResolver(t *testing.T) {
|
||||
apub, _ := akp.PublicKey()
|
||||
nac := jwt.NewAccountClaims(string(apub))
|
||||
nac.IssuedAt = time.Now().Add(-10 * time.Second).Unix()
|
||||
nac.Expires = time.Now().Unix()
|
||||
nac.Expires = time.Now().Add(time.Second).Unix()
|
||||
ajwt, err := nac.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
@@ -524,6 +524,9 @@ func TestJWTAccountRenewFromResolver(t *testing.T) {
|
||||
addAccountToMemResolver(s, string(apub), ajwt)
|
||||
// Force it to be loaded by the server and start the expiration timer.
|
||||
acc := s.LookupAccount(string(apub))
|
||||
if acc == nil {
|
||||
t.Fatalf("Could not retrieve account for %q", apub)
|
||||
}
|
||||
|
||||
// Create a new user
|
||||
nkp, _ := nkeys.CreateUser()
|
||||
@@ -542,6 +545,9 @@ func TestJWTAccountRenewFromResolver(t *testing.T) {
|
||||
sigraw, _ := nkp.Sign([]byte(info.Nonce))
|
||||
sig := base64.StdEncoding.EncodeToString(sigraw)
|
||||
|
||||
// Wait for expiration.
|
||||
time.Sleep(time.Second)
|
||||
|
||||
// PING needed to flush the +OK/-ERR to us.
|
||||
// This should fail since the account is expired.
|
||||
cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", ujwt, sig)
|
||||
@@ -978,7 +984,6 @@ func TestJWTAccountImportActivationExpires(t *testing.T) {
|
||||
sig := base64.StdEncoding.EncodeToString(sigraw)
|
||||
|
||||
// PING needed to flush the +OK/-ERR to us.
|
||||
// This should fail too since no account resolver is defined.
|
||||
cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nSUB import.foo 1\r\nPING\r\n", ujwt, sig)
|
||||
go c.parse([]byte(cs))
|
||||
l, _ = cr.ReadString('\n')
|
||||
@@ -1012,3 +1017,386 @@ func TestJWTAccountImportActivationExpires(t *testing.T) {
|
||||
// Should have expired and been removed.
|
||||
checkShadow(0)
|
||||
}
|
||||
|
||||
func TestJWTAccountLimitsSubs(t *testing.T) {
|
||||
s := opTrustBasicSetup()
|
||||
defer s.Shutdown()
|
||||
buildMemAccResolver(s)
|
||||
|
||||
okp, _ := nkeys.FromSeed(oSeed)
|
||||
|
||||
// Create accounts and imports/exports.
|
||||
fooKP, _ := nkeys.CreateAccount()
|
||||
fooPub, _ := fooKP.PublicKey()
|
||||
fooAC := jwt.NewAccountClaims(string(fooPub))
|
||||
fooAC.Limits.Subs = 10
|
||||
fooJWT, err := fooAC.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
addAccountToMemResolver(s, string(fooPub), fooJWT)
|
||||
|
||||
// Create a client.
|
||||
nkp, _ := nkeys.CreateUser()
|
||||
pub, _ := nkp.PublicKey()
|
||||
nuc := jwt.NewUserClaims(string(pub))
|
||||
ujwt, err := nuc.Encode(fooKP)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating user JWT: %v", err)
|
||||
}
|
||||
|
||||
c, cr, l := newClientForServer(s)
|
||||
|
||||
// Sign Nonce
|
||||
var info nonceInfo
|
||||
json.Unmarshal([]byte(l[5:]), &info)
|
||||
sigraw, _ := nkp.Sign([]byte(info.Nonce))
|
||||
sig := base64.StdEncoding.EncodeToString(sigraw)
|
||||
|
||||
quit := make(chan bool)
|
||||
defer func() { quit <- true }()
|
||||
|
||||
pab := make(chan []byte, 16)
|
||||
|
||||
parseAsync := func(cs []byte) {
|
||||
pab <- cs
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case cs := <-pab:
|
||||
c.parse(cs)
|
||||
case <-quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// PING needed to flush the +OK/-ERR to us.
|
||||
cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", ujwt, sig)
|
||||
parseAsync([]byte(cs))
|
||||
l, _ = cr.ReadString('\n')
|
||||
if !strings.HasPrefix(l, "+OK") {
|
||||
t.Fatalf("Expected an OK, got: %v", l)
|
||||
}
|
||||
l, _ = cr.ReadString('\n')
|
||||
if !strings.HasPrefix(l, "PONG") {
|
||||
t.Fatalf("Expected a PONG")
|
||||
}
|
||||
|
||||
// Check to make sure we have the limit set.
|
||||
// Account first
|
||||
fooAcc := s.LookupAccount(string(fooPub))
|
||||
fooAcc.mu.RLock()
|
||||
if fooAcc.msubs != 10 {
|
||||
fooAcc.mu.RUnlock()
|
||||
t.Fatalf("Expected account to have msubs of 10, got %d", fooAcc.msubs)
|
||||
}
|
||||
fooAcc.mu.RUnlock()
|
||||
// Now test that the client has limits too.
|
||||
c.mu.Lock()
|
||||
if c.msubs != 10 {
|
||||
c.mu.Unlock()
|
||||
t.Fatalf("Expected client msubs to be 10, got %d", c.msubs)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
// Now make sure its enforced.
|
||||
/// These should all work ok.
|
||||
for i := 0; i < 10; i++ {
|
||||
cs := fmt.Sprintf("SUB foo %d\r\nPING\r\n", i)
|
||||
parseAsync([]byte(cs))
|
||||
l, _ = cr.ReadString('\n')
|
||||
if !strings.HasPrefix(l, "+OK") {
|
||||
t.Fatalf("Expected an OK, got: %v", l)
|
||||
}
|
||||
l, _ = cr.ReadString('\n')
|
||||
if !strings.HasPrefix(l, "PONG") {
|
||||
t.Fatalf("Expected a PONG")
|
||||
}
|
||||
}
|
||||
|
||||
// This one should fail.
|
||||
cs = fmt.Sprintf("SUB foo 22\r\n")
|
||||
parseAsync([]byte(cs))
|
||||
l, _ = cr.ReadString('\n')
|
||||
if !strings.HasPrefix(l, "-ERR") {
|
||||
t.Fatalf("Expected an ERR, got: %v", l)
|
||||
}
|
||||
if !strings.Contains(l, "Maximum Subscriptions Exceeded") {
|
||||
t.Fatalf("Expected an ERR for max subscriptions exceeded, got: %v", l)
|
||||
}
|
||||
|
||||
// Now update the claims and expect if max is lower to be disconnected.
|
||||
fooAC.Limits.Subs = 5
|
||||
fooJWT, err = fooAC.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
addAccountToMemResolver(s, string(fooPub), fooJWT)
|
||||
s.updateAccountClaims(fooAcc, fooAC)
|
||||
l, _ = cr.ReadString('\n')
|
||||
if !strings.HasPrefix(l, "-ERR") {
|
||||
t.Fatalf("Expected an ERR, got: %v", l)
|
||||
}
|
||||
if !strings.Contains(l, "Maximum Subscriptions Exceeded") {
|
||||
t.Fatalf("Expected an ERR for max subscriptions exceeded, got: %v", l)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJWTAccountLimitsSubsButServerOverrides(t *testing.T) {
|
||||
s := opTrustBasicSetup()
|
||||
defer s.Shutdown()
|
||||
buildMemAccResolver(s)
|
||||
|
||||
// override with server setting of 2.
|
||||
opts := s.getOpts()
|
||||
opts.MaxSubs = 2
|
||||
|
||||
okp, _ := nkeys.FromSeed(oSeed)
|
||||
|
||||
// Create accounts and imports/exports.
|
||||
fooKP, _ := nkeys.CreateAccount()
|
||||
fooPub, _ := fooKP.PublicKey()
|
||||
fooAC := jwt.NewAccountClaims(string(fooPub))
|
||||
fooAC.Limits.Subs = 10
|
||||
fooJWT, err := fooAC.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
addAccountToMemResolver(s, string(fooPub), fooJWT)
|
||||
fooAcc := s.LookupAccount(string(fooPub))
|
||||
fooAcc.mu.RLock()
|
||||
if fooAcc.msubs != 10 {
|
||||
fooAcc.mu.RUnlock()
|
||||
t.Fatalf("Expected account to have msubs of 10, got %d", fooAcc.msubs)
|
||||
}
|
||||
fooAcc.mu.RUnlock()
|
||||
|
||||
// Create a client.
|
||||
nkp, _ := nkeys.CreateUser()
|
||||
pub, _ := nkp.PublicKey()
|
||||
nuc := jwt.NewUserClaims(string(pub))
|
||||
ujwt, err := nuc.Encode(fooKP)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating user JWT: %v", err)
|
||||
}
|
||||
|
||||
c, cr, l := newClientForServer(s)
|
||||
|
||||
// Sign Nonce
|
||||
var info nonceInfo
|
||||
json.Unmarshal([]byte(l[5:]), &info)
|
||||
sigraw, _ := nkp.Sign([]byte(info.Nonce))
|
||||
sig := base64.StdEncoding.EncodeToString(sigraw)
|
||||
|
||||
cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\"}\r\nSUB foo 1\r\nSUB bar 2\r\nSUB baz 3\r\nPING\r\n", ujwt, sig)
|
||||
go c.parse([]byte(cs))
|
||||
l, _ = cr.ReadString('\n')
|
||||
if !strings.HasPrefix(l, "-ERR ") {
|
||||
t.Fatalf("Expected an error")
|
||||
}
|
||||
if !strings.Contains(l, "Maximum Subscriptions Exceeded") {
|
||||
t.Fatalf("Expected an ERR for max subscriptions exceeded, got: %v", l)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJWTAccountLimitsMaxPayload(t *testing.T) {
|
||||
s := opTrustBasicSetup()
|
||||
defer s.Shutdown()
|
||||
buildMemAccResolver(s)
|
||||
|
||||
okp, _ := nkeys.FromSeed(oSeed)
|
||||
|
||||
// Create accounts and imports/exports.
|
||||
fooKP, _ := nkeys.CreateAccount()
|
||||
fooPub, _ := fooKP.PublicKey()
|
||||
fooAC := jwt.NewAccountClaims(string(fooPub))
|
||||
fooAC.Limits.Payload = 8
|
||||
fooJWT, err := fooAC.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
addAccountToMemResolver(s, string(fooPub), fooJWT)
|
||||
|
||||
// Create a client.
|
||||
nkp, _ := nkeys.CreateUser()
|
||||
pub, _ := nkp.PublicKey()
|
||||
nuc := jwt.NewUserClaims(string(pub))
|
||||
ujwt, err := nuc.Encode(fooKP)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating user JWT: %v", err)
|
||||
}
|
||||
|
||||
c, cr, l := newClientForServer(s)
|
||||
|
||||
// Sign Nonce
|
||||
var info nonceInfo
|
||||
json.Unmarshal([]byte(l[5:]), &info)
|
||||
sigraw, _ := nkp.Sign([]byte(info.Nonce))
|
||||
sig := base64.StdEncoding.EncodeToString(sigraw)
|
||||
|
||||
quit := make(chan bool)
|
||||
defer func() { quit <- true }()
|
||||
|
||||
pab := make(chan []byte, 16)
|
||||
|
||||
parseAsync := func(cs []byte) {
|
||||
pab <- cs
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case cs := <-pab:
|
||||
c.parse(cs)
|
||||
case <-quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\"}\r\nPING\r\n", ujwt, sig)
|
||||
parseAsync([]byte(cs))
|
||||
l, _ = cr.ReadString('\n')
|
||||
if !strings.HasPrefix(l, "PONG") {
|
||||
t.Fatalf("Expected a PONG")
|
||||
}
|
||||
|
||||
// Check to make sure we have the limit set.
|
||||
// Account first
|
||||
fooAcc := s.LookupAccount(string(fooPub))
|
||||
fooAcc.mu.RLock()
|
||||
if fooAcc.mpay != 8 {
|
||||
fooAcc.mu.RUnlock()
|
||||
t.Fatalf("Expected account to have mpay of 8, got %d", fooAcc.mpay)
|
||||
}
|
||||
fooAcc.mu.RUnlock()
|
||||
// Now test that the client has limits too.
|
||||
c.mu.Lock()
|
||||
if c.mpay != 8 {
|
||||
c.mu.Unlock()
|
||||
t.Fatalf("Expected client to have mpay of 10, got %d", c.mpay)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
cs = fmt.Sprintf("PUB foo 4\r\nXXXX\r\nPING\r\n")
|
||||
parseAsync([]byte(cs))
|
||||
l, _ = cr.ReadString('\n')
|
||||
if !strings.HasPrefix(l, "PONG") {
|
||||
t.Fatalf("Expected a PONG")
|
||||
}
|
||||
|
||||
cs = fmt.Sprintf("PUB foo 10\r\nXXXXXXXXXX\r\nPING\r\n")
|
||||
parseAsync([]byte(cs))
|
||||
l, _ = cr.ReadString('\n')
|
||||
if !strings.HasPrefix(l, "-ERR ") {
|
||||
t.Fatalf("Expected an error")
|
||||
}
|
||||
if !strings.Contains(l, "Maximum Payload") {
|
||||
t.Fatalf("Expected an ERR for max payload violation, got: %v", l)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJWTAccountLimitsMaxPayloadButServerOverrides(t *testing.T) {
|
||||
s := opTrustBasicSetup()
|
||||
defer s.Shutdown()
|
||||
buildMemAccResolver(s)
|
||||
|
||||
// override with server setting of 4.
|
||||
opts := s.getOpts()
|
||||
opts.MaxPayload = 4
|
||||
|
||||
okp, _ := nkeys.FromSeed(oSeed)
|
||||
|
||||
// Create accounts and imports/exports.
|
||||
fooKP, _ := nkeys.CreateAccount()
|
||||
fooPub, _ := fooKP.PublicKey()
|
||||
fooAC := jwt.NewAccountClaims(string(fooPub))
|
||||
fooAC.Limits.Payload = 8
|
||||
fooJWT, err := fooAC.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
addAccountToMemResolver(s, string(fooPub), fooJWT)
|
||||
|
||||
// Create a client.
|
||||
nkp, _ := nkeys.CreateUser()
|
||||
pub, _ := nkp.PublicKey()
|
||||
nuc := jwt.NewUserClaims(string(pub))
|
||||
ujwt, err := nuc.Encode(fooKP)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating user JWT: %v", err)
|
||||
}
|
||||
|
||||
c, cr, l := newClientForServer(s)
|
||||
|
||||
// Sign Nonce
|
||||
var info nonceInfo
|
||||
json.Unmarshal([]byte(l[5:]), &info)
|
||||
sigraw, _ := nkp.Sign([]byte(info.Nonce))
|
||||
sig := base64.StdEncoding.EncodeToString(sigraw)
|
||||
|
||||
cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\"}\r\nPUB foo 6\r\nXXXXXX\r\nPING\r\n", ujwt, sig)
|
||||
go c.parse([]byte(cs))
|
||||
l, _ = cr.ReadString('\n')
|
||||
if !strings.HasPrefix(l, "-ERR ") {
|
||||
t.Fatalf("Expected an error")
|
||||
}
|
||||
if !strings.Contains(l, "Maximum Payload") {
|
||||
t.Fatalf("Expected an ERR for max payload violation, got: %v", l)
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: For now this is single server, will change to adapt for network wide.
|
||||
// TODO(dlc) - Make cluster/gateway aware.
|
||||
func TestJWTAccountLimitsMaxConns(t *testing.T) {
|
||||
s := opTrustBasicSetup()
|
||||
defer s.Shutdown()
|
||||
buildMemAccResolver(s)
|
||||
|
||||
okp, _ := nkeys.FromSeed(oSeed)
|
||||
|
||||
// Create accounts and imports/exports.
|
||||
fooKP, _ := nkeys.CreateAccount()
|
||||
fooPub, _ := fooKP.PublicKey()
|
||||
fooAC := jwt.NewAccountClaims(string(fooPub))
|
||||
fooAC.Limits.Conn = 8
|
||||
fooJWT, err := fooAC.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
addAccountToMemResolver(s, string(fooPub), fooJWT)
|
||||
|
||||
newClient := func(expPre string) {
|
||||
t.Helper()
|
||||
// Create a client.
|
||||
nkp, _ := nkeys.CreateUser()
|
||||
pub, _ := nkp.PublicKey()
|
||||
nuc := jwt.NewUserClaims(string(pub))
|
||||
ujwt, err := nuc.Encode(fooKP)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating user JWT: %v", err)
|
||||
}
|
||||
c, cr, l := newClientForServer(s)
|
||||
|
||||
// Sign Nonce
|
||||
var info nonceInfo
|
||||
json.Unmarshal([]byte(l[5:]), &info)
|
||||
sigraw, _ := nkp.Sign([]byte(info.Nonce))
|
||||
sig := base64.StdEncoding.EncodeToString(sigraw)
|
||||
cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\", \"verbose\":true}\r\nPING\r\n", ujwt, sig)
|
||||
go c.parse([]byte(cs))
|
||||
l, _ = cr.ReadString('\n')
|
||||
if !strings.HasPrefix(l, expPre) {
|
||||
t.Fatalf("Expected a response starting with %q", expPre)
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < 8; i++ {
|
||||
newClient("+OK")
|
||||
}
|
||||
// Now this one should fail.
|
||||
newClient("-ERR ")
|
||||
}
|
||||
|
||||
@@ -1029,10 +1029,14 @@ func (reason ClosedState) String() string {
|
||||
return "Incorrect Port"
|
||||
case MaxConnectionsExceeded:
|
||||
return "Maximum Connections Exceeded"
|
||||
case MaxAccountConnectionsExceeded:
|
||||
return "Maximum Account Connections Exceeded"
|
||||
case MaxPayloadExceeded:
|
||||
return "Maximum Message Payload Exceeded"
|
||||
case MaxControlLineExceeded:
|
||||
return "Maximum Control Line Exceeded"
|
||||
case MaxSubscriptionsExceeded:
|
||||
return "Maximum Subscriptions Exceeded"
|
||||
case DuplicateRoute:
|
||||
return "Duplicate Route"
|
||||
case RouteRemoved:
|
||||
|
||||
@@ -412,7 +412,7 @@ func (m *maxPayloadOption) Apply(server *Server) {
|
||||
server.mu.Lock()
|
||||
server.info.MaxPayload = m.newValue
|
||||
for _, client := range server.clients {
|
||||
atomic.StoreInt64(&client.mpay, int64(m.newValue))
|
||||
atomic.StoreInt32(&client.mpay, int32(m.newValue))
|
||||
}
|
||||
server.mu.Unlock()
|
||||
server.Noticef("Reloaded: max_payload = %d", m.newValue)
|
||||
|
||||
@@ -489,15 +489,6 @@ func (s *Server) LookupAccount(name string) *Account {
|
||||
return s.fetchAccount(name)
|
||||
}
|
||||
|
||||
/*
|
||||
// UpdateAccount will fetch new claims and if found update the account.
|
||||
func (s *Server) UpdateAccount(acc *Account) bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.updateAccount(acc)
|
||||
}
|
||||
*/
|
||||
|
||||
// This will fetch new claims and if found update the account with new claims.
|
||||
// Lock should be held upon entry.
|
||||
func (s *Server) updateAccount(acc *Account) bool {
|
||||
@@ -1073,7 +1064,7 @@ func (s *Server) createClient(conn net.Conn) *client {
|
||||
// Snapshot server options.
|
||||
opts := s.getOpts()
|
||||
|
||||
maxPay := int64(opts.MaxPayload)
|
||||
maxPay := int32(opts.MaxPayload)
|
||||
maxSubs := opts.MaxSubs
|
||||
now := time.Now()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user