diff --git a/conf/parse.go b/conf/parse.go index 3383fcd2..a6684f2c 100644 --- a/conf/parse.go +++ b/conf/parse.go @@ -87,7 +87,7 @@ func ParseFile(fp string) (map[string]interface{}, error) { func ParseFileWithChecks(fp string) (map[string]interface{}, error) { data, err := ioutil.ReadFile(fp) if err != nil { - return nil, fmt.Errorf("error opening config file: %v", err) + return nil, err } p, err := parse(string(data), fp, true) diff --git a/main.go b/main.go index db42ea96..a0b7ca1f 100644 --- a/main.go +++ b/main.go @@ -78,8 +78,10 @@ func usage() { } func main() { + exe := "nats-server" + // Create a FlagSet and sets the usage - fs := flag.NewFlagSet("nats-server", flag.ExitOnError) + fs := flag.NewFlagSet(exe, flag.ExitOnError) fs.Usage = usage // Configure the options from the flags/config file @@ -88,16 +90,16 @@ func main() { fs.Usage, server.PrintTLSHelpAndDie) if err != nil { - server.PrintAndDie(fmt.Sprintf("ERR: %s", err)) + server.PrintAndDie(fmt.Sprintf("%s: %s", exe, err)) } else if opts.CheckConfig { - fmt.Fprintf(os.Stderr, "configuration file %s is valid\n", opts.ConfigFile) + fmt.Fprintf(os.Stderr, "nats-server: configuration file %s is valid\n", opts.ConfigFile) os.Exit(0) } // Create the server with appropriate options. s, err := server.NewServer(opts) if err != nil { - server.PrintAndDie(fmt.Sprintf("ERR: %s", err)) + server.PrintAndDie(fmt.Sprintf("%s: %s", exe, err)) } // Configure the logger based on the flags diff --git a/server/accounts.go b/server/accounts.go index 0961503c..10b40183 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -110,6 +110,26 @@ type exportMap struct { services map[string]*exportAuth } +func NewAccount(name string) *Account { + a := &Account{ + Name: name, + sl: NewSublist(), + limits: limits{-1, -1, -1, 0, 0}, + } + return a +} + +// Used to create shallow copies of accounts for transfer +// from opts to real accounts in server struct. +func (a *Account) shallowCopy() *Account { + na := NewAccount(a.Name) + na.Nkey = a.Nkey + na.Issuer = a.Issuer + na.imports = a.imports + na.exports = a.exports + return na +} + // NumClients returns active number of clients for this account for // all known servers. func (a *Account) NumConnections() int { @@ -134,7 +154,7 @@ func (a *Account) MaxTotalConnectionsReached() bool { } func (a *Account) maxTotalConnectionsReached() bool { - if a.mconns != 0 { + if a.mconns != jwt.NoLimit { return len(a.clients)+a.nrclients >= a.mconns } return false @@ -805,8 +825,8 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { } a.checkExpiration(ac.Claims()) - // Clone to update - old := &Account{Name: a.Name, imports: a.imports, exports: a.exports} + // Clone to update, only select certain fields. + old := &Account{Name: a.Name, imports: a.imports, exports: a.exports, limits: a.limits} // Reset exports and imports here. a.exports = exportMap{} @@ -837,7 +857,7 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { for _, i := range ac.Imports { acc := s.accounts[i.Account] if acc == nil { - if acc = s.fetchAccount(i.Account); acc == nil { + if acc, _ = s.fetchAccount(i.Account); acc == nil { s.Debugf("Can't locate account [%s] for import of [%v] %s", i.Account, i.Subject, i.Type) continue } @@ -907,7 +927,7 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { }) } for i, c := range clients { - if a.mconns > 0 && i >= a.mconns { + if a.mconns != jwt.NoLimit && i >= a.mconns { c.maxAccountConnExceeded() continue } @@ -919,7 +939,8 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) { // Helper to build an internal account structure from a jwt.AccountClaims. func (s *Server) buildInternalAccount(ac *jwt.AccountClaims) *Account { - acc := &Account{Name: ac.Subject, Issuer: ac.Issuer} + acc := NewAccount(ac.Subject) + acc.Issuer = ac.Issuer s.updateAccountClaims(acc, ac) return acc } @@ -1004,8 +1025,12 @@ func NewURLAccResolver(url string) (*URLAccResolver, error) { func (ur *URLAccResolver) Fetch(name string) (string, error) { url := ur.url + name resp, err := ur.c.Get(url) - if err != nil || resp == nil || resp.StatusCode != http.StatusOK { - return _EMPTY_, fmt.Errorf("URL(%q) returned error", url) + if err != nil { + return _EMPTY_, fmt.Errorf("Could not fetch <%q>: %v", url, err) + } else if resp == nil { + return _EMPTY_, fmt.Errorf("Could not fetch <%q>: no response", url) + } else if resp.StatusCode != http.StatusOK { + return _EMPTY_, fmt.Errorf("Could not fetch <%q>: %v", url, resp.Status) } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) @@ -1017,5 +1042,5 @@ func (ur *URLAccResolver) Fetch(name string) (string, error) { // Store is not implemented for URL Resolver. func (ur *URLAccResolver) Store(name, jwt string) error { - return fmt.Errorf("Store operation not supported on URL Resolver") + return fmt.Errorf("Store operation not supported for URL Resolver") } diff --git a/server/accounts_test.go b/server/accounts_test.go index af76546c..65e06def 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -102,10 +102,7 @@ func TestAccountIsolation(t *testing.T) { func TestAccountFromOptions(t *testing.T) { opts := defaultServerOptions - opts.Accounts = []*Account{ - &Account{Name: "foo"}, - &Account{Name: "bar"}, - } + opts.Accounts = []*Account{NewAccount("foo"), NewAccount("bar")} s := New(&opts) ta := s.numReservedAccounts() + 2 @@ -113,8 +110,8 @@ func TestAccountFromOptions(t *testing.T) { t.Fatalf("Expected to have a server with %d total accounts, got %v", ta, la) } // Check that sl is filled in. - fooAcc := s.LookupAccount("foo") - barAcc := s.LookupAccount("bar") + fooAcc, _ := s.LookupAccount("foo") + barAcc, _ := s.LookupAccount("bar") if fooAcc == nil || barAcc == nil { t.Fatalf("Error retrieving accounts for 'foo' and 'bar'") } @@ -192,8 +189,8 @@ func TestActiveAccounts(t *testing.T) { } // Make sure the Accounts track clients. - foo := s.LookupAccount("foo") - bar := s.LookupAccount("bar") + foo, _ := s.LookupAccount("foo") + bar, _ := s.LookupAccount("bar") if foo == nil || bar == nil { t.Fatalf("Error looking up accounts") } @@ -1188,8 +1185,8 @@ func TestAccountMapsUsers(t *testing.T) { t.Fatalf("Unexpected error parsing config file: %v", err) } s := New(opts) - synadia := s.LookupAccount("synadia") - nats := s.LookupAccount("nats") + synadia, _ := s.LookupAccount("synadia") + nats, _ := s.LookupAccount("nats") if synadia == nil || nats == nil { t.Fatalf("Expected non nil accounts during lookup") @@ -1276,7 +1273,7 @@ func TestAccountGlobalDefault(t *testing.T) { opts := defaultServerOptions s := New(&opts) - if acc := s.LookupAccount(globalAccountName); acc == nil { + if acc, _ := s.LookupAccount(globalAccountName); acc == nil { t.Fatalf("Expected a global default account on a new server, got none.") } // Make sure we can not create one with same name.. @@ -1295,16 +1292,16 @@ func TestAccountGlobalDefault(t *testing.T) { func TestAccountCheckStreamImportsEqual(t *testing.T) { // Create bare accounts for this test - fooAcc := &Account{Name: "foo"} + fooAcc := NewAccount("foo") if err := fooAcc.AddStreamExport(">", nil); err != nil { t.Fatalf("Error adding stream export: %v", err) } - barAcc := &Account{Name: "bar"} + barAcc := NewAccount("bar") if err := barAcc.AddStreamImport(fooAcc, "foo", "myPrefix"); err != nil { t.Fatalf("Error adding stream import: %v", err) } - bazAcc := &Account{Name: "baz"} + bazAcc := NewAccount("baz") if err := bazAcc.AddStreamImport(fooAcc, "foo", "myPrefix"); err != nil { t.Fatalf("Error adding stream import: %v", err) } @@ -1327,11 +1324,11 @@ func TestAccountCheckStreamImportsEqual(t *testing.T) { // Create another account that is named "foo". We want to make sure // that the comparison still works (based on account name, not pointer) - newFooAcc := &Account{Name: "foo"} + newFooAcc := NewAccount("foo") if err := newFooAcc.AddStreamExport(">", nil); err != nil { t.Fatalf("Error adding stream export: %v", err) } - batAcc := &Account{Name: "bat"} + batAcc := NewAccount("bat") if err := batAcc.AddStreamImport(newFooAcc, "foo", "myPrefix"); err != nil { t.Fatalf("Error adding stream import: %v", err) } @@ -1346,15 +1343,15 @@ func TestAccountCheckStreamImportsEqual(t *testing.T) { } // Test with account with different "from" - expAcc := &Account{Name: "new_acc"} + expAcc := NewAccount("new_acc") if err := expAcc.AddStreamExport(">", nil); err != nil { t.Fatalf("Error adding stream export: %v", err) } - aAcc := &Account{Name: "a"} + aAcc := NewAccount("a") if err := aAcc.AddStreamImport(expAcc, "bar", ""); err != nil { t.Fatalf("Error adding stream import: %v", err) } - bAcc := &Account{Name: "b"} + bAcc := NewAccount("b") if err := bAcc.AddStreamImport(expAcc, "baz", ""); err != nil { t.Fatalf("Error adding stream import: %v", err) } @@ -1363,11 +1360,11 @@ func TestAccountCheckStreamImportsEqual(t *testing.T) { } // Test with account with different "prefix" - aAcc = &Account{Name: "a"} + aAcc = NewAccount("a") if err := aAcc.AddStreamImport(expAcc, "bar", "prefix"); err != nil { t.Fatalf("Error adding stream import: %v", err) } - bAcc = &Account{Name: "b"} + bAcc = NewAccount("b") if err := bAcc.AddStreamImport(expAcc, "bar", "diff_prefix"); err != nil { t.Fatalf("Error adding stream import: %v", err) } @@ -1376,11 +1373,11 @@ func TestAccountCheckStreamImportsEqual(t *testing.T) { } // Test with account with different "name" - expAcc = &Account{Name: "diff_name"} + expAcc = NewAccount("diff_name") if err := expAcc.AddStreamExport(">", nil); err != nil { t.Fatalf("Error adding stream export: %v", err) } - bAcc = &Account{Name: "b"} + bAcc = NewAccount("b") if err := bAcc.AddStreamImport(expAcc, "bar", "prefix"); err != nil { t.Fatalf("Error adding stream import: %v", err) } diff --git a/server/auth.go b/server/auth.go index 5da5c651..81665a92 100644 --- a/server/auth.go +++ b/server/auth.go @@ -207,13 +207,21 @@ func (s *Server) configureAuthorization() { if opts.Nkeys != nil { s.nkeys = make(map[string]*NkeyUser) for _, u := range opts.Nkeys { - s.nkeys[u.Nkey] = u + copy := u.clone() + if u.Account != nil { + copy.Account = s.accounts[u.Account.Name] + } + s.nkeys[u.Nkey] = copy } } if opts.Users != nil { s.users = make(map[string]*User) for _, u := range opts.Users { - s.users[u.Username] = u + copy := u.clone() + if u.Account != nil { + copy.Account = s.accounts[u.Account.Name] + } + s.users[u.Username] = copy } } s.assignGlobalAccountToOrphanUsers() @@ -319,7 +327,7 @@ func (s *Server) isClientAuthorized(c *client) bool { // If we have a jwt and a userClaim, make sure we have the Account, etc associated. // We need to look up the account. This will use an account resolver if one is present. if juc != nil { - if acc = s.LookupAccount(juc.Issuer); acc == nil { + if acc, _ = s.LookupAccount(juc.Issuer); acc == nil { c.Debugf("Account JWT can not be found") return false } diff --git a/server/client.go b/server/client.go index 259eacd8..56e9161f 100644 --- a/server/client.go +++ b/server/client.go @@ -430,6 +430,16 @@ func (c *client) registerWithAccount(acc *Account) error { return nil } +// Helper to determine if we have exceeded max subs. +func (c *client) subsExceeded() bool { + return c.msubs != jwt.NoLimit && len(c.subs) > c.msubs +} + +// Helper to determine if we have met or exceeded max subs. +func (c *client) subsAtLimit() bool { + return c.msubs != jwt.NoLimit && len(c.subs) >= c.msubs +} + // Apply account limits // Lock is held on entry. // FIXME(dlc) - Should server be able to override here? @@ -437,29 +447,30 @@ func (c *client) applyAccountLimits() { if c.acc == nil { return } - // Set here, check for more details below. Only set if non-zero. - if c.acc.msubs > 0 { + + // Set here, will need to fo checks for NoLimit. + if c.acc.msubs != jwt.NoLimit { c.msubs = c.acc.msubs } - if c.acc.mpay > 0 { + if c.acc.mpay != jwt.NoLimit { c.mpay = c.acc.mpay } opts := c.srv.getOpts() // We check here if the server has an option set that is lower than the account limit. - if c.mpay != 0 && opts.MaxPayload != 0 && int32(opts.MaxPayload) < c.acc.mpay { + if c.mpay != jwt.NoLimit && opts.MaxPayload != 0 && int32(opts.MaxPayload) < c.acc.mpay { c.Errorf("Max Payload set to %d from server config which overrides %d from account claims", opts.MaxPayload, c.acc.mpay) c.mpay = int32(opts.MaxPayload) } // We check here if the server has an option set that is lower than the account limit. - if c.msubs != 0 && opts.MaxSubs != 0 && opts.MaxSubs < c.acc.msubs { + if c.msubs != jwt.NoLimit && opts.MaxSubs != 0 && opts.MaxSubs < c.acc.msubs { c.Errorf("Max Subscriptions set to %d from server config which overrides %d from account claims", opts.MaxSubs, c.acc.msubs) c.msubs = opts.MaxSubs } - if c.msubs > 0 && len(c.subs) > c.msubs { + if c.subsExceeded() { go func() { c.maxSubsExceeded() time.Sleep(20 * time.Millisecond) @@ -1041,13 +1052,14 @@ func (c *client) processConnect(arg []byte) error { if account != "" { var acc *Account var wasNew bool + var err error if !srv.NewAccountsAllowed() { - acc = srv.LookupAccount(account) - if acc == nil { - c.Errorf(ErrMissingAccount.Error()) + acc, err = srv.LookupAccount(account) + if err != nil { + c.Errorf(err.Error()) c.sendErr("Account Not Found") - return ErrMissingAccount - } else if accountNew { + return err + } else if accountNew && acc != nil { c.Errorf(ErrAccountExists.Error()) c.sendErr(ErrAccountExists.Error()) return ErrAccountExists @@ -1413,7 +1425,7 @@ func (c *client) processPub(trace bool, arg []byte) error { return fmt.Errorf("processPub Bad or Missing Size: '%s'", arg) } maxPayload := atomic.LoadInt32(&c.mpay) - if maxPayload > 0 && int32(c.pa.size) > maxPayload { + if maxPayload != jwt.NoLimit && int32(c.pa.size) > maxPayload { c.maxPayloadViolation(c.pa.size, maxPayload) return ErrMaxPayload } @@ -1497,7 +1509,7 @@ func (c *client) processSub(argo []byte) (err error) { } // Check if we have a maximum on the number of subscriptions. - if c.msubs > 0 && len(c.subs) >= c.msubs { + if c.subsAtLimit() { c.mu.Unlock() c.maxSubsExceeded() return nil @@ -2480,7 +2492,9 @@ func (c *client) clearConnection(reason ClosedState) { c.flushOutbound() // Clear outbound here. - c.out.sg.Broadcast() + if c.out.sg != nil { + c.out.sg.Broadcast() + } // With TLS, Close() is sending an alert (that is doing a write). // Need to set a deadline otherwise the server could block there @@ -2791,8 +2805,7 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) { if !ok { // Match correct account and sublist. - acc = c.srv.LookupAccount(string(c.pa.account)) - if acc == nil { + if acc, _ = c.srv.LookupAccount(string(c.pa.account)); acc == nil { return nil, nil } diff --git a/server/errors.go b/server/errors.go index 80b63a9b..c8e46c89 100644 --- a/server/errors.go +++ b/server/errors.go @@ -81,6 +81,12 @@ var ( // ErrNoAccountResolver is returned when we attempt an update but do not have an account resolver. ErrNoAccountResolver = errors.New("Account Resolver Missing") + // ErrAccountResolverUpdateTooSoon is returned when we attempt an update too soon to last request. + ErrAccountResolverUpdateTooSoon = errors.New("Account Resolver Update Too Soon") + + // ErrAccountResolverSameClaims is returned when same claims have been fetched. + ErrAccountResolverSameClaims = errors.New("Account Resolver No New Claims") + // ErrStreamImportAuthorization is returned when a stream import is not authorized. ErrStreamImportAuthorization = errors.New("Stream Import Not Authorized") diff --git a/server/events.go b/server/events.go index f50891c0..4c7607e0 100644 --- a/server/events.go +++ b/server/events.go @@ -539,7 +539,7 @@ func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []by s.sys.client.Errorf("Error unmarshalling account connections request message: %v", err) return } - acc := s.lookupAccount(m.Account) + acc, _ := s.lookupAccount(m.Account) if acc == nil { return } @@ -576,7 +576,7 @@ func (s *Server) remoteConnsUpdate(sub *subscription, subject, reply string, msg return } // See if we have the account registered, if not drop it. - acc := s.lookupAccount(m.Account) + acc, _ := s.lookupAccount(m.Account) if acc == nil { s.sys.client.Debugf("Received account connection event for unknown account: %s", m.Account) return diff --git a/server/events_test.go b/server/events_test.go index 16230b0c..dd077864 100644 --- a/server/events_test.go +++ b/server/events_test.go @@ -36,7 +36,11 @@ func createAccount(s *Server) (*Account, nkeys.KeyPair) { nac := jwt.NewAccountClaims(pub) jwt, _ := nac.Encode(okp) addAccountToMemResolver(s, pub, jwt) - return s.LookupAccount(pub), akp + acc, err := s.LookupAccount(pub) + if err != nil { + panic(err) + } + return acc, akp } func createUserCreds(t *testing.T, s *Server, akp nkeys.KeyPair) nats.Option { @@ -631,7 +635,7 @@ func TestAccountClaimsUpdates(t *testing.T) { addAccountToMemResolver(s, pub, ajwt) - acc := s.LookupAccount(pub) + acc, _ := s.LookupAccount(pub) if acc.MaxActiveConnections() != 4 { t.Fatalf("Expected to see a limit of 4 connections") } @@ -658,7 +662,7 @@ func TestAccountClaimsUpdates(t *testing.T) { nc.Publish(claimUpdateSubj, []byte(ajwt)) nc.Flush() - acc = s.LookupAccount(pub) + acc, _ = s.LookupAccount(pub) if acc.MaxActiveConnections() != 8 { t.Fatalf("Account was not updated") } @@ -680,7 +684,7 @@ func TestAccountConnsLimitExceededAfterUpdate(t *testing.T) { ajwt, _ := nac.Encode(okp) addAccountToMemResolver(s, pub, ajwt) - acc := s.LookupAccount(pub) + acc, _ := s.LookupAccount(pub) // Now create the max connections. url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port) @@ -731,7 +735,7 @@ func TestAccountConnsLimitExceededAfterUpdateDisconnectNewOnly(t *testing.T) { ajwt, _ := nac.Encode(okp) addAccountToMemResolver(s, pub, ajwt) - acc := s.LookupAccount(pub) + acc, _ := s.LookupAccount(pub) // Now create the max connections. // We create half then we will wait and then create the rest. @@ -893,11 +897,11 @@ func TestServerEventStatsZ(t *testing.T) { if m2.Stats.ActiveAccounts != 2 { t.Fatalf("Did not match active accounts of 2, got %d", m2.Stats.ActiveAccounts) } - if m2.Stats.Sent.Msgs != 3 { - t.Fatalf("Did not match sent msgs of 3, got %d", m2.Stats.Sent.Msgs) + if m2.Stats.Sent.Msgs < 3 { + t.Fatalf("Did not match sent msgs of >= 3, got %d", m2.Stats.Sent.Msgs) } - if m2.Stats.Received.Msgs != 1 { - t.Fatalf("Did not match received msgs of 1, got %d", m2.Stats.Received.Msgs) + if m2.Stats.Received.Msgs < 1 { + t.Fatalf("Did not match received msgs of >= 1, got %d", m2.Stats.Received.Msgs) } if lr := len(m2.Stats.Routes); lr != 1 { t.Fatalf("Expected a route, but got %d", lr) @@ -920,11 +924,11 @@ func TestServerEventStatsZ(t *testing.T) { if m3.Stats.ActiveAccounts != 2 { t.Fatalf("Did not match active accounts of 2, got %d", m3.Stats.ActiveAccounts) } - if m3.Stats.Sent.Msgs != 5 { - t.Fatalf("Did not match sent msgs of 5, got %d", m3.Stats.Sent.Msgs) + if m3.Stats.Sent.Msgs < 5 { + t.Fatalf("Did not match sent msgs of >= 5, got %d", m3.Stats.Sent.Msgs) } - if m3.Stats.Received.Msgs != 2 { - t.Fatalf("Did not match received msgs of 2, got %d", m3.Stats.Received.Msgs) + if m3.Stats.Received.Msgs < 2 { + t.Fatalf("Did not match received msgs of >= 2, got %d", m3.Stats.Received.Msgs) } if lr := len(m3.Stats.Routes); lr != 1 { t.Fatalf("Expected a route, but got %d", lr) diff --git a/server/gateway_test.go b/server/gateway_test.go index b8adbe07..c81f82b8 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -1143,7 +1143,7 @@ func TestGatewayDontSendSubInterest(t *testing.T) { } func setAccountUserPassInOptions(o *Options, accName, username, password string) { - acc := &Account{Name: accName} + acc := NewAccount(accName) o.Accounts = append(o.Accounts, acc) o.Users = append(o.Users, &User{Username: username, Password: password, Account: acc}) } diff --git a/server/jwt.go b/server/jwt.go index c747c023..79b51141 100644 --- a/server/jwt.go +++ b/server/jwt.go @@ -81,7 +81,7 @@ func validateTrustedOperators(o *Options) error { return fmt.Errorf("conflicting options for 'TrustedKeys' and 'TrustedOperators'") } // If we have operators, fill in the trusted keys. - // FIXME(dlc) - We had TrustedKeys before TrsutedOperators. The jwt.OperatorClaims + // FIXME(dlc) - We had TrustedKeys before TrustedOperators. The jwt.OperatorClaims // has a DidSign(). Use that longer term. For now we can expand in place. for _, opc := range o.TrustedOperators { if o.TrustedKeys == nil { diff --git a/server/jwt_test.go b/server/jwt_test.go index df632fc7..bc078081 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -483,7 +483,7 @@ func TestJWTAccountRenew(t *testing.T) { // Update the account addAccountToMemResolver(s, apub, ajwt) - acc := s.LookupAccount(apub) + acc, _ := s.LookupAccount(apub) if acc == nil { t.Fatalf("Expected to retrive the account") } @@ -518,7 +518,7 @@ func TestJWTAccountRenewFromResolver(t *testing.T) { addAccountToMemResolver(s, apub, ajwt) // Force it to be loaded by the server and start the expiration timer. - acc := s.LookupAccount(apub) + acc, _ := s.LookupAccount(apub) if acc == nil { t.Fatalf("Could not retrieve account for %q", apub) } @@ -585,7 +585,7 @@ func TestJWTAccountBasicImportExport(t *testing.T) { addAccountToMemResolver(s, fooPub, fooJWT) - acc := s.LookupAccount(fooPub) + acc, _ := s.LookupAccount(fooPub) if acc == nil { t.Fatalf("Expected to retrieve the account") } @@ -622,7 +622,7 @@ func TestJWTAccountBasicImportExport(t *testing.T) { } addAccountToMemResolver(s, barPub, barJWT) - acc = s.LookupAccount(barPub) + acc, _ = s.LookupAccount(barPub) if acc == nil { t.Fatalf("Expected to retrieve the account") } @@ -814,7 +814,7 @@ func TestJWTAccountImportExportUpdates(t *testing.T) { barAC = jwt.NewAccountClaims(barPub) barJWT, _ = barAC.Encode(okp) addAccountToMemResolver(s, barPub, barJWT) - acc := s.LookupAccount(barPub) + acc, _ := s.LookupAccount(barPub) s.updateAccountClaims(acc, barAC) checkShadow(0) @@ -832,8 +832,8 @@ func TestJWTAccountImportExportUpdates(t *testing.T) { fooAC = jwt.NewAccountClaims(fooPub) fooJWT, _ = fooAC.Encode(okp) addAccountToMemResolver(s, fooPub, fooJWT) - s.updateAccountClaims(s.LookupAccount(fooPub), fooAC) - + acc, _ = s.LookupAccount(fooPub) + s.updateAccountClaims(acc, fooAC) checkShadow(0) // Now add it in but with permission required. @@ -841,7 +841,7 @@ func TestJWTAccountImportExportUpdates(t *testing.T) { fooAC.Exports.Add(streamExport) fooJWT, _ = fooAC.Encode(okp) addAccountToMemResolver(s, fooPub, fooJWT) - s.updateAccountClaims(s.LookupAccount(fooPub), fooAC) + s.updateAccountClaims(acc, fooAC) checkShadow(0) @@ -851,7 +851,7 @@ func TestJWTAccountImportExportUpdates(t *testing.T) { fooAC.Exports.Add(streamExport) fooJWT, _ = fooAC.Encode(okp) addAccountToMemResolver(s, fooPub, fooJWT) - s.updateAccountClaims(s.LookupAccount(fooPub), fooAC) + s.updateAccountClaims(acc, fooAC) checkShadow(1) } @@ -877,7 +877,7 @@ func TestJWTAccountImportActivationExpires(t *testing.T) { addAccountToMemResolver(s, fooPub, fooJWT) - acc := s.LookupAccount(fooPub) + acc, _ := s.LookupAccount(fooPub) if acc == nil { t.Fatalf("Expected to retrieve the account") } @@ -978,7 +978,7 @@ func TestJWTAccountLimitsSubs(t *testing.T) { // Check to make sure we have the limit set. // Account first - fooAcc := s.LookupAccount(fooPub) + fooAcc, _ := s.LookupAccount(fooPub) fooAcc.mu.RLock() if fooAcc.msubs != 10 { fooAcc.mu.RUnlock() @@ -1048,7 +1048,7 @@ func TestJWTAccountLimitsSubsButServerOverrides(t *testing.T) { t.Fatalf("Error generating account JWT: %v", err) } addAccountToMemResolver(s, fooPub, fooJWT) - fooAcc := s.LookupAccount(fooPub) + fooAcc, _ := s.LookupAccount(fooPub) fooAcc.mu.RLock() if fooAcc.msubs != 10 { fooAcc.mu.RUnlock() @@ -1121,7 +1121,7 @@ func TestJWTAccountLimitsMaxPayload(t *testing.T) { // Check to make sure we have the limit set. // Account first - fooAcc := s.LookupAccount(fooPub) + fooAcc, _ := s.LookupAccount(fooPub) fooAcc.mu.RLock() if fooAcc.mpay != 8 { fooAcc.mu.RUnlock() @@ -1325,7 +1325,8 @@ func TestJWTAccountServiceImportExpires(t *testing.T) { t.Fatalf("Error generating account JWT: %v", err) } addAccountToMemResolver(s, fooPub, fooJWT) - s.updateAccountClaims(s.LookupAccount(fooPub), fooAC) + acc, _ := s.LookupAccount(fooPub) + s.updateAccountClaims(acc, fooAC) // Send Another Request parseAsyncA("PUB foo 2\r\nhi\r\nPING\r\n") @@ -1356,7 +1357,8 @@ func TestJWTAccountServiceImportExpires(t *testing.T) { t.Fatalf("Error generating account JWT: %v", err) } addAccountToMemResolver(s, barPub, barJWT) - s.updateAccountClaims(s.LookupAccount(barPub), barAC) + acc, _ = s.LookupAccount(barPub) + s.updateAccountClaims(acc, barAC) // Now it should work again. // Send Another Request @@ -1406,7 +1408,7 @@ func TestAccountURLResolver(t *testing.T) { opts.TrustedKeys = []string{pub} defer s.Shutdown() - acc := s.LookupAccount(apub) + acc, _ := s.LookupAccount(apub) if acc == nil { t.Fatalf("Expected to receive an account") } @@ -1450,7 +1452,7 @@ func TestAccountURLResolverTimeout(t *testing.T) { opts.TrustedKeys = []string{pub} defer s.Shutdown() - acc := s.LookupAccount(apub) + acc, _ := s.LookupAccount(apub) if acc != nil { t.Fatalf("Expected to not receive an account due to timeout") } diff --git a/server/opts.go b/server/opts.go index 9b39ca80..a5424075 100644 --- a/server/opts.go +++ b/server/opts.go @@ -532,8 +532,13 @@ func (o *Options) ProcessConfigFile(configFile string) error { items := resolverRe.FindStringSubmatch(str) if len(items) == 2 { url := items[1] + _, err := parseURL(url, "account resolver") + if err != nil { + errors = append(errors, &configErr{tk, err.Error()}) + continue + } if ur, err := NewURLAccResolver(url); err != nil { - err := &configErr{tk, fmt.Sprintf("URL account resolver error: %v", err)} + err := &configErr{tk, err.Error()} errors = append(errors, err) continue } else { @@ -1017,7 +1022,7 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er *errors = append(*errors, err) continue } - opts.Accounts = append(opts.Accounts, &Account{Name: ns}) + opts.Accounts = append(opts.Accounts, NewAccount(ns)) m[ns] = struct{}{} } // More common map entry @@ -1045,7 +1050,7 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er *errors = append(*errors, err) continue } - acc := &Account{Name: aname} + acc := NewAccount(aname) opts.Accounts = append(opts.Accounts, acc) for k, v := range mv { @@ -1958,7 +1963,7 @@ func GenTLSConfig(tc *TLSConfigOpts) (*tls.Config, error) { pool := x509.NewCertPool() ok := pool.AppendCertsFromPEM(rootPEM) if !ok { - return nil, fmt.Errorf("failed to parse root ca certificate") + return nil, fmt.Errorf("Failed to parse root ca certificate") } config.ClientCAs = pool } @@ -2362,7 +2367,7 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, // flags). fs.Parse(args) } else if opts.CheckConfig { - return nil, fmt.Errorf("must specify [-c, --config] option to check configuration file syntax") + return nil, fmt.Errorf("Must specify [-c, --config] option to check configuration file syntax") } // Special handling of some flags @@ -2502,7 +2507,7 @@ func processSignal(signal string) error { if l := len(commandAndPid); l == 2 { pid = maybeReadPidFile(commandAndPid[1]) } else if l > 2 { - return fmt.Errorf("invalid signal parameters: %v", commandAndPid[2:]) + return fmt.Errorf("Invalid signal parameters: %v", commandAndPid[2:]) } if err := ProcessSignal(Command(commandAndPid[0]), pid); err != nil { return err diff --git a/server/parser_test.go b/server/parser_test.go index 2bb01623..742475bc 100644 --- a/server/parser_test.go +++ b/server/parser_test.go @@ -19,7 +19,7 @@ import ( ) func dummyClient() *client { - return &client{srv: New(&defaultServerOptions)} + return &client{srv: New(&defaultServerOptions), msubs: -1, mpay: -1} } func dummyRouteClient() *client { diff --git a/server/reload.go b/server/reload.go index 75e95a29..c76f4f4f 100644 --- a/server/reload.go +++ b/server/reload.go @@ -245,6 +245,14 @@ func (u *nkeysOption) Apply(server *Server) { server.Noticef("Reloaded: authorization nkey users") } +type trustKeysOption struct { + noopOption +} + +func (u *trustKeysOption) Apply(server *Server) { + server.Noticef("Reloaded: trusted keys") +} + // clusterOption implements the option interface for the `cluster` setting. type clusterOption struct { authOption @@ -497,6 +505,12 @@ func (s *Server) Reload() error { // TODO: Dump previous good config to a .bak file? return err } + + // Wipe trusted keys if needed when we have an operator. + if len(s.opts.TrustedOperators) > 0 && len(s.opts.TrustedKeys) > 0 { + s.opts.TrustedKeys = nil + } + clientOrgPort := s.clientActualPort clusterOrgPort := s.clusterActualPort gatewayOrgPort := s.gatewayActualPort @@ -536,8 +550,8 @@ func (s *Server) reloadOptions(newOpts *Options) error { if err != nil { return err } - // Need to save off previous cluster permissions s.mu.Lock() + // Need to save off previous cluster permissions s.oldClusterPerms = s.opts.Cluster.Permissions s.mu.Unlock() s.setOpts(newOpts) @@ -554,7 +568,6 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { newConfig = reflect.ValueOf(newOpts).Elem() diffOpts = []option{} ) - for i := 0; i < oldConfig.NumField(); i++ { field := oldConfig.Type().Field(i) // field.PkgPath is empty for exported fields, and is not for unexported ones. @@ -637,6 +650,10 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) { diffOpts = append(diffOpts, &clientAdvertiseOption{newValue: cliAdv}) case "accounts": diffOpts = append(diffOpts, &accountsOption{}) + case "trustedkeys": + fmt.Printf("newVal is %+v\n", newValue) + fmt.Printf("oldVal is %+v\n", oldValue) + diffOpts = append(diffOpts, &trustKeysOption{}) case "gateway": // Not supported for now, but report warning if configuration of gateway // is actually changed so that user knows that it won't take effect. @@ -717,25 +734,32 @@ func (s *Server) applyOptions(opts []option) { func (s *Server) reloadAuthorization() { s.mu.Lock() + oldAccounts := s.accounts + s.accounts, s.gacc = nil, nil + s.configureAccounts() + s.configureAuthorization() // This map will contain the names of accounts that have their streams // import configuration changed. - awcsti := make(map[string]struct{}, len(s.opts.Accounts)) + awcsti := make(map[string]struct{}, len(s.accounts)) - oldAccounts := s.accounts - s.accounts = make(map[string]*Account) - s.registerAccount(s.gacc) - for _, newAcc := range s.opts.Accounts { + for _, newAcc := range s.accounts { if acc, ok := oldAccounts[newAcc.Name]; ok { // If account exist in latest config, "transfer" the account's - // sublist to the new account object before registering it - // in s.accounts. + // sublist and client map to the new account. acc.mu.RLock() sl := acc.sl + clients := acc.clients acc.mu.RUnlock() newAcc.mu.Lock() newAcc.sl = sl + if len(clients) > 0 { + newAcc.clients = make(map[*client]*client, len(clients)) + for _, c := range clients { + newAcc.clients[c] = c + } + } // Check if current and new config of this account are same // in term of stream imports. if !acc.checkStreamImportsEqual(newAcc) { @@ -743,7 +767,6 @@ func (s *Server) reloadAuthorization() { } newAcc.mu.Unlock() } - s.registerAccount(newAcc) } // Gather clients that changed accounts. We will close them and they // will reconnect, doing the right thing. diff --git a/server/reload_test.go b/server/reload_test.go index 4be04188..04e1cfce 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -2668,7 +2668,7 @@ func TestConfigReloadAccountUsers(t *testing.T) { } // Old account should be gone - if s.LookupAccount("acc_deleted_after_reload") != nil { + if _, err := s.LookupAccount("acc_deleted_after_reload"); err == nil { t.Fatal("old account should be gone") } @@ -2683,7 +2683,7 @@ func TestConfigReloadAccountUsers(t *testing.T) { // being reconnected does not mean that resent of subscriptions // has already been processed. checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { - gAcc := s.LookupAccount(globalAccountName) + gAcc, _ := s.LookupAccount(globalAccountName) gAcc.mu.RLock() n := gAcc.sl.Count() fooMatch := gAcc.sl.Match("foo") @@ -2699,7 +2699,7 @@ func TestConfigReloadAccountUsers(t *testing.T) { return fmt.Errorf("Global account should have baz sub") } - sAcc := s.LookupAccount("synadia") + sAcc, _ := s.LookupAccount("synadia") sAcc.mu.RLock() n = sAcc.sl.Count() barMatch := sAcc.sl.Match("bar") @@ -2711,7 +2711,7 @@ func TestConfigReloadAccountUsers(t *testing.T) { return fmt.Errorf("Synadia account should have bar sub") } - nAcc := s.LookupAccount("nats.io") + nAcc, _ := s.LookupAccount("nats.io") nAcc.mu.RLock() n = nAcc.sl.Count() batMatch := nAcc.sl.Match("bat") @@ -2748,8 +2748,8 @@ func TestConfigReloadAccountNKeyUsers(t *testing.T) { s, _ := RunServerWithConfig(conf) defer s.Shutdown() - synadia := s.LookupAccount("synadia") - nats := s.LookupAccount("nats.io") + synadia, _ := s.LookupAccount("synadia") + nats, _ := s.LookupAccount("nats.io") seed1 := []byte("SUAPM67TC4RHQLKBX55NIQXSMATZDOZK6FNEOSS36CAYA7F7TY66LP4BOM") seed2 := []byte("SUAIS5JPX4X4GJ7EIIJEQ56DH2GWPYJRPWN5XJEDENJOZHCBLI7SEPUQDE") @@ -2854,7 +2854,7 @@ func TestConfigReloadAccountNKeyUsers(t *testing.T) { if ivan.Account != globalAcc { t.Fatalf("Invalid account for user Ivan: %#v", ivan.Account) } - if s.LookupAccount("synadia") != nil { + if _, err := s.LookupAccount("synadia"); err == nil { t.Fatal("Account Synadia should have been removed") } } diff --git a/server/route.go b/server/route.go index 1a3f0181..251a3f4d 100644 --- a/server/route.go +++ b/server/route.go @@ -117,8 +117,9 @@ func (c *client) removeReplySub(sub *subscription) { // Lookup the account based on sub.sid. if i := bytes.Index(sub.sid, []byte(" ")); i > 0 { // First part of SID for route is account name. - acc := c.srv.LookupAccount(string(sub.sid[:i])) - acc.sl.Remove(sub) + if acc, _ := c.srv.LookupAccount(string(sub.sid[:i])); acc != nil { + acc.sl.Remove(sub) + } c.mu.Lock() c.removeReplySubTimeout(sub) c.mu.Unlock() @@ -649,7 +650,7 @@ func (c *client) removeRemoteSubs() { accountName := strings.Fields(key)[0] ase := as[accountName] if ase == nil { - acc := srv.LookupAccount(accountName) + acc, _ := srv.LookupAccount(accountName) if acc == nil { continue } @@ -704,7 +705,7 @@ func (c *client) processRemoteUnsub(arg []byte) (err error) { return fmt.Errorf("processRemoteUnsub %s", err.Error()) } // Lookup the account - acc := c.srv.LookupAccount(accountName) + acc, _ := c.srv.LookupAccount(accountName) if acc == nil { c.Debugf("Unknown account %q for subject %q", accountName, subject) // Mark this account as not interested since we received a RS- and we @@ -773,7 +774,7 @@ func (c *client) processRemoteSub(argo []byte) (err error) { // Lookup the account // FIXME(dlc) - This may start having lots of contention? accountName := string(args[0]) - acc := c.srv.LookupAccount(accountName) + acc, _ := c.srv.LookupAccount(accountName) if acc == nil { if !srv.NewAccountsAllowed() { c.Debugf("Unknown account %q for subject %q", accountName, sub.subject) @@ -796,7 +797,7 @@ func (c *client) processRemoteSub(argo []byte) (err error) { } // Check if we have a maximum on the number of subscriptions. - if c.msubs > 0 && len(c.subs) >= c.msubs { + if c.subsExceeded() { c.mu.Unlock() c.maxSubsExceeded() return nil @@ -1023,7 +1024,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { } } - c := &client{srv: s, nc: conn, opts: clientOpts{}, kind: ROUTER, route: r} + c := &client{srv: s, nc: conn, opts: clientOpts{}, kind: ROUTER, msubs: -1, mpay: -1, route: r} // Grab server variables s.mu.Lock() diff --git a/server/server.go b/server/server.go index de3d6179..65d9a939 100644 --- a/server/server.go +++ b/server/server.go @@ -311,6 +311,13 @@ func (s *Server) setOpts(opts *Options) { s.optsMu.Unlock() } +func (s *Server) globalAccount() *Account { + s.mu.Lock() + gacc := s.gacc + s.mu.Unlock() + return gacc +} + func (s *Server) configureAccounts() error { // Used to setup Accounts. if s.accounts == nil { @@ -318,15 +325,46 @@ func (s *Server) configureAccounts() error { } // Create global account. if s.gacc == nil { - s.gacc = &Account{Name: globalAccountName} + s.gacc = NewAccount(globalAccountName) s.registerAccount(s.gacc) } opts := s.opts - // Check opts and walk through them. Making sure to create SLs. + // Check opts and walk through them. We need to copy them here + // so that we do not keep a real one sitting in the options. for _, acc := range s.opts.Accounts { - s.registerAccount(acc) + a := acc.shallowCopy() + acc.sl = nil + acc.clients = nil + s.registerAccount(a) + } + + // Now that we have this we need to remap any referenced accounts in + // import or export maps to the new ones. + swapApproved := func(ea *exportAuth) { + if ea == nil { + return + } + for sub, a := range ea.approved { + ea.approved[sub] = s.accounts[a.Name] + } + } + for _, acc := range s.accounts { + // Exports + for _, ea := range acc.exports.streams { + swapApproved(ea) + } + for _, ea := range acc.exports.services { + swapApproved(ea) + } + // Imports + for _, si := range acc.imports.streams { + si.acc = s.accounts[si.acc.Name] + } + for _, si := range acc.imports.services { + si.acc = s.accounts[si.acc.Name] + } } // Check for configured account resolvers. @@ -337,6 +375,9 @@ func (s *Server) configureAccounts() error { return fmt.Errorf("Resolver preloads only available for MemAccResolver") } for k, v := range opts.resolverPreloads { + if _, _, err := s.verifyAccountClaims(v); err != nil { + return fmt.Errorf("Preloaded Account: %v", err) + } s.accResolver.Store(k, v) } } @@ -344,8 +385,8 @@ func (s *Server) configureAccounts() error { // Set the system account if it was configured. if opts.SystemAccount != _EMPTY_ { - if acc := s.lookupAccount(opts.SystemAccount); acc == nil { - return ErrMissingAccount + if _, err := s.lookupAccount(opts.SystemAccount); err != nil { + return fmt.Errorf("Error resolving system account: %v", err) } } return nil @@ -425,13 +466,13 @@ func (s *Server) initStampedTrustedKeys() bool { // PrintAndDie is exported for access in other packages. func PrintAndDie(msg string) { - fmt.Fprintf(os.Stderr, "%s\n", msg) + fmt.Fprintln(os.Stderr, msg) os.Exit(1) } // PrintServerAndExit will print our version and exit. func PrintServerAndExit() { - fmt.Printf("nats-server version %s\n", VERSION) + fmt.Printf("nats-server: v%s\n", VERSION) os.Exit(0) } @@ -507,10 +548,7 @@ func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew b if acc, ok := s.accounts[name]; ok { return acc, false } - acc := &Account{ - Name: name, - sl: NewSublist(), - } + acc := NewAccount(name) s.registerAccount(acc) return acc, true } @@ -524,7 +562,7 @@ func (s *Server) RegisterAccount(name string) (*Account, error) { if _, ok := s.accounts[name]; ok { return nil, ErrAccountExists } - acc := &Account{Name: name} + acc := NewAccount(name) s.registerAccount(acc) return acc, nil } @@ -583,7 +621,7 @@ func (s *Server) setSystemAccount(acc *Account) error { s.sys = &internal{ account: acc, - client: &client{srv: s, kind: SYSTEM, opts: defaultOpts, start: time.Now(), last: time.Now()}, + client: &client{srv: s, kind: SYSTEM, opts: defaultOpts, msubs: -1, mpay: -1, start: time.Now(), last: time.Now()}, seq: 1, sid: 1, servers: make(map[string]*serverUpdate), @@ -663,23 +701,28 @@ func (s *Server) registerAccount(acc *Account) { // lookupAccount is a function to return the account structure // associated with an account name. // Lock should be held on entry. -func (s *Server) lookupAccount(name string) *Account { +func (s *Server) lookupAccount(name string) (*Account, error) { acc := s.accounts[name] if acc != nil { // If we are expired and we have a resolver, then // return the latest information from the resolver. if s.accResolver != nil && acc.IsExpired() { - s.updateAccount(acc) + if err := s.updateAccount(acc); err != nil { + return nil, err + } } - return acc + return acc, nil } // If we have a resolver see if it can fetch the account. + if s.accResolver == nil { + return nil, ErrMissingAccount + } return s.fetchAccount(name) } // LookupAccount is a public function to return the account structure // associated with name. -func (s *Server) LookupAccount(name string) *Account { +func (s *Server) LookupAccount(name string) (*Account, error) { s.mu.Lock() defer s.mu.Unlock() return s.lookupAccount(name) @@ -687,48 +730,36 @@ func (s *Server) LookupAccount(name string) *Account { // This will fetch new claims and if found update the account with new claims. // Lock should be held upon entry. -func (s *Server) updateAccount(acc *Account) bool { +func (s *Server) updateAccount(acc *Account) error { // TODO(dlc) - Make configurable if time.Since(acc.updated) < time.Second { s.Debugf("Requested account update for [%s] ignored, too soon", acc.Name) - return false + return ErrAccountResolverUpdateTooSoon } claimJWT, err := s.fetchRawAccountClaims(acc.Name) if err != nil { - return false + return err } - acc.updated = time.Now() - if acc.claimJWT != "" && acc.claimJWT == claimJWT { - s.Debugf("Requested account update for [%s], same claims detected", acc.Name) - return false - } - accClaims, _, err := s.verifyAccountClaims(claimJWT) - if err == nil && accClaims != nil { - acc.claimJWT = claimJWT - s.updateAccountClaims(acc, accClaims) - return true - } - return false + return s.updateAccountWithClaimJWT(acc, claimJWT) } -// updateAccountWithClaimJWT will chack and apply the claim update. -func (s *Server) updateAccountWithClaimJWT(acc *Account, claimJWT string) bool { +// updateAccountWithClaimJWT will check and apply the claim update. +func (s *Server) updateAccountWithClaimJWT(acc *Account, claimJWT string) error { if acc == nil { - return false + return ErrMissingAccount } acc.updated = time.Now() if acc.claimJWT != "" && acc.claimJWT == claimJWT { s.Debugf("Requested account update for [%s], same claims detected", acc.Name) - return false + return ErrAccountResolverSameClaims } accClaims, _, err := s.verifyAccountClaims(claimJWT) if err == nil && accClaims != nil { acc.claimJWT = claimJWT s.updateAccountClaims(acc, accClaims) - return true + return nil } - return false - + return err } // fetchRawAccountClaims will grab raw account claims iff we have a resolver. @@ -780,14 +811,15 @@ func (s *Server) verifyAccountClaims(claimJWT string) (*jwt.AccountClaims, strin // This will fetch an account from a resolver if defined. // Lock should be held upon entry. -func (s *Server) fetchAccount(name string) *Account { - if accClaims, claimJWT, _ := s.fetchAccountClaims(name); accClaims != nil { +func (s *Server) fetchAccount(name string) (*Account, error) { + if accClaims, claimJWT, err := s.fetchAccountClaims(name); accClaims != nil { acc := s.buildInternalAccount(accClaims) acc.claimJWT = claimJWT s.registerAccount(acc) - return acc + return acc, nil + } else { + return nil, err } - return nil } // Start up the server, this will block. @@ -1324,11 +1356,15 @@ func (s *Server) createClient(conn net.Conn) *client { maxPay := int32(opts.MaxPayload) maxSubs := opts.MaxSubs + // For system, maxSubs of 0 means unlimited, so re-adjust here. + if maxSubs == 0 { + maxSubs = -1 + } now := time.Now() c := &client{srv: s, nc: conn, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now} - c.registerWithAccount(s.gacc) + c.registerWithAccount(s.globalAccount()) // Grab JSON info string s.mu.Lock() diff --git a/server/split_test.go b/server/split_test.go index 9abf0744..dbb52d89 100644 --- a/server/split_test.go +++ b/server/split_test.go @@ -28,9 +28,9 @@ func TestSplitBufferSubOp(t *testing.T) { if err != nil { t.Fatalf("Error creating gateways: %v", err) } - s := &Server{gacc: &Account{Name: globalAccountName}, accounts: make(map[string]*Account), gateway: gws} + s := &Server{gacc: NewAccount(globalAccountName), accounts: make(map[string]*Account), gateway: gws} s.registerAccount(s.gacc) - c := &client{srv: s, acc: s.gacc, subs: make(map[string]*subscription), nc: cli} + c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, subs: make(map[string]*subscription), nc: cli} subop := []byte("SUB foo 1\r\n") subop1 := subop[:6] @@ -65,9 +65,9 @@ func TestSplitBufferSubOp(t *testing.T) { } func TestSplitBufferUnsubOp(t *testing.T) { - s := &Server{gacc: &Account{Name: globalAccountName}, accounts: make(map[string]*Account), gateway: &srvGateway{}} + s := &Server{gacc: NewAccount(globalAccountName), accounts: make(map[string]*Account), gateway: &srvGateway{}} s.registerAccount(s.gacc) - c := &client{srv: s, acc: s.gacc, subs: make(map[string]*subscription)} + c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, subs: make(map[string]*subscription)} subop := []byte("SUB foo 1024\r\n") if err := c.parse(subop); err != nil { @@ -100,7 +100,7 @@ func TestSplitBufferUnsubOp(t *testing.T) { } func TestSplitBufferPubOp(t *testing.T) { - c := &client{subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} pub := []byte("PUB foo.bar INBOX.22 11\r\nhello world\r") pub1 := pub[:2] pub2 := pub[2:9] @@ -166,7 +166,7 @@ func TestSplitBufferPubOp(t *testing.T) { } func TestSplitBufferPubOp2(t *testing.T) { - c := &client{subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} pub := []byte("PUB foo.bar INBOX.22 11\r\nhello world\r\n") pub1 := pub[:30] pub2 := pub[30:] @@ -186,7 +186,7 @@ func TestSplitBufferPubOp2(t *testing.T) { } func TestSplitBufferPubOp3(t *testing.T) { - c := &client{subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} pubAll := []byte("PUB foo bar 11\r\nhello world\r\n") pub := pubAll[:16] @@ -212,7 +212,7 @@ func TestSplitBufferPubOp3(t *testing.T) { } func TestSplitBufferPubOp4(t *testing.T) { - c := &client{subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} pubAll := []byte("PUB foo 11\r\nhello world\r\n") pub := pubAll[:12] @@ -238,7 +238,7 @@ func TestSplitBufferPubOp4(t *testing.T) { } func TestSplitBufferPubOp5(t *testing.T) { - c := &client{subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} pubAll := []byte("PUB foo 11\r\nhello world\r\n") // Splits need to be on MSG_END now too, so make sure we check that. @@ -257,7 +257,7 @@ func TestSplitBufferPubOp5(t *testing.T) { } func TestSplitConnectArg(t *testing.T) { - c := &client{subs: make(map[string]*subscription)} + c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription)} connectAll := []byte("CONNECT {\"verbose\":false,\"tls_required\":false," + "\"user\":\"test\",\"pedantic\":true,\"pass\":\"pass\"}\r\n") @@ -306,7 +306,7 @@ func TestSplitConnectArg(t *testing.T) { func TestSplitDanglingArgBuf(t *testing.T) { s := New(&defaultServerOptions) - c := &client{srv: s, acc: s.gacc, subs: make(map[string]*subscription)} + c := &client{srv: s, acc: s.gacc, msubs: -1, mpay: -1, subs: make(map[string]*subscription)} // We test to make sure we do not dangle any argBufs after processing // since that could lead to performance issues. @@ -365,7 +365,7 @@ func TestSplitDanglingArgBuf(t *testing.T) { } // MSG (the client has to be a ROUTE) - c = &client{subs: make(map[string]*subscription), kind: ROUTER} + c = &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription), kind: ROUTER} msgop := []byte("RMSG $foo foo 5\r\nhello\r\n") c.parse(msgop[:5]) c.parse(msgop[5:10]) @@ -445,7 +445,7 @@ func TestSplitRoutedMsgArg(t *testing.T) { } func TestSplitBufferMsgOp(t *testing.T) { - c := &client{subs: make(map[string]*subscription), kind: ROUTER} + c := &client{msubs: -1, mpay: -1, subs: make(map[string]*subscription), kind: ROUTER} msg := []byte("RMSG $G foo.bar _INBOX.22 11\r\nhello world\r") msg1 := msg[:2] msg2 := msg[2:9] diff --git a/test/configs/resolver_preload.conf b/test/configs/resolver_preload.conf index 1cf910af..5d40c4c3 100644 --- a/test/configs/resolver_preload.conf +++ b/test/configs/resolver_preload.conf @@ -23,4 +23,4 @@ resolver_preload = { ADM2CIIL3RWXBA6T2HW3FODNCQQOUJEHHQD6FKCPVAMHDNTTSMO73ROX: "eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5In0.eyJqdGkiOiJCMk0zTFRMT1ZNRk03REY3U0M3SE9RTzNXUzI2RFhMTURINk0zRzY3RzRXRFdTWExPNlVBIiwiaWF0IjoxNTQzOTU4NzI0LCJpc3MiOiJPQ0FUMzNNVFZVMlZVT0lNR05HVU5YSjY2QUgyUkxTREFGM01VQkNZQVk1UU1JTDY1TlFNNlhRRyIsInN1YiI6IkFETTJDSUlMM1JXWEJBNlQySFczRk9ETkNRUU9VSkVISFFENkZLQ1BWQU1IRE5UVFNNTzczUk9YIiwidHlwZSI6ImFjY291bnQiLCJuYXRzIjp7ImxpbWl0cyI6e319fQ.pvvPmBei_IFEbspHGN5FkWJoSfHk8BVeJCCVODTgul8-xUU8p1fidvsg3sgMvrXqXtmL8SFc68jGQd0nGtk5Dw" -} \ No newline at end of file +} diff --git a/test/gateway_test.go b/test/gateway_test.go index 01222032..d0e0b7e4 100644 --- a/test/gateway_test.go +++ b/test/gateway_test.go @@ -139,7 +139,7 @@ func TestGatewayAccountInterest(t *testing.T) { func TestGatewaySubjectInterest(t *testing.T) { ob := testDefaultOptionsForGateway("B") - fooAcc := &server.Account{Name: "$foo"} + fooAcc := server.NewAccount("$foo") ob.Accounts = []*server.Account{fooAcc} ob.Users = []*server.User{&server.User{Username: "ivan", Password: "password", Account: fooAcc}} sb := runGatewayServer(ob) @@ -275,7 +275,7 @@ func TestGatewaySubjectInterest(t *testing.T) { func TestGatewayQueue(t *testing.T) { ob := testDefaultOptionsForGateway("B") - fooAcc := &server.Account{Name: "$foo"} + fooAcc := server.NewAccount("$foo") ob.Accounts = []*server.Account{fooAcc} ob.Users = []*server.User{&server.User{Username: "ivan", Password: "password", Account: fooAcc}} sb := runGatewayServer(ob) diff --git a/test/operator_test.go b/test/operator_test.go index 9a79b9b0..e692b41c 100644 --- a/test/operator_test.go +++ b/test/operator_test.go @@ -136,7 +136,11 @@ func createAccountForOperatorKey(t *testing.T, s *server.Server, seed []byte) (* if err := s.AccountResolver().Store(pub, jwt); err != nil { t.Fatalf("Account Resolver returned an error: %v", err) } - return s.LookupAccount(pub), akp + acc, err := s.LookupAccount(pub) + if err != nil { + t.Fatalf("Error looking up account: %v", err) + } + return acc, akp } func createAccount(t *testing.T, s *server.Server) (*server.Account, nkeys.KeyPair) { @@ -238,7 +242,7 @@ func TestOperatorMemResolverPreload(t *testing.T) { s, opts := RunServerWithConfig("./configs/resolver_preload.conf") // Make sure we can look up the account. - acc := s.LookupAccount("ADM2CIIL3RWXBA6T2HW3FODNCQQOUJEHHQD6FKCPVAMHDNTTSMO73ROX") + acc, _ := s.LookupAccount("ADM2CIIL3RWXBA6T2HW3FODNCQQOUJEHHQD6FKCPVAMHDNTTSMO73ROX") if acc == nil { t.Fatalf("Expected to properly lookup account") } diff --git a/vendor/github.com/nats-io/jwt/account_claims.go b/vendor/github.com/nats-io/jwt/account_claims.go index 5f804832..d8fd7344 100644 --- a/vendor/github.com/nats-io/jwt/account_claims.go +++ b/vendor/github.com/nats-io/jwt/account_claims.go @@ -6,22 +6,30 @@ import ( "github.com/nats-io/nkeys" ) +// Signifies no limit. +const NoLimit = -1 + // OperatorLimits are used to limit access by an account type OperatorLimits struct { Subs int64 `json:"subs,omitempty"` // Max number of subscriptions Conn int64 `json:"conn,omitempty"` // Max number of active connections Imports int64 `json:"imports,omitempty"` // Max number of imports Exports int64 `json:"exports,omitempty"` // Max number of exports - WildcardExports bool `json:"wildcards,omitempty"` // Are wildcards allowed in exports Data int64 `json:"data,omitempty"` // Max number of bytes Payload int64 `json:"payload,omitempty"` // Max message payload + WildcardExports bool `json:"wildcards,omitempty"` // Are wildcards allowed in exports } -// IsEmpty returns true if all of the limits are 0 +// IsEmpty returns true if all of the limits are 0/false. func (o *OperatorLimits) IsEmpty() bool { return *o == OperatorLimits{} } +// IsUnlimited returns true if all limits are +func (o *OperatorLimits) IsUnlimited() bool { + return *o == OperatorLimits{NoLimit, NoLimit, NoLimit, NoLimit, NoLimit, NoLimit, true} +} + // Validate checks that the operator limits contain valid values func (o *OperatorLimits) Validate(vr *ValidationResults) { // negative values mean unlimited, so all numbers are valid @@ -46,11 +54,27 @@ func (a *Account) Validate(acct *AccountClaims, vr *ValidationResults) { } if !a.Limits.IsEmpty() && a.Limits.Imports >= 0 && int64(len(a.Imports)) > a.Limits.Imports { - vr.AddError("the account contains more imports than allowed by the operator limits") + vr.AddError("the account contains more imports than allowed by the operator") } - if !a.Limits.IsEmpty() && a.Limits.Exports >= 0 && int64(len(a.Exports)) > a.Limits.Exports { - vr.AddError("the account contains more exports than allowed by the operator limits") + // Check Imports and Exports for limit violations. + if a.Limits.Imports != NoLimit { + if int64(len(a.Imports)) > a.Limits.Imports { + vr.AddError("the account contains more imports than allowed by the operator") + } + } + if a.Limits.Exports != NoLimit { + if int64(len(a.Exports)) > a.Limits.Exports { + vr.AddError("the account contains more exports than allowed by the operator") + } + // Check for wildcard restrictions + if !a.Limits.WildcardExports { + for _, ex := range a.Exports { + if ex.Subject.HasWildCards() { + vr.AddError("the account contains wildcard exports that are not allowed by the operator") + } + } + } } } @@ -66,6 +90,9 @@ func NewAccountClaims(subject string) *AccountClaims { return nil } c := &AccountClaims{} + // Set to unlimited to start. We do it this way so we get compiler + // errors if we add to the OperatorLimits. + c.Limits = OperatorLimits{NoLimit, NoLimit, NoLimit, NoLimit, NoLimit, NoLimit, true} c.Subject = subject return c } diff --git a/vendor/github.com/nats-io/jwt/claims.go b/vendor/github.com/nats-io/jwt/claims.go index e060a7d9..ae21e997 100644 --- a/vendor/github.com/nats-io/jwt/claims.go +++ b/vendor/github.com/nats-io/jwt/claims.go @@ -63,12 +63,20 @@ type Prefix struct { nkeys.PrefixByte } +func encodeToString(d []byte) string { + return base64.RawURLEncoding.EncodeToString(d) +} + +func decodeString(s string) ([]byte, error) { + return base64.RawURLEncoding.DecodeString(s) +} + func serialize(v interface{}) (string, error) { j, err := json.Marshal(v) if err != nil { return "", err } - return base64.RawURLEncoding.EncodeToString(j), nil + return encodeToString(j), nil } func (c *ClaimsData) doEncode(header *Header, kp nkeys.KeyPair, claim Claims) (string, error) { @@ -143,7 +151,7 @@ func (c *ClaimsData) doEncode(header *Header, kp nkeys.KeyPair, claim Claims) (s if err != nil { return "", err } - eSig := base64.RawURLEncoding.EncodeToString(sig) + eSig := encodeToString(sig) return fmt.Sprintf("%s.%s.%s", h, payload, eSig), nil } @@ -173,7 +181,7 @@ func (c *ClaimsData) String(claim interface{}) string { } func parseClaims(s string, target Claims) error { - h, err := base64.RawURLEncoding.DecodeString(s) + h, err := decodeString(s) if err != nil { return err } @@ -239,7 +247,7 @@ func Decode(token string, target Claims) error { return err } - sig, err := base64.RawURLEncoding.DecodeString(chunks[2]) + sig, err := decodeString(chunks[2]) if err != nil { return err } diff --git a/vendor/github.com/nats-io/jwt/header.go b/vendor/github.com/nats-io/jwt/header.go index 72ffd8f3..7865d111 100644 --- a/vendor/github.com/nats-io/jwt/header.go +++ b/vendor/github.com/nats-io/jwt/header.go @@ -1,7 +1,6 @@ package jwt import ( - "encoding/base64" "encoding/json" "fmt" "strings" @@ -9,7 +8,7 @@ import ( const ( // Version - Version = "0.0.3" + Version = "0.0.5" // TokenTypeJwt is the JWT token type supported JWT tokens // encoded and decoded by this library @@ -28,7 +27,7 @@ type Header struct { // Parses a header JWT token func parseHeaders(s string) (*Header, error) { - h, err := base64.RawURLEncoding.DecodeString(s) + h, err := decodeString(s) if err != nil { return nil, err } diff --git a/vendor/github.com/nats-io/nkeys/main.go b/vendor/github.com/nats-io/nkeys/main.go index a85b8b46..5b05df50 100644 --- a/vendor/github.com/nats-io/nkeys/main.go +++ b/vendor/github.com/nats-io/nkeys/main.go @@ -20,7 +20,7 @@ import ( ) // Version -const Version = "0.0.1" +const Version = "0.0.2" // Errors var ( diff --git a/vendor/github.com/nats-io/nkeys/nk/main.go b/vendor/github.com/nats-io/nkeys/nk/main.go index 8274e5a6..410668a3 100644 --- a/vendor/github.com/nats-io/nkeys/nk/main.go +++ b/vendor/github.com/nats-io/nkeys/nk/main.go @@ -249,7 +249,7 @@ func createVanityKey(keyType, vanity, entropy string, max int) nkeys.KeyPair { fmt.Fprintf(os.Stderr, "\r\033[mcomputing\033[m %s ", string(spin)) kp := genKeyPair(pre, entropy) pub, _ := kp.PublicKey() - if bytes.HasPrefix([]byte(pub)[1:], []byte(vanity)) { + if strings.HasPrefix(pub[1:], vanity) { fmt.Fprintf(os.Stderr, "\r") return kp } @@ -281,15 +281,6 @@ func readKeyFile(filename string) []byte { return key } -func isValidLeadingByte(c byte) bool { - switch c { - case 'S', 'P', 'N', 'C', 'O', 'A', 'U': - return true - default: - return false - } -} - func wipeSlice(buf []byte) { for i := range buf { buf[i] = 'x' diff --git a/vendor/manifest b/vendor/manifest index 9b7e3e1e..b6883f13 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -5,7 +5,7 @@ "importpath": "github.com/nats-io/jwt", "repository": "https://github.com/nats-io/jwt", "vcs": "git", - "revision": "1f8635961c5122b7410998050fd9616afeb28d92", + "revision": "f26fb76c5f45b9fe3e657a1135d795079cecffac", "branch": "master", "notests": true }, @@ -13,7 +13,7 @@ "importpath": "github.com/nats-io/nkeys", "repository": "https://github.com/nats-io/nkeys", "vcs": "git", - "revision": "cdc461ca8f466aeeea6eca667cde553efe40bc31", + "revision": "1546a3320a8f195a5b5c84aef8309377c2e411d5", "branch": "master", "notests": true },