From 2d54fc3ee75f2fd22e09d2e8d96f1af604928676 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 5 Dec 2018 14:18:08 -0800 Subject: [PATCH] Account lookup failures, account and client limits, options reload. Changed account lookup and validation failures to be more understandable by users. Changed limits to be -1 for unlimited to match jwt pkg. The limits changed exposed problems with options holding real objects causing issues with reload tests under race mode. Longer term this code should be reworked such that options only hold config data, not real structs, etc. Signed-off-by: Derek Collison --- conf/parse.go | 2 +- main.go | 10 +- server/accounts.go | 43 ++++-- server/accounts_test.go | 43 +++--- server/auth.go | 14 +- server/client.go | 45 ++++--- server/errors.go | 6 + server/events.go | 4 +- server/events_test.go | 30 +++-- server/gateway_test.go | 2 +- server/jwt.go | 2 +- server/jwt_test.go | 36 ++--- server/opts.go | 17 ++- server/parser_test.go | 2 +- server/reload.go | 43 ++++-- server/reload_test.go | 14 +- server/route.go | 15 ++- server/server.go | 124 +++++++++++------- server/split_test.go | 26 ++-- test/configs/resolver_preload.conf | 2 +- test/gateway_test.go | 4 +- test/operator_test.go | 8 +- .../github.com/nats-io/jwt/account_claims.go | 37 +++++- vendor/github.com/nats-io/jwt/claims.go | 16 ++- vendor/github.com/nats-io/jwt/header.go | 5 +- vendor/github.com/nats-io/nkeys/main.go | 2 +- vendor/github.com/nats-io/nkeys/nk/main.go | 11 +- vendor/manifest | 4 +- 28 files changed, 359 insertions(+), 208 deletions(-) diff --git a/conf/parse.go b/conf/parse.go index 3383fcd2..a6684f2c 100644 --- a/conf/parse.go +++ b/conf/parse.go @@ -87,7 +87,7 @@ func ParseFile(fp string) (map[string]interface{}, error) { func ParseFileWithChecks(fp string) (map[string]interface{}, error) { data, err := ioutil.ReadFile(fp) if err != nil { - return nil, fmt.Errorf("error opening config file: %v", err) + return nil, err } p, err := parse(string(data), fp, true) diff --git a/main.go b/main.go index db42ea96..a0b7ca1f 100644 --- a/main.go +++ b/main.go @@ -78,8 +78,10 @@ func usage() { } func main() { + exe := "nats-server" + // Create a FlagSet and sets the usage - fs := flag.NewFlagSet("nats-server", flag.ExitOnError) + fs := flag.NewFlagSet(exe, flag.ExitOnError) fs.Usage = usage // Configure the options from the flags/config file @@ -88,16 +90,16 @@ func main() { fs.Usage, server.PrintTLSHelpAndDie) if err != nil { - server.PrintAndDie(fmt.Sprintf("ERR: %s", err)) + server.PrintAndDie(fmt.Sprintf("%s: %s", exe, err)) } else if opts.CheckConfig { - fmt.Fprintf(os.Stderr, "configuration file %s is valid\n", opts.ConfigFile) + fmt.Fprintf(os.Stderr, "nats-server: configuration file %s is valid\n", opts.ConfigFile) os.Exit(0) } // Create the server with appropriate options. s, err := server.NewServer(opts) if err != nil { - server.PrintAndDie(fmt.Sprintf("ERR: %s", err)) + server.PrintAndDie(fmt.Sprintf("%s: %s", exe, err)) } // Configure the logger based on the flags diff --git a/server/accounts.go b/server/accounts.go index 0961503c..10b40183 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -110,6 +110,26 @@ type exportMap struct { services map[string]*exportAuth } +func NewAccount(name string) *Account { + a := &Account{ + Name: name, + sl: NewSublist(), + limits: limits{-1, -1, -1, 0, 0}, + } + return a +} + +// Used to create shallow copies of accounts for transfer +// from opts to real accounts in server struct. +func (a *Account) shallowCopy() *Account { + na := NewAccount(a.Name) + na.Nkey = a.Nkey + na.Issuer = a.Issuer + na.imports = a.imports + na.exports = a.exports + return na +} + // NumClients returns active number of clients for this account for // all known servers. func (a *Account) NumConnections() int { @@ -134,7 +154,7 @@ func (a *Account) MaxTotalConnectionsReached() bool { } func (a *Account) maxTotalConnectionsReached() bool { - if a.mconns != 0 { + if a.mconns != jwt.NoLimit { return len(a.clients)+a.nrclients >= a.mconns } return false @@ -805,8 +825,8 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { } a.checkExpiration(ac.Claims()) - // Clone to update - old := &Account{Name: a.Name, imports: a.imports, exports: a.exports} + // Clone to update, only select certain fields. + old := &Account{Name: a.Name, imports: a.imports, exports: a.exports, limits: a.limits} // Reset exports and imports here. a.exports = exportMap{} @@ -837,7 +857,7 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { for _, i := range ac.Imports { acc := s.accounts[i.Account] if acc == nil { - if acc = s.fetchAccount(i.Account); acc == nil { + if acc, _ = s.fetchAccount(i.Account); acc == nil { s.Debugf("Can't locate account [%s] for import of [%v] %s", i.Account, i.Subject, i.Type) continue } @@ -907,7 +927,7 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { }) } for i, c := range clients { - if a.mconns > 0 && i >= a.mconns { + if a.mconns != jwt.NoLimit && i >= a.mconns { c.maxAccountConnExceeded() continue } @@ -919,7 +939,8 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { // Helper to build an internal account structure from a jwt.AccountClaims. func (s *Server) buildInternalAccount(ac *jwt.AccountClaims) *Account { - acc := &Account{Name: ac.Subject, Issuer: ac.Issuer} + acc := NewAccount(ac.Subject) + acc.Issuer = ac.Issuer s.updateAccountClaims(acc, ac) return acc } @@ -1004,8 +1025,12 @@ func NewURLAccResolver(url string) (*URLAccResolver, error) { func (ur *URLAccResolver) Fetch(name string) (string, error) { url := ur.url + name resp, err := ur.c.Get(url) - if err != nil || resp == nil || resp.StatusCode != http.StatusOK { - return _EMPTY_, fmt.Errorf("URL(%q) returned error", url) + if err != nil { + return _EMPTY_, fmt.Errorf("Could not fetch <%q>: %v", url, err) + } else if resp == nil { + return _EMPTY_, fmt.Errorf("Could not fetch <%q>: no response", url) + } else if resp.StatusCode != http.StatusOK { + return _EMPTY_, fmt.Errorf("Could not fetch <%q>: %v", url, resp.Status) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) @@ -1017,5 +1042,5 @@ func (ur *URLAccResolver) Fetch(name string) (string, error) { // Store is not implemented for URL Resolver. func (ur *URLAccResolver) Store(name, jwt string) error { - return fmt.Errorf("Store operation not supported on URL Resolver") + return fmt.Errorf("Store operation not supported for URL Resolver") } diff --git a/server/accounts_test.go b/server/accounts_test.go index af76546c..65e06def 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -102,10 +102,7 @@ func TestAccountIsolation(t *testing.T) { func TestAccountFromOptions(t *testing.T) { opts := defaultServerOptions - opts.Accounts = []*Account{ - &Account{Name: "foo"}, - &Account{Name: "bar"}, - } + opts.Accounts = []*Account{NewAccount("foo"), NewAccount("bar")} s := New(&opts) ta := s.numReservedAccounts() + 2 @@ -113,8 +110,8 @@ func TestAccountFromOptions(t *testing.T) { t.Fatalf("Expected to have a server with %d total accounts, got %v", ta, la) } // Check that sl is filled in. - fooAcc := s.LookupAccount("foo") - barAcc := s.LookupAccount("bar") + fooAcc, _ := s.LookupAccount("foo") + barAcc, _ := s.LookupAccount("bar") if fooAcc == nil || barAcc == nil { t.Fatalf("Error retrieving accounts for 'foo' and 'bar'") } @@ -192,8 +189,8 @@ func TestActiveAccounts(t *testing.T) { } // Make sure the Accounts track clients. - foo := s.LookupAccount("foo") - bar := s.LookupAccount("bar") + foo, _ := s.LookupAccount("foo") + bar, _ := s.LookupAccount("bar") if foo == nil || bar == nil { t.Fatalf("Error looking up accounts") } @@ -1188,8 +1185,8 @@ func TestAccountMapsUsers(t *testing.T) { t.Fatalf("Unexpected error parsing config file: %v", err) } s := New(opts) - synadia := s.LookupAccount("synadia") - nats := s.LookupAccount("nats") + synadia, _ := s.LookupAccount("synadia") + nats, _ := s.LookupAccount("nats") if synadia == nil || nats == nil { t.Fatalf("Expected non nil accounts during lookup") @@ -1276,7 +1273,7 @@ func TestAccountGlobalDefault(t *testing.T) { opts := defaultServerOptions s := New(&opts) - if acc := s.LookupAccount(globalAccountName); acc == nil { + if acc, _ := s.LookupAccount(globalAccountName); acc == nil { t.Fatalf("Expected a global default account on a new server, got none.") } // Make sure we can not create one with same name.. @@ -1295,16 +1292,16 @@ func TestAccountGlobalDefault(t *testing.T) { func TestAccountCheckStreamImportsEqual(t *testing.T) { // Create bare accounts for this test - fooAcc := &Account{Name: "foo"} + fooAcc := NewAccount("foo") if err := fooAcc.AddStreamExport(">", nil); err != nil { t.Fatalf("Error adding stream export: %v", err) } - barAcc := &Account{Name: "bar"} + barAcc := NewAccount("bar") if err := barAcc.AddStreamImport(fooAcc, "foo", "myPrefix"); err != nil { t.Fatalf("Error adding stream import: %v", err) } - bazAcc := &Account{Name: "baz"} + bazAcc := NewAccount("baz") if err := bazAcc.AddStreamImport(fooAcc, "foo", "myPrefix"); err != nil { t.Fatalf("Error adding stream import: %v", err) } @@ -1327,11 +1324,11 @@ func TestAccountCheckStreamImportsEqual(t *testing.T) { // Create another account that is named "foo". We want to make sure // that the comparison still works (based on account name, not pointer) - newFooAcc := &Account{Name: "foo"} + newFooAcc := NewAccount("foo") if err := newFooAcc.AddStreamExport(">", nil); err != nil { t.Fatalf("Error adding stream export: %v", err) } - batAcc := &Account{Name: "bat"} + batAcc := NewAccount("bat") if err := batAcc.AddStreamImport(newFooAcc, "foo", "myPrefix"); err != nil { t.Fatalf("Error adding stream import: %v", err) } @@ -1346,15 +1343,15 @@ func TestAccountCheckStreamImportsEqual(t *testing.T) { } // Test with account with different "from" - expAcc := &Account{Name: "new_acc"} + expAcc := NewAccount("new_acc") if err := expAcc.AddStreamExport(">", nil); err != nil { t.Fatalf("Error adding stream export: %v", err) } - aAcc := &Account{Name: "a"} + aAcc := NewAccount("a") if err := aAcc.AddStreamImport(expAcc, "bar", ""); err != nil { t.Fatalf("Error adding stream import: %v", err) } - bAcc := &Account{Name: "b"} + bAcc := NewAccount("b") if err := bAcc.AddStreamImport(expAcc, "baz", ""); err != nil { t.Fatalf("Error adding stream import: %v", err) } @@ -1363,11 +1360,11 @@ func TestAccountCheckStreamImportsEqual(t *testing.T) { } // Test with account with different "prefix" - aAcc = &Account{Name: "a"} + aAcc = NewAccount("a") if err := aAcc.AddStreamImport(expAcc, "bar", "prefix"); err != nil { t.Fatalf("Error adding stream import: %v", err) } - bAcc = &Account{Name: "b"} + bAcc = NewAccount("b") if err := bAcc.AddStreamImport(expAcc, "bar", "diff_prefix"); err != nil { t.Fatalf("Error adding stream import: %v", err) } @@ -1376,11 +1373,11 @@ func TestAccountCheckStreamImportsEqual(t *testing.T) { } // Test with account with different "name" - expAcc = &Account{Name: "diff_name"} + expAcc = NewAccount("diff_name") if err := expAcc.AddStreamExport(">", nil); err != nil { t.Fatalf("Error adding stream export: %v", err) } - bAcc = &Account{Name: "b"} + bAcc = NewAccount("b") if err := bAcc.AddStreamImport(expAcc, "bar", "prefix"); err != nil { t.Fatalf("Error adding stream import: %v", err) } diff --git a/server/auth.go b/server/auth.go index 5da5c651..81665a92 100644 --- a/server/auth.go +++ b/server/auth.go @@ -207,13 +207,21 @@ func (s *Server) configureAuthorization() { if opts.Nkeys != nil { s.nkeys = make(map[string]*NkeyUser) for _, u := range opts.Nkeys { - s.nkeys[u.Nkey] = u + copy := u.clone() + if u.Account != nil { + copy.Account = s.accounts[u.Account.Name] + } + s.nkeys[u.Nkey] = copy } } if opts.Users != nil { s.users = make(map[string]*User) for _, u := range opts.Users { - s.users[u.Username] = u + copy := u.clone() + if u.Account != nil { + copy.Account = s.accounts[u.Account.Name] + } + s.users[u.Username] = copy } } s.assignGlobalAccountToOrphanUsers() @@ -319,7 +327,7 @@ func (s *Server) isClientAuthorized(c *client) bool { // If we have a jwt and a userClaim, make sure we have the Account, etc associated. // We need to look up the account. This will use an account resolver if one is present. if juc != nil { - if acc = s.LookupAccount(juc.Issuer); acc == nil { + if acc, _ = s.LookupAccount(juc.Issuer); acc == nil { c.Debugf("Account JWT can not be found") return false } diff --git a/server/client.go b/server/client.go index 259eacd8..56e9161f 100644 --- a/server/client.go +++ b/server/client.go @@ -430,6 +430,16 @@ func (c *client) registerWithAccount(acc *Account) error { return nil } +// Helper to determine if we have exceeded max subs. +func (c *client) subsExceeded() bool { + return c.msubs != jwt.NoLimit && len(c.subs) > c.msubs +} + +// Helper to determine if we have met or exceeded max subs. +func (c *client) subsAtLimit() bool { + return c.msubs != jwt.NoLimit && len(c.subs) >= c.msubs +} + // Apply account limits // Lock is held on entry. // FIXME(dlc) - Should server be able to override here? @@ -437,29 +447,30 @@ func (c *client) applyAccountLimits() { if c.acc == nil { return } - // Set here, check for more details below. Only set if non-zero. - if c.acc.msubs > 0 { + + // Set here, will need to fo checks for NoLimit. + if c.acc.msubs != jwt.NoLimit { c.msubs = c.acc.msubs } - if c.acc.mpay > 0 { + if c.acc.mpay != jwt.NoLimit { c.mpay = c.acc.mpay } opts := c.srv.getOpts() // We check here if the server has an option set that is lower than the account limit. - if c.mpay != 0 && opts.MaxPayload != 0 && int32(opts.MaxPayload) < c.acc.mpay { + if c.mpay != jwt.NoLimit && opts.MaxPayload != 0 && int32(opts.MaxPayload) < c.acc.mpay { c.Errorf("Max Payload set to %d from server config which overrides %d from account claims", opts.MaxPayload, c.acc.mpay) c.mpay = int32(opts.MaxPayload) } // We check here if the server has an option set that is lower than the account limit. - if c.msubs != 0 && opts.MaxSubs != 0 && opts.MaxSubs < c.acc.msubs { + if c.msubs != jwt.NoLimit && opts.MaxSubs != 0 && opts.MaxSubs < c.acc.msubs { c.Errorf("Max Subscriptions set to %d from server config which overrides %d from account claims", opts.MaxSubs, c.acc.msubs) c.msubs = opts.MaxSubs } - if c.msubs > 0 && len(c.subs) > c.msubs { + if c.subsExceeded() { go func() { c.maxSubsExceeded() time.Sleep(20 * time.Millisecond) @@ -1041,13 +1052,14 @@ func (c *client) processConnect(arg []byte) error { if account != "" { var acc *Account var wasNew bool + var err error if !srv.NewAccountsAllowed() { - acc = srv.LookupAccount(account) - if acc == nil { - c.Errorf(ErrMissingAccount.Error()) + acc, err = srv.LookupAccount(account) + if err != nil { + c.Errorf(err.Error()) c.sendErr("Account Not Found") - return ErrMissingAccount - } else if accountNew { + return err + } else if accountNew && acc != nil { c.Errorf(ErrAccountExists.Error()) c.sendErr(ErrAccountExists.Error()) return ErrAccountExists @@ -1413,7 +1425,7 @@ func (c *client) processPub(trace bool, arg []byte) error { return fmt.Errorf("processPub Bad or Missing Size: '%s'", arg) } maxPayload := atomic.LoadInt32(&c.mpay) - if maxPayload > 0 && int32(c.pa.size) > maxPayload { + if maxPayload != jwt.NoLimit && int32(c.pa.size) > maxPayload { c.maxPayloadViolation(c.pa.size, maxPayload) return ErrMaxPayload } @@ -1497,7 +1509,7 @@ func (c *client) processSub(argo []byte) (err error) { } // Check if we have a maximum on the number of subscriptions. - if c.msubs > 0 && len(c.subs) >= c.msubs { + if c.subsAtLimit() { c.mu.Unlock() c.maxSubsExceeded() return nil @@ -2480,7 +2492,9 @@ func (c *client) clearConnection(reason ClosedState) { c.flushOutbound() // Clear outbound here. - c.out.sg.Broadcast() + if c.out.sg != nil { + c.out.sg.Broadcast() + } // With TLS, Close() is sending an alert (that is doing a write). // Need to set a deadline otherwise the server could block there @@ -2791,8 +2805,7 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) { if !ok { // Match correct account and sublist. - acc = c.srv.LookupAccount(string(c.pa.account)) - if acc == nil { + if acc, _ = c.srv.LookupAccount(string(c.pa.account)); acc == nil { return nil, nil } diff --git a/server/errors.go b/server/errors.go index 80b63a9b..c8e46c89 100644 --- a/server/errors.go +++ b/server/errors.go @@ -81,6 +81,12 @@ var ( // ErrNoAccountResolver is returned when we attempt an update but do not have an account resolver. ErrNoAccountResolver = errors.New("Account Resolver Missing") + // ErrAccountResolverUpdateTooSoon is returned when we attempt an update too soon to last request. + ErrAccountResolverUpdateTooSoon = errors.New("Account Resolver Update Too Soon") + + // ErrAccountResolverSameClaims is returned when same claims have been fetched. + ErrAccountResolverSameClaims = errors.New("Account Resolver No New Claims") + // ErrStreamImportAuthorization is returned when a stream import is not authorized. ErrStreamImportAuthorization = errors.New("Stream Import Not Authorized") diff --git a/server/events.go b/server/events.go index f50891c0..4c7607e0 100644 --- a/server/events.go +++ b/server/events.go @@ -539,7 +539,7 @@ func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []by s.sys.client.Errorf("Error unmarshalling account connections request message: %v", err) return } - acc := s.lookupAccount(m.Account) + acc, _ := s.lookupAccount(m.Account) if acc == nil { return } @@ -576,7 +576,7 @@ func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg return } // See if we have the account registered, if not drop it. - acc := s.lookupAccount(m.Account) + acc, _ := s.lookupAccount(m.Account) if acc == nil { s.sys.client.Debugf("Received account connection event for unknown account: %s", m.Account) return diff --git a/server/events_test.go b/server/events_test.go index 16230b0c..dd077864 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -36,7 +36,11 @@ func createAccount(s *Server) (*Account, nkeys.KeyPair) { nac := jwt.NewAccountClaims(pub) jwt, _ := nac.Encode(okp) addAccountToMemResolver(s, pub, jwt) - return s.LookupAccount(pub), akp + acc, err := s.LookupAccount(pub) + if err != nil { + panic(err) + } + return acc, akp } func createUserCreds(t *testing.T, s *Server, akp nkeys.KeyPair) nats.Option { @@ -631,7 +635,7 @@ func TestAccountClaimsUpdates(t *testing.T) { addAccountToMemResolver(s, pub, ajwt) - acc := s.LookupAccount(pub) + acc, _ := s.LookupAccount(pub) if acc.MaxActiveConnections() != 4 { t.Fatalf("Expected to see a limit of 4 connections") } @@ -658,7 +662,7 @@ func TestAccountClaimsUpdates(t *testing.T) { nc.Publish(claimUpdateSubj, []byte(ajwt)) nc.Flush() - acc = s.LookupAccount(pub) + acc, _ = s.LookupAccount(pub) if acc.MaxActiveConnections() != 8 { t.Fatalf("Account was not updated") } @@ -680,7 +684,7 @@ func TestAccountConnsLimitExceededAfterUpdate(t *testing.T) { ajwt, _ := nac.Encode(okp) addAccountToMemResolver(s, pub, ajwt) - acc := s.LookupAccount(pub) + acc, _ := s.LookupAccount(pub) // Now create the max connections. url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) @@ -731,7 +735,7 @@ func TestAccountConnsLimitExceededAfterUpdateDisconnectNewOnly(t *testing.T) { ajwt, _ := nac.Encode(okp) addAccountToMemResolver(s, pub, ajwt) - acc := s.LookupAccount(pub) + acc, _ := s.LookupAccount(pub) // Now create the max connections. // We create half then we will wait and then create the rest. @@ -893,11 +897,11 @@ func TestServerEventStatsZ(t *testing.T) { if m2.Stats.ActiveAccounts != 2 { t.Fatalf("Did not match active accounts of 2, got %d", m2.Stats.ActiveAccounts) } - if m2.Stats.Sent.Msgs != 3 { - t.Fatalf("Did not match sent msgs of 3, got %d", m2.Stats.Sent.Msgs) + if m2.Stats.Sent.Msgs < 3 { + t.Fatalf("Did not match sent msgs of >= 3, got %d", m2.Stats.Sent.Msgs) } - if m2.Stats.Received.Msgs != 1 { - t.Fatalf("Did not match received msgs of 1, got %d", m2.Stats.Received.Msgs) + if m2.Stats.Received.Msgs < 1 { + t.Fatalf("Did not match received msgs of >= 1, got %d", m2.Stats.Received.Msgs) } if lr := len(m2.Stats.Routes); lr != 1 { t.Fatalf("Expected a route, but got %d", lr) @@ -920,11 +924,11 @@ func TestServerEventStatsZ(t *testing.T) { if m3.Stats.ActiveAccounts != 2 { t.Fatalf("Did not match active accounts of 2, got %d", m3.Stats.ActiveAccounts) } - if m3.Stats.Sent.Msgs != 5 { - t.Fatalf("Did not match sent msgs of 5, got %d", m3.Stats.Sent.Msgs) + if m3.Stats.Sent.Msgs < 5 { + t.Fatalf("Did not match sent msgs of >= 5, got %d", m3.Stats.Sent.Msgs) } - if m3.Stats.Received.Msgs != 2 { - t.Fatalf("Did not match received msgs of 2, got %d", m3.Stats.Received.Msgs) + if m3.Stats.Received.Msgs < 2 { + t.Fatalf("Did not match received msgs of >= 2, got %d", m3.Stats.Received.Msgs) } if lr := len(m3.Stats.Routes); lr != 1 { t.Fatalf("Expected a route, but got %d", lr) diff --git a/server/gateway_test.go b/server/gateway_test.go index b8adbe07..c81f82b8 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -1143,7 +1143,7 @@ func TestGatewayDontSendSubInterest(t *testing.T) { } func setAccountUserPassInOptions(o *Options, accName, username, password string) { - acc := &Account{Name: accName} + acc := NewAccount(accName) o.Accounts = append(o.Accounts, acc) o.Users = append(o.Users, &User{Username: username, Password: password, Account: acc}) } diff --git a/server/jwt.go b/server/jwt.go index c747c023..79b51141 100644 --- a/server/jwt.go +++ b/server/jwt.go @@ -81,7 +81,7 @@ func validateTrustedOperators(o *Options) error { return fmt.Errorf("conflicting options for 'TrustedKeys' and 'TrustedOperators'") } // If we have operators, fill in the trusted keys. - // FIXME(dlc) - We had TrustedKeys before TrsutedOperators. The jwt.OperatorClaims + // FIXME(dlc) - We had TrustedKeys before TrustedOperators. The jwt.OperatorClaims // has a DidSign(). Use that longer term. For now we can expand in place. for _, opc := range o.TrustedOperators { if o.TrustedKeys == nil { diff --git a/server/jwt_test.go b/server/jwt_test.go index df632fc7..bc078081 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -483,7 +483,7 @@ func TestJWTAccountRenew(t *testing.T) { // Update the account addAccountToMemResolver(s, apub, ajwt) - acc := s.LookupAccount(apub) + acc, _ := s.LookupAccount(apub) if acc == nil { t.Fatalf("Expected to retrive the account") } @@ -518,7 +518,7 @@ func TestJWTAccountRenewFromResolver(t *testing.T) { addAccountToMemResolver(s, apub, ajwt) // Force it to be loaded by the server and start the expiration timer. - acc := s.LookupAccount(apub) + acc, _ := s.LookupAccount(apub) if acc == nil { t.Fatalf("Could not retrieve account for %q", apub) } @@ -585,7 +585,7 @@ func TestJWTAccountBasicImportExport(t *testing.T) { addAccountToMemResolver(s, fooPub, fooJWT) - acc := s.LookupAccount(fooPub) + acc, _ := s.LookupAccount(fooPub) if acc == nil { t.Fatalf("Expected to retrieve the account") } @@ -622,7 +622,7 @@ func TestJWTAccountBasicImportExport(t *testing.T) { } addAccountToMemResolver(s, barPub, barJWT) - acc = s.LookupAccount(barPub) + acc, _ = s.LookupAccount(barPub) if acc == nil { t.Fatalf("Expected to retrieve the account") } @@ -814,7 +814,7 @@ func TestJWTAccountImportExportUpdates(t *testing.T) { barAC = jwt.NewAccountClaims(barPub) barJWT, _ = barAC.Encode(okp) addAccountToMemResolver(s, barPub, barJWT) - acc := s.LookupAccount(barPub) + acc, _ := s.LookupAccount(barPub) s.updateAccountClaims(acc, barAC) checkShadow(0) @@ -832,8 +832,8 @@ func TestJWTAccountImportExportUpdates(t *testing.T) { fooAC = jwt.NewAccountClaims(fooPub) fooJWT, _ = fooAC.Encode(okp) addAccountToMemResolver(s, fooPub, fooJWT) - s.updateAccountClaims(s.LookupAccount(fooPub), fooAC) - + acc, _ = s.LookupAccount(fooPub) + s.updateAccountClaims(acc, fooAC) checkShadow(0) // Now add it in but with permission required. @@ -841,7 +841,7 @@ func TestJWTAccountImportExportUpdates(t *testing.T) { fooAC.Exports.Add(streamExport) fooJWT, _ = fooAC.Encode(okp) addAccountToMemResolver(s, fooPub, fooJWT) - s.updateAccountClaims(s.LookupAccount(fooPub), fooAC) + s.updateAccountClaims(acc, fooAC) checkShadow(0) @@ -851,7 +851,7 @@ func TestJWTAccountImportExportUpdates(t *testing.T) { fooAC.Exports.Add(streamExport) fooJWT, _ = fooAC.Encode(okp) addAccountToMemResolver(s, fooPub, fooJWT) - s.updateAccountClaims(s.LookupAccount(fooPub), fooAC) + s.updateAccountClaims(acc, fooAC) checkShadow(1) } @@ -877,7 +877,7 @@ func TestJWTAccountImportActivationExpires(t *testing.T) { addAccountToMemResolver(s, fooPub, fooJWT) - acc := s.LookupAccount(fooPub) + acc, _ := s.LookupAccount(fooPub) if acc == nil { t.Fatalf("Expected to retrieve the account") } @@ -978,7 +978,7 @@ func TestJWTAccountLimitsSubs(t *testing.T) { // Check to make sure we have the limit set. // Account first - fooAcc := s.LookupAccount(fooPub) + fooAcc, _ := s.LookupAccount(fooPub) fooAcc.mu.RLock() if fooAcc.msubs != 10 { fooAcc.mu.RUnlock() @@ -1048,7 +1048,7 @@ func TestJWTAccountLimitsSubsButServerOverrides(t *testing.T) { t.Fatalf("Error generating account JWT: %v", err) } addAccountToMemResolver(s, fooPub, fooJWT) - fooAcc := s.LookupAccount(fooPub) + fooAcc, _ := s.LookupAccount(fooPub) fooAcc.mu.RLock() if fooAcc.msubs != 10 { fooAcc.mu.RUnlock() @@ -1121,7 +1121,7 @@ func TestJWTAccountLimitsMaxPayload(t *testing.T) { // Check to make sure we have the limit set. // Account first - fooAcc := s.LookupAccount(fooPub) + fooAcc, _ := s.LookupAccount(fooPub) fooAcc.mu.RLock() if fooAcc.mpay != 8 { fooAcc.mu.RUnlock() @@ -1325,7 +1325,8 @@ func TestJWTAccountServiceImportExpires(t *testing.T) { t.Fatalf("Error generating account JWT: %v", err) } addAccountToMemResolver(s, fooPub, fooJWT) - s.updateAccountClaims(s.LookupAccount(fooPub), fooAC) + acc, _ := s.LookupAccount(fooPub) + s.updateAccountClaims(acc, fooAC) // Send Another Request parseAsyncA("PUB foo 2\r\nhi\r\nPING\r\n") @@ -1356,7 +1357,8 @@ func TestJWTAccountServiceImportExpires(t *testing.T) { t.Fatalf("Error generating account JWT: %v", err) } addAccountToMemResolver(s, barPub, barJWT) - s.updateAccountClaims(s.LookupAccount(barPub), barAC) + acc, _ = s.LookupAccount(barPub) + s.updateAccountClaims(acc, barAC) // Now it should work again. // Send Another Request @@ -1406,7 +1408,7 @@ func TestAccountURLResolver(t *testing.T) { opts.TrustedKeys = []string{pub} defer s.Shutdown() - acc := s.LookupAccount(apub) + acc, _ := s.LookupAccount(apub) if acc == nil { t.Fatalf("Expected to receive an account") } @@ -1450,7 +1452,7 @@ func TestAccountURLResolverTimeout(t *testing.T) { opts.TrustedKeys = []string{pub} defer s.Shutdown() - acc := s.LookupAccount(apub) + acc, _ := s.LookupAccount(apub) if acc != nil { t.Fatalf("Expected to not receive an account due to timeout") } diff --git a/server/opts.go b/server/opts.go index 9b39ca80..a5424075 100644 --- a/server/opts.go +++ b/server/opts.go @@ -532,8 +532,13 @@ func (o *Options) ProcessConfigFile(configFile string) error { items := resolverRe.FindStringSubmatch(str) if len(items) == 2 { url := items[1] + _, err := parseURL(url, "account resolver") + if err != nil { + errors = append(errors, &configErr{tk, err.Error()}) + continue + } if ur, err := NewURLAccResolver(url); err != nil { - err := &configErr{tk, fmt.Sprintf("URL account resolver error: %v", err)} + err := &configErr{tk, err.Error()} errors = append(errors, err) continue } else { @@ -1017,7 +1022,7 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er *errors = append(*errors, err) continue } - opts.Accounts = append(opts.Accounts, &Account{Name: ns}) + opts.Accounts = append(opts.Accounts, NewAccount(ns)) m[ns] = struct{}{} } // More common map entry @@ -1045,7 +1050,7 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er *errors = append(*errors, err) continue } - acc := &Account{Name: aname} + acc := NewAccount(aname) opts.Accounts = append(opts.Accounts, acc) for k, v := range mv { @@ -1958,7 +1963,7 @@ func GenTLSConfig(tc *TLSConfigOpts) (*tls.Config, error) { pool := x509.NewCertPool() ok := pool.AppendCertsFromPEM(rootPEM) if !ok { - return nil, fmt.Errorf("failed to parse root ca certificate") + return nil, fmt.Errorf("Failed to parse root ca certificate") } config.ClientCAs = pool } @@ -2362,7 +2367,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, // flags). fs.Parse(args) } else if opts.CheckConfig { - return nil, fmt.Errorf("must specify [-c, --config] option to check configuration file syntax") + return nil, fmt.Errorf("Must specify [-c, --config] option to check configuration file syntax") } // Special handling of some flags @@ -2502,7 +2507,7 @@ func processSignal(signal string) error { if l := len(commandAndPid); l == 2 { pid = maybeReadPidFile(commandAndPid[1]) } else if l > 2 { - return fmt.Errorf("invalid signal parameters: %v", commandAndPid[2:]) + return fmt.Errorf("Invalid signal parameters: %v", commandAndPid[2:]) } if err := ProcessSignal(Command(commandAndPid[0]), pid); err != nil { return err diff --git a/server/parser_test.go b/server/parser_test.go index 2bb01623..742475bc 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -19,7 +19,7 @@ import ( ) func dummyClient() *client { - return &client{srv: New(&defaultServerOptions)} + return &client{srv: New(&defaultServerOptions), msubs: -1, mpay: -1} } func dummyRouteClient() *client { diff --git a/server/reload.go b/server/reload.go index 75e95a29..c76f4f4f 100644 --- a/server/reload.go +++ b/server/reload.go @@ -245,6 +245,14 @@ func (u *nkeysOption) Apply(server *Server) { server.Noticef("Reloaded: authorization nkey users") } +type trustKeysOption struct { + noopOption +} + +func (u *trustKeysOption) Apply(server *Server) { + server.Noticef("Reloaded: trusted keys") +} + // clusterOption implements the option interface for the `cluster` setting. type clusterOption struct { authOption @@ -497,6 +505,12 @@ func (s *Server) Reload() error { // TODO: Dump previous good config to a .bak file? return err } + + // Wipe trusted keys if needed when we have an operator. + if len(s.opts.TrustedOperators) > 0 && len(s.opts.TrustedKeys) > 0 { + s.opts.TrustedKeys = nil + } + clientOrgPort := s.clientActualPort clusterOrgPort := s.clusterActualPort gatewayOrgPort := s.gatewayActualPort @@ -536,8 +550,8 @@ func (s *Server) reloadOptions(newOpts *Options) error { if err != nil { return err } - // Need to save off previous cluster permissions s.mu.Lock() + // Need to save off previous cluster permissions s.oldClusterPerms = s.opts.Cluster.Permissions s.mu.Unlock() s.setOpts(newOpts) @@ -554,7 +568,6 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { newConfig = reflect.ValueOf(newOpts).Elem() diffOpts = []option{} ) - for i := 0; i < oldConfig.NumField(); i++ { field := oldConfig.Type().Field(i) // field.PkgPath is empty for exported fields, and is not for unexported ones. @@ -637,6 +650,10 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { diffOpts = append(diffOpts, &clientAdvertiseOption{newValue: cliAdv}) case "accounts": diffOpts = append(diffOpts, &accountsOption{}) + case "trustedkeys": + fmt.Printf("newVal is %+v\n", newValue) + fmt.Printf("oldVal is %+v\n", oldValue) + diffOpts = append(diffOpts, &trustKeysOption{}) case "gateway": // Not supported for now, but report warning if configuration of gateway // is actually changed so that user knows that it won't take effect. @@ -717,25 +734,32 @@ func (s *Server) applyOptions(opts []option) { func (s *Server) reloadAuthorization() { s.mu.Lock() + oldAccounts := s.accounts + s.accounts, s.gacc = nil, nil + s.configureAccounts() + s.configureAuthorization() // This map will contain the names of accounts that have their streams // import configuration changed. - awcsti := make(map[string]struct{}, len(s.opts.Accounts)) + awcsti := make(map[string]struct{}, len(s.accounts)) - oldAccounts := s.accounts - s.accounts = make(map[string]*Account) - s.registerAccount(s.gacc) - for _, newAcc := range s.opts.Accounts { + for _, newAcc := range s.accounts { if acc, ok := oldAccounts[newAcc.Name]; ok { // If account exist in latest config, "transfer" the account's - // sublist to the new account object before registering it - // in s.accounts. + // sublist and client map to the new account. acc.mu.RLock() sl := acc.sl + clients := acc.clients acc.mu.RUnlock() newAcc.mu.Lock() newAcc.sl = sl + if len(clients) > 0 { + newAcc.clients = make(map[*client]*client, len(clients)) + for _, c := range clients { + newAcc.clients[c] = c + } + } // Check if current and new config of this account are same // in term of stream imports. if !acc.checkStreamImportsEqual(newAcc) { @@ -743,7 +767,6 @@ func (s *Server) reloadAuthorization() { } newAcc.mu.Unlock() } - s.registerAccount(newAcc) } // Gather clients that changed accounts. We will close them and they // will reconnect, doing the right thing. diff --git a/server/reload_test.go b/server/reload_test.go index 4be04188..04e1cfce 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -2668,7 +2668,7 @@ func TestConfigReloadAccountUsers(t *testing.T) { } // Old account should be gone - if s.LookupAccount("acc_deleted_after_reload") != nil { + if _, err := s.LookupAccount("acc_deleted_after_reload"); err == nil { t.Fatal("old account should be gone") } @@ -2683,7 +2683,7 @@ func TestConfigReloadAccountUsers(t *testing.T) { // being reconnected does not mean that resent of subscriptions // has already been processed. checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { - gAcc := s.LookupAccount(globalAccountName) + gAcc, _ := s.LookupAccount(globalAccountName) gAcc.mu.RLock() n := gAcc.sl.Count() fooMatch := gAcc.sl.Match("foo") @@ -2699,7 +2699,7 @@ func TestConfigReloadAccountUsers(t *testing.T) { return fmt.Errorf("Global account should have baz sub") } - sAcc := s.LookupAccount("synadia") + sAcc, _ := s.LookupAccount("synadia") sAcc.mu.RLock() n = sAcc.sl.Count() barMatch := sAcc.sl.Match("bar") @@ -2711,7 +2711,7 @@ func TestConfigReloadAccountUsers(t *testing.T) { return fmt.Errorf("Synadia account should have bar sub") } - nAcc := s.LookupAccount("nats.io") + nAcc, _ := s.LookupAccount("nats.io") nAcc.mu.RLock() n = nAcc.sl.Count() batMatch := nAcc.sl.Match("bat") @@ -2748,8 +2748,8 @@ func TestConfigReloadAccountNKeyUsers(t *testing.T) { s, _ := RunServerWithConfig(conf) defer s.Shutdown() - synadia := s.LookupAccount("synadia") - nats := s.LookupAccount("nats.io") + synadia, _ := s.LookupAccount("synadia") + nats, _ := s.LookupAccount("nats.io") seed1 := []byte("SUAPM67TC4RHQLKBX55NIQXSMATZDOZK6FNEOSS36CAYA7F7TY66LP4BOM") seed2 := []byte("SUAIS5JPX4X4GJ7EIIJEQ56DH2GWPYJRPWN5XJEDENJOZHCBLI7SEPUQDE") @@ -2854,7 +2854,7 @@ func TestConfigReloadAccountNKeyUsers(t *testing.T) { if ivan.Account != globalAcc { t.Fatalf("Invalid account for user Ivan: %#v", ivan.Account) } - if s.LookupAccount("synadia") != nil { + if _, err := s.LookupAccount("synadia"); err == nil { t.Fatal("Account Synadia should have been removed") } } diff --git a/server/route.go b/server/route.go index 1a3f0181..251a3f4d 100644 --- a/server/route.go +++ b/server/route.go @@ -117,8 +117,9 @@ func (c *client) removeReplySub(sub *subscription) { // Lookup the account based on sub.sid. if i := bytes.Index(sub.sid, []byte(" ")); i > 0 { // First part of SID for route is account name. - acc := c.srv.LookupAccount(string(sub.sid[:i])) - acc.sl.Remove(sub) + if acc, _ := c.srv.LookupAccount(string(sub.sid[:i])); acc != nil { + acc.sl.Remove(sub) + } c.mu.Lock() c.removeReplySubTimeout(sub) c.mu.Unlock() @@ -649,7 +650,7 @@ func (c *client) removeRemoteSubs() { accountName := strings.Fields(key)[0] ase := as[accountName] if ase == nil { - acc := srv.LookupAccount(accountName) + acc, _ := srv.LookupAccount(accountName) if acc == nil { continue } @@ -704,7 +705,7 @@ func (c *client) processRemoteUnsub(arg []byte) (err error) { return fmt.Errorf("processRemoteUnsub %s", err.Error()) } // Lookup the account - acc := c.srv.LookupAccount(accountName) + acc, _ := c.srv.LookupAccount(accountName) if acc == nil { c.Debugf("Unknown account %q for subject %q", accountName, subject) // Mark this account as not interested since we received a RS- and we @@ -773,7 +774,7 @@ func (c *client) processRemoteSub(argo []byte) (err error) { // Lookup the account // FIXME(dlc) - This may start having lots of contention? accountName := string(args[0]) - acc := c.srv.LookupAccount(accountName) + acc, _ := c.srv.LookupAccount(accountName) if acc == nil { if !srv.NewAccountsAllowed() { c.Debugf("Unknown account %q for subject %q", accountName, sub.subject) @@ -796,7 +797,7 @@ func (c *client) processRemoteSub(argo []byte) (err error) { } // Check if we have a maximum on the number of subscriptions. - if c.msubs > 0 && len(c.subs) >= c.msubs { + if c.subsExceeded() { c.mu.Unlock() c.maxSubsExceeded() return nil @@ -1023,7 +1024,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { } } - c := &client{srv: s, nc: conn, opts: clientOpts{}, kind: ROUTER, route: r} + c := &client{srv: s, nc: conn, opts: clientOpts{}, kind: ROUTER, msubs: -1, mpay: -1, route: r} // Grab server variables s.mu.Lock() diff --git a/server/server.go b/server/server.go index de3d6179..65d9a939 100644 --- a/server/server.go +++ b/server/server.go @@ -311,6 +311,13 @@ func (s *Server) setOpts(opts *Options) { s.optsMu.Unlock() } +func (s *Server) globalAccount() *Account { + s.mu.Lock() + gacc := s.gacc + s.mu.Unlock() + return gacc +} + func (s *Server) configureAccounts() error { // Used to setup Accounts. if s.accounts == nil { @@ -318,15 +325,46 @@ func (s *Server) configureAccounts() error { } // Create global account. if s.gacc == nil { - s.gacc = &Account{Name: globalAccountName} + s.gacc = NewAccount(globalAccountName) s.registerAccount(s.gacc) } opts := s.opts - // Check opts and walk through them. Making sure to create SLs. + // Check opts and walk through them. We need to copy them here + // so that we do not keep a real one sitting in the options. for _, acc := range s.opts.Accounts { - s.registerAccount(acc) + a := acc.shallowCopy() + acc.sl = nil + acc.clients = nil + s.registerAccount(a) + } + + // Now that we have this we need to remap any referenced accounts in + // import or export maps to the new ones. + swapApproved := func(ea *exportAuth) { + if ea == nil { + return + } + for sub, a := range ea.approved { + ea.approved[sub] = s.accounts[a.Name] + } + } + for _, acc := range s.accounts { + // Exports + for _, ea := range acc.exports.streams { + swapApproved(ea) + } + for _, ea := range acc.exports.services { + swapApproved(ea) + } + // Imports + for _, si := range acc.imports.streams { + si.acc = s.accounts[si.acc.Name] + } + for _, si := range acc.imports.services { + si.acc = s.accounts[si.acc.Name] + } } // Check for configured account resolvers. @@ -337,6 +375,9 @@ func (s *Server) configureAccounts() error { return fmt.Errorf("Resolver preloads only available for MemAccResolver") } for k, v := range opts.resolverPreloads { + if _, _, err := s.verifyAccountClaims(v); err != nil { + return fmt.Errorf("Preloaded Account: %v", err) + } s.accResolver.Store(k, v) } } @@ -344,8 +385,8 @@ func (s *Server) configureAccounts() error { // Set the system account if it was configured. if opts.SystemAccount != _EMPTY_ { - if acc := s.lookupAccount(opts.SystemAccount); acc == nil { - return ErrMissingAccount + if _, err := s.lookupAccount(opts.SystemAccount); err != nil { + return fmt.Errorf("Error resolving system account: %v", err) } } return nil @@ -425,13 +466,13 @@ func (s *Server) initStampedTrustedKeys() bool { // PrintAndDie is exported for access in other packages. func PrintAndDie(msg string) { - fmt.Fprintf(os.Stderr, "%s\n", msg) + fmt.Fprintln(os.Stderr, msg) os.Exit(1) } // PrintServerAndExit will print our version and exit. func PrintServerAndExit() { - fmt.Printf("nats-server version %s\n", VERSION) + fmt.Printf("nats-server: v%s\n", VERSION) os.Exit(0) } @@ -507,10 +548,7 @@ func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew b if acc, ok := s.accounts[name]; ok { return acc, false } - acc := &Account{ - Name: name, - sl: NewSublist(), - } + acc := NewAccount(name) s.registerAccount(acc) return acc, true } @@ -524,7 +562,7 @@ func (s *Server) RegisterAccount(name string) (*Account, error) { if _, ok := s.accounts[name]; ok { return nil, ErrAccountExists } - acc := &Account{Name: name} + acc := NewAccount(name) s.registerAccount(acc) return acc, nil } @@ -583,7 +621,7 @@ func (s *Server) setSystemAccount(acc *Account) error { s.sys = &internal{ account: acc, - client: &client{srv: s, kind: SYSTEM, opts: defaultOpts, start: time.Now(), last: time.Now()}, + client: &client{srv: s, kind: SYSTEM, opts: defaultOpts, msubs: -1, mpay: -1, start: time.Now(), last: time.Now()}, seq: 1, sid: 1, servers: make(map[string]*serverUpdate), @@ -663,23 +701,28 @@ func (s *Server) registerAccount(acc *Account) { // lookupAccount is a function to return the account structure // associated with an account name. // Lock should be held on entry. -func (s *Server) lookupAccount(name string) *Account { +func (s *Server) lookupAccount(name string) (*Account, error) { acc := s.accounts[name] if acc != nil { // If we are expired and we have a resolver, then // return the latest information from the resolver. if s.accResolver != nil && acc.IsExpired() { - s.updateAccount(acc) + if err := s.updateAccount(acc); err != nil { + return nil, err + } } - return acc + return acc, nil } // If we have a resolver see if it can fetch the account. + if s.accResolver == nil { + return nil, ErrMissingAccount + } return s.fetchAccount(name) } // LookupAccount is a public function to return the account structure // associated with name. -func (s *Server) LookupAccount(name string) *Account { +func (s *Server) LookupAccount(name string) (*Account, error) { s.mu.Lock() defer s.mu.Unlock() return s.lookupAccount(name) @@ -687,48 +730,36 @@ func (s *Server) LookupAccount(name string) *Account { // This will fetch new claims and if found update the account with new claims. // Lock should be held upon entry. -func (s *Server) updateAccount(acc *Account) bool { +func (s *Server) updateAccount(acc *Account) error { // TODO(dlc) - Make configurable if time.Since(acc.updated) < time.Second { s.Debugf("Requested account update for [%s] ignored, too soon", acc.Name) - return false + return ErrAccountResolverUpdateTooSoon } claimJWT, err := s.fetchRawAccountClaims(acc.Name) if err != nil { - return false + return err } - acc.updated = time.Now() - if acc.claimJWT != "" && acc.claimJWT == claimJWT { - s.Debugf("Requested account update for [%s], same claims detected", acc.Name) - return false - } - accClaims, _, err := s.verifyAccountClaims(claimJWT) - if err == nil && accClaims != nil { - acc.claimJWT = claimJWT - s.updateAccountClaims(acc, accClaims) - return true - } - return false + return s.updateAccountWithClaimJWT(acc, claimJWT) } -// updateAccountWithClaimJWT will chack and apply the claim update. -func (s *Server) updateAccountWithClaimJWT(acc *Account, claimJWT string) bool { +// updateAccountWithClaimJWT will check and apply the claim update. +func (s *Server) updateAccountWithClaimJWT(acc *Account, claimJWT string) error { if acc == nil { - return false + return ErrMissingAccount } acc.updated = time.Now() if acc.claimJWT != "" && acc.claimJWT == claimJWT { s.Debugf("Requested account update for [%s], same claims detected", acc.Name) - return false + return ErrAccountResolverSameClaims } accClaims, _, err := s.verifyAccountClaims(claimJWT) if err == nil && accClaims != nil { acc.claimJWT = claimJWT s.updateAccountClaims(acc, accClaims) - return true + return nil } - return false - + return err } // fetchRawAccountClaims will grab raw account claims iff we have a resolver. @@ -780,14 +811,15 @@ func (s *Server) verifyAccountClaims(claimJWT string) (*jwt.AccountClaims, strin // This will fetch an account from a resolver if defined. // Lock should be held upon entry. -func (s *Server) fetchAccount(name string) *Account { - if accClaims, claimJWT, _ := s.fetchAccountClaims(name); accClaims != nil { +func (s *Server) fetchAccount(name string) (*Account, error) { + if accClaims, claimJWT, err := s.fetchAccountClaims(name); accClaims != nil { acc := s.buildInternalAccount(accClaims) acc.claimJWT = claimJWT s.registerAccount(acc) - return acc + return acc, nil + } else { + return nil, err } - return nil } // Start up the server, this will block. @@ -1324,11 +1356,15 @@ func (s *Server) createClient(conn net.Conn) *client { maxPay := int32(opts.MaxPayload) maxSubs := opts.MaxSubs + // For system, maxSubs of 0 means unlimited, so re-adjust here. + if maxSubs == 0 { + maxSubs = -1 + } now := time.Now() c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now} - c.registerWithAccount(s.gacc) + c.registerWithAccount(s.globalAccount()) // Grab JSON info string s.mu.Lock() diff --git a/server/split_test.go b/server/split_test.go index 9abf0744..dbb52d89 100644 --- a/server/split_test.go +++ b/server/split_test.go @@ -28,9 +28,9 @@ func TestSplitBufferSubOp(t *testing.T) { if err != nil { t.Fatalf("Error creating gateways: %v", err) } - s := &Server{gacc: &Account{Name: globalAccountName}, accounts: make(map[string]*Account), gateway: gws} + s := &Server{gacc: NewAccount(globalAccountName), accounts: make(map[string]*Account), gateway: gws} s.registerAccount(s.gacc) - c := &client{srv: s, acc: s.gacc, subs: make(map[string]*subscription), nc: cli} + c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, subs: make(map[string]*subscription), nc: cli} subop := []byte("SUB foo 1\r\n") subop1 := subop[:6] @@ -65,9 +65,9 @@ func TestSplitBufferSubOp(t *testing.T) { } func TestSplitBufferUnsubOp(t *testing.T) { - s := &Server{gacc: &Account{Name: globalAccountName}, accounts: make(map[string]*Account), gateway: &srvGateway{}} + s := &Server{gacc: NewAccount(globalAccountName), accounts: make(map[string]*Account), gateway: &srvGateway{}} s.registerAccount(s.gacc) - c := &client{srv: s, acc: s.gacc, subs: make(map[string]*subscription)} + c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, subs: make(map[string]*subscription)} subop := []byte("SUB foo 1024\r\n") if err := c.parse(subop); err != nil { @@ -100,7 +100,7 @@ func TestSplitBufferUnsubOp(t *testing.T) { } func TestSplitBufferPubOp(t *testing.T) { - c := &client{subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} pub := []byte("PUB foo.bar INBOX.22 11\r\nhello world\r") pub1 := pub[:2] pub2 := pub[2:9] @@ -166,7 +166,7 @@ func TestSplitBufferPubOp(t *testing.T) { } func TestSplitBufferPubOp2(t *testing.T) { - c := &client{subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} pub := []byte("PUB foo.bar INBOX.22 11\r\nhello world\r\n") pub1 := pub[:30] pub2 := pub[30:] @@ -186,7 +186,7 @@ func TestSplitBufferPubOp2(t *testing.T) { } func TestSplitBufferPubOp3(t *testing.T) { - c := &client{subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} pubAll := []byte("PUB foo bar 11\r\nhello world\r\n") pub := pubAll[:16] @@ -212,7 +212,7 @@ func TestSplitBufferPubOp3(t *testing.T) { } func TestSplitBufferPubOp4(t *testing.T) { - c := &client{subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} pubAll := []byte("PUB foo 11\r\nhello world\r\n") pub := pubAll[:12] @@ -238,7 +238,7 @@ func TestSplitBufferPubOp4(t *testing.T) { } func TestSplitBufferPubOp5(t *testing.T) { - c := &client{subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} pubAll := []byte("PUB foo 11\r\nhello world\r\n") // Splits need to be on MSG_END now too, so make sure we check that. @@ -257,7 +257,7 @@ func TestSplitBufferPubOp5(t *testing.T) { } func TestSplitConnectArg(t *testing.T) { - c := &client{subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} connectAll := []byte("CONNECT {\"verbose\":false,\"tls_required\":false," + "\"user\":\"test\",\"pedantic\":true,\"pass\":\"pass\"}\r\n") @@ -306,7 +306,7 @@ func TestSplitConnectArg(t *testing.T) { func TestSplitDanglingArgBuf(t *testing.T) { s := New(&defaultServerOptions) - c := &client{srv: s, acc: s.gacc, subs: make(map[string]*subscription)} + c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, subs: make(map[string]*subscription)} // We test to make sure we do not dangle any argBufs after processing // since that could lead to performance issues. @@ -365,7 +365,7 @@ func TestSplitDanglingArgBuf(t *testing.T) { } // MSG (the client has to be a ROUTE) - c = &client{subs: make(map[string]*subscription), kind: ROUTER} + c = &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription), kind: ROUTER} msgop := []byte("RMSG $foo foo 5\r\nhello\r\n") c.parse(msgop[:5]) c.parse(msgop[5:10]) @@ -445,7 +445,7 @@ func TestSplitRoutedMsgArg(t *testing.T) { } func TestSplitBufferMsgOp(t *testing.T) { - c := &client{subs: make(map[string]*subscription), kind: ROUTER} + c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription), kind: ROUTER} msg := []byte("RMSG $G foo.bar _INBOX.22 11\r\nhello world\r") msg1 := msg[:2] msg2 := msg[2:9] diff --git a/test/configs/resolver_preload.conf b/test/configs/resolver_preload.conf index 1cf910af..5d40c4c3 100644 --- a/test/configs/resolver_preload.conf +++ b/test/configs/resolver_preload.conf @@ -23,4 +23,4 @@ resolver_preload = { ADM2CIIL3RWXBA6T2HW3FODNCQQOUJEHHQD6FKCPVAMHDNTTSMO73ROX: "eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5In0.eyJqdGkiOiJCMk0zTFRMT1ZNRk03REY3U0M3SE9RTzNXUzI2RFhMTURINk0zRzY3RzRXRFdTWExPNlVBIiwiaWF0IjoxNTQzOTU4NzI0LCJpc3MiOiJPQ0FUMzNNVFZVMlZVT0lNR05HVU5YSjY2QUgyUkxTREFGM01VQkNZQVk1UU1JTDY1TlFNNlhRRyIsInN1YiI6IkFETTJDSUlMM1JXWEJBNlQySFczRk9ETkNRUU9VSkVISFFENkZLQ1BWQU1IRE5UVFNNTzczUk9YIiwidHlwZSI6ImFjY291bnQiLCJuYXRzIjp7ImxpbWl0cyI6e319fQ.pvvPmBei_IFEbspHGN5FkWJoSfHk8BVeJCCVODTgul8-xUU8p1fidvsg3sgMvrXqXtmL8SFc68jGQd0nGtk5Dw" -} \ No newline at end of file +} diff --git a/test/gateway_test.go b/test/gateway_test.go index 01222032..d0e0b7e4 100644 --- a/test/gateway_test.go +++ b/test/gateway_test.go @@ -139,7 +139,7 @@ func TestGatewayAccountInterest(t *testing.T) { func TestGatewaySubjectInterest(t *testing.T) { ob := testDefaultOptionsForGateway("B") - fooAcc := &server.Account{Name: "$foo"} + fooAcc := server.NewAccount("$foo") ob.Accounts = []*server.Account{fooAcc} ob.Users = []*server.User{&server.User{Username: "ivan", Password: "password", Account: fooAcc}} sb := runGatewayServer(ob) @@ -275,7 +275,7 @@ func TestGatewaySubjectInterest(t *testing.T) { func TestGatewayQueue(t *testing.T) { ob := testDefaultOptionsForGateway("B") - fooAcc := &server.Account{Name: "$foo"} + fooAcc := server.NewAccount("$foo") ob.Accounts = []*server.Account{fooAcc} ob.Users = []*server.User{&server.User{Username: "ivan", Password: "password", Account: fooAcc}} sb := runGatewayServer(ob) diff --git a/test/operator_test.go b/test/operator_test.go index 9a79b9b0..e692b41c 100644 --- a/test/operator_test.go +++ b/test/operator_test.go @@ -136,7 +136,11 @@ func createAccountForOperatorKey(t *testing.T, s *server.Server, seed []byte) (* if err := s.AccountResolver().Store(pub, jwt); err != nil { t.Fatalf("Account Resolver returned an error: %v", err) } - return s.LookupAccount(pub), akp + acc, err := s.LookupAccount(pub) + if err != nil { + t.Fatalf("Error looking up account: %v", err) + } + return acc, akp } func createAccount(t *testing.T, s *server.Server) (*server.Account, nkeys.KeyPair) { @@ -238,7 +242,7 @@ func TestOperatorMemResolverPreload(t *testing.T) { s, opts := RunServerWithConfig("./configs/resolver_preload.conf") // Make sure we can look up the account. - acc := s.LookupAccount("ADM2CIIL3RWXBA6T2HW3FODNCQQOUJEHHQD6FKCPVAMHDNTTSMO73ROX") + acc, _ := s.LookupAccount("ADM2CIIL3RWXBA6T2HW3FODNCQQOUJEHHQD6FKCPVAMHDNTTSMO73ROX") if acc == nil { t.Fatalf("Expected to properly lookup account") } diff --git a/vendor/github.com/nats-io/jwt/account_claims.go b/vendor/github.com/nats-io/jwt/account_claims.go index 5f804832..d8fd7344 100644 --- a/vendor/github.com/nats-io/jwt/account_claims.go +++ b/vendor/github.com/nats-io/jwt/account_claims.go @@ -6,22 +6,30 @@ import ( "github.com/nats-io/nkeys" ) +// Signifies no limit. +const NoLimit = -1 + // OperatorLimits are used to limit access by an account type OperatorLimits struct { Subs int64 `json:"subs,omitempty"` // Max number of subscriptions Conn int64 `json:"conn,omitempty"` // Max number of active connections Imports int64 `json:"imports,omitempty"` // Max number of imports Exports int64 `json:"exports,omitempty"` // Max number of exports - WildcardExports bool `json:"wildcards,omitempty"` // Are wildcards allowed in exports Data int64 `json:"data,omitempty"` // Max number of bytes Payload int64 `json:"payload,omitempty"` // Max message payload + WildcardExports bool `json:"wildcards,omitempty"` // Are wildcards allowed in exports } -// IsEmpty returns true if all of the limits are 0 +// IsEmpty returns true if all of the limits are 0/false. func (o *OperatorLimits) IsEmpty() bool { return *o == OperatorLimits{} } +// IsUnlimited returns true if all limits are +func (o *OperatorLimits) IsUnlimited() bool { + return *o == OperatorLimits{NoLimit, NoLimit, NoLimit, NoLimit, NoLimit, NoLimit, true} +} + // Validate checks that the operator limits contain valid values func (o *OperatorLimits) Validate(vr *ValidationResults) { // negative values mean unlimited, so all numbers are valid @@ -46,11 +54,27 @@ func (a *Account) Validate(acct *AccountClaims, vr *ValidationResults) { } if !a.Limits.IsEmpty() && a.Limits.Imports >= 0 && int64(len(a.Imports)) > a.Limits.Imports { - vr.AddError("the account contains more imports than allowed by the operator limits") + vr.AddError("the account contains more imports than allowed by the operator") } - if !a.Limits.IsEmpty() && a.Limits.Exports >= 0 && int64(len(a.Exports)) > a.Limits.Exports { - vr.AddError("the account contains more exports than allowed by the operator limits") + // Check Imports and Exports for limit violations. + if a.Limits.Imports != NoLimit { + if int64(len(a.Imports)) > a.Limits.Imports { + vr.AddError("the account contains more imports than allowed by the operator") + } + } + if a.Limits.Exports != NoLimit { + if int64(len(a.Exports)) > a.Limits.Exports { + vr.AddError("the account contains more exports than allowed by the operator") + } + // Check for wildcard restrictions + if !a.Limits.WildcardExports { + for _, ex := range a.Exports { + if ex.Subject.HasWildCards() { + vr.AddError("the account contains wildcard exports that are not allowed by the operator") + } + } + } } } @@ -66,6 +90,9 @@ func NewAccountClaims(subject string) *AccountClaims { return nil } c := &AccountClaims{} + // Set to unlimited to start. We do it this way so we get compiler + // errors if we add to the OperatorLimits. + c.Limits = OperatorLimits{NoLimit, NoLimit, NoLimit, NoLimit, NoLimit, NoLimit, true} c.Subject = subject return c } diff --git a/vendor/github.com/nats-io/jwt/claims.go b/vendor/github.com/nats-io/jwt/claims.go index e060a7d9..ae21e997 100644 --- a/vendor/github.com/nats-io/jwt/claims.go +++ b/vendor/github.com/nats-io/jwt/claims.go @@ -63,12 +63,20 @@ type Prefix struct { nkeys.PrefixByte } +func encodeToString(d []byte) string { + return base64.RawURLEncoding.EncodeToString(d) +} + +func decodeString(s string) ([]byte, error) { + return base64.RawURLEncoding.DecodeString(s) +} + func serialize(v interface{}) (string, error) { j, err := json.Marshal(v) if err != nil { return "", err } - return base64.RawURLEncoding.EncodeToString(j), nil + return encodeToString(j), nil } func (c *ClaimsData) doEncode(header *Header, kp nkeys.KeyPair, claim Claims) (string, error) { @@ -143,7 +151,7 @@ func (c *ClaimsData) doEncode(header *Header, kp nkeys.KeyPair, claim Claims) (s if err != nil { return "", err } - eSig := base64.RawURLEncoding.EncodeToString(sig) + eSig := encodeToString(sig) return fmt.Sprintf("%s.%s.%s", h, payload, eSig), nil } @@ -173,7 +181,7 @@ func (c *ClaimsData) String(claim interface{}) string { } func parseClaims(s string, target Claims) error { - h, err := base64.RawURLEncoding.DecodeString(s) + h, err := decodeString(s) if err != nil { return err } @@ -239,7 +247,7 @@ func Decode(token string, target Claims) error { return err } - sig, err := base64.RawURLEncoding.DecodeString(chunks[2]) + sig, err := decodeString(chunks[2]) if err != nil { return err } diff --git a/vendor/github.com/nats-io/jwt/header.go b/vendor/github.com/nats-io/jwt/header.go index 72ffd8f3..7865d111 100644 --- a/vendor/github.com/nats-io/jwt/header.go +++ b/vendor/github.com/nats-io/jwt/header.go @@ -1,7 +1,6 @@ package jwt import ( - "encoding/base64" "encoding/json" "fmt" "strings" @@ -9,7 +8,7 @@ import ( const ( // Version - Version = "0.0.3" + Version = "0.0.5" // TokenTypeJwt is the JWT token type supported JWT tokens // encoded and decoded by this library @@ -28,7 +27,7 @@ type Header struct { // Parses a header JWT token func parseHeaders(s string) (*Header, error) { - h, err := base64.RawURLEncoding.DecodeString(s) + h, err := decodeString(s) if err != nil { return nil, err } diff --git a/vendor/github.com/nats-io/nkeys/main.go b/vendor/github.com/nats-io/nkeys/main.go index a85b8b46..5b05df50 100644 --- a/vendor/github.com/nats-io/nkeys/main.go +++ b/vendor/github.com/nats-io/nkeys/main.go @@ -20,7 +20,7 @@ import ( ) // Version -const Version = "0.0.1" +const Version = "0.0.2" // Errors var ( diff --git a/vendor/github.com/nats-io/nkeys/nk/main.go b/vendor/github.com/nats-io/nkeys/nk/main.go index 8274e5a6..410668a3 100644 --- a/vendor/github.com/nats-io/nkeys/nk/main.go +++ b/vendor/github.com/nats-io/nkeys/nk/main.go @@ -249,7 +249,7 @@ func createVanityKey(keyType, vanity, entropy string, max int) nkeys.KeyPair { fmt.Fprintf(os.Stderr, "\r\033[mcomputing\033[m %s ", string(spin)) kp := genKeyPair(pre, entropy) pub, _ := kp.PublicKey() - if bytes.HasPrefix([]byte(pub)[1:], []byte(vanity)) { + if strings.HasPrefix(pub[1:], vanity) { fmt.Fprintf(os.Stderr, "\r") return kp } @@ -281,15 +281,6 @@ func readKeyFile(filename string) []byte { return key } -func isValidLeadingByte(c byte) bool { - switch c { - case 'S', 'P', 'N', 'C', 'O', 'A', 'U': - return true - default: - return false - } -} - func wipeSlice(buf []byte) { for i := range buf { buf[i] = 'x' diff --git a/vendor/manifest b/vendor/manifest index 9b7e3e1e..b6883f13 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -5,7 +5,7 @@ "importpath": "github.com/nats-io/jwt", "repository": "https://github.com/nats-io/jwt", "vcs": "git", - "revision": "1f8635961c5122b7410998050fd9616afeb28d92", + "revision": "f26fb76c5f45b9fe3e657a1135d795079cecffac", "branch": "master", "notests": true }, @@ -13,7 +13,7 @@ "importpath": "github.com/nats-io/nkeys", "repository": "https://github.com/nats-io/nkeys", "vcs": "git", - "revision": "cdc461ca8f466aeeea6eca667cde553efe40bc31", + "revision": "1546a3320a8f195a5b5c84aef8309377c2e411d5", "branch": "master", "notests": true },