Merge pull request #825 from nats-io/operator

Updates for operator based configurations.
This commit is contained in:
Derek Collison
2018-12-03 14:19:23 -08:00
committed by GitHub
23 changed files with 1035 additions and 125 deletions

View File

@@ -14,6 +14,7 @@
package server
import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
@@ -111,7 +112,7 @@ type exportMap struct {
// NumClients returns active number of clients for this account for
// all known servers.
func (a *Account) NumClients() int {
func (a *Account) NumConnections() int {
a.mu.RLock()
defer a.mu.RUnlock()
return len(a.clients) + a.nrclients
@@ -119,22 +120,34 @@ func (a *Account) NumClients() int {
// NumLocalClients returns active number of clients for this account
// on this server.
func (a *Account) NumLocalClients() int {
func (a *Account) NumLocalConnections() int {
a.mu.RLock()
defer a.mu.RUnlock()
return len(a.clients)
}
// MaxClientsReached returns if we have reached our limit for number of connections.
func (a *Account) MaxTotalClientsReached() bool {
func (a *Account) MaxTotalConnectionsReached() bool {
a.mu.RLock()
defer a.mu.RUnlock()
return a.maxTotalConnectionsReached()
}
func (a *Account) maxTotalConnectionsReached() bool {
if a.mconns != 0 {
return len(a.clients)+a.nrclients >= a.mconns
}
return false
}
// MaxActiveConnections return the set limit for the account system
// wide for total number of active connections.
func (a *Account) MaxActiveConnections() int {
a.mu.RLock()
defer a.mu.RUnlock()
return a.mconns
}
// RoutedSubs returns how many subjects we would send across a route when first
// connected or expressing interest. Local client subs.
func (a *Account) RoutedSubs() int {
@@ -777,6 +790,13 @@ func (s *Server) SetAccountResolver(ar AccountResolver) {
s.mu.Unlock()
}
// AccountResolver returns the registered account resolver.
func (s *Server) AccountResolver() AccountResolver {
s.mu.Lock()
defer s.mu.Unlock()
return s.accResolver
}
// updateAccountClaims will update and existing account with new claims.
// This will replace any exports or imports previously defined.
func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
@@ -878,7 +898,15 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
a.msubs = int(ac.Limits.Subs)
a.mpay = int32(ac.Limits.Payload)
a.mconns = int(ac.Limits.Conn)
for i, c := range gatherClients() {
clients := gatherClients()
// Sort if we are over the limit.
if a.maxTotalConnectionsReached() {
sort.Slice(clients, func(i, j int) bool {
return clients[i].start.After(clients[j].start)
})
}
for i, c := range clients {
if a.mconns > 0 && i >= a.mconns {
c.maxAccountConnExceeded()
continue
@@ -925,17 +953,69 @@ func buildInternalNkeyUser(uc *jwt.UserClaims, acc *Account) *NkeyUser {
// AccountResolver interface. This is to fetch Account JWTs by public nkeys
type AccountResolver interface {
Fetch(pub string) (string, error)
Fetch(name string) (string, error)
Store(name, jwt string) error
}
// Mostly for testing.
type MemAccResolver struct {
sync.Map
sm sync.Map
}
func (m *MemAccResolver) Fetch(pub string) (string, error) {
if j, ok := m.Load(pub); ok {
// Fetch will fetch the account jwt claims from the internal sync.Map.
func (m *MemAccResolver) Fetch(name string) (string, error) {
if j, ok := m.sm.Load(name); ok {
return j.(string), nil
}
return "", ErrMissingAccount
return _EMPTY_, ErrMissingAccount
}
// Store will store the account jwt claims in the internal sync.Map.
func (m *MemAccResolver) Store(name, jwt string) error {
m.sm.Store(name, jwt)
return nil
}
// URLAccResolver implements an http fetcher.
type URLAccResolver struct {
url string
c *http.Client
}
// NewURLAccResolver returns a new resolver for the given base URL.
func NewURLAccResolver(url string) (*URLAccResolver, error) {
if !strings.HasSuffix(url, "/") {
url += "/"
}
// Do basic test to see if anyone is home.
// FIXME(dlc) - Make timeout configurable post MVP.
ur := &URLAccResolver{
url: url,
c: &http.Client{Timeout: 2 * time.Second},
}
if _, err := ur.Fetch(""); err != nil {
return nil, err
}
return ur, nil
}
// Fetch will fetch the account jwt claims from the base url, appending the
// account name onto the end.
func (ur *URLAccResolver) Fetch(name string) (string, error) {
url := ur.url + name
resp, err := ur.c.Get(url)
if err != nil || resp == nil || resp.StatusCode != http.StatusOK {
return _EMPTY_, fmt.Errorf("URL(%q) returned error", url)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return _EMPTY_, err
}
return string(body), nil
}
// Store is not implemented for URL Resolver.
func (ur *URLAccResolver) Store(name, jwt string) error {
return fmt.Errorf("Store operation not supported on URL Resolver")
}

View File

@@ -197,10 +197,10 @@ func TestActiveAccounts(t *testing.T) {
if foo == nil || bar == nil {
t.Fatalf("Error looking up accounts")
}
if nc := foo.NumClients(); nc != 2 {
if nc := foo.NumConnections(); nc != 2 {
t.Fatalf("Expected account foo to have 2 clients, got %d", nc)
}
if nc := bar.NumClients(); nc != 1 {
if nc := bar.NumConnections(); nc != 1 {
t.Fatalf("Expected account bar to have 1 client, got %d", nc)
}
@@ -218,7 +218,7 @@ func TestActiveAccounts(t *testing.T) {
cb1.closeConnection(ClientClosed)
waitTilActiveCount(1)
if nc := bar.NumClients(); nc != 0 {
if nc := bar.NumConnections(); nc != 0 {
t.Fatalf("Expected account bar to have 0 clients, got %d", nc)
}
@@ -226,14 +226,14 @@ func TestActiveAccounts(t *testing.T) {
cf1.closeConnection(ClientClosed)
waitTilActiveCount(1)
if nc := foo.NumClients(); nc != 1 {
if nc := foo.NumConnections(); nc != 1 {
t.Fatalf("Expected account foo to have 1 client, got %d", nc)
}
cf2.closeConnection(ClientClosed)
waitTilActiveCount(0)
if nc := foo.NumClients(); nc != 0 {
if nc := foo.NumConnections(); nc != 0 {
t.Fatalf("Expected account bar to have 0 clients, got %d", nc)
}
}

View File

@@ -200,7 +200,7 @@ func (s *Server) configureAuthorization() {
// This just checks and sets up the user map if we have multiple users.
if opts.CustomClientAuthentication != nil {
s.info.AuthRequired = true
} else if len(s.trustedNkeys) > 0 {
} else if len(s.trustedKeys) > 0 {
s.info.AuthRequired = true
} else if opts.Nkeys != nil || opts.Users != nil {
// Support both at the same time.
@@ -275,8 +275,8 @@ func (s *Server) isClientAuthorized(c *client) bool {
return true
}
// Check if we have trustedNkeys defined in the server. If so we require a user jwt.
if s.trustedNkeys != nil {
// Check if we have trustedKeys defined in the server. If so we require a user jwt.
if s.trustedKeys != nil {
if c.opts.JWT == "" {
s.mu.Unlock()
c.Debugf("Authentication requires a user JWT")

View File

@@ -413,7 +413,7 @@ func (c *client) registerWithAccount(acc *Account) error {
}
}
// Check if we have a max connections violation
if acc.MaxTotalClientsReached() {
if acc.MaxTotalConnectionsReached() {
return ErrTooManyAccountConnections
}
@@ -1106,7 +1106,7 @@ func (c *client) authViolation() {
var hasTrustedNkeys, hasNkeys, hasUsers bool
if s := c.srv; s != nil {
s.mu.Lock()
hasTrustedNkeys = len(s.trustedNkeys) > 0
hasTrustedNkeys = len(s.trustedKeys) > 0
hasNkeys = s.nkeys != nil
hasUsers = s.users != nil
s.mu.Unlock()

View File

@@ -34,8 +34,8 @@ const (
var (
// gitCommit injected at build
gitCommit string
// trustedNkeys is a whitespace separated array of trusted operator's public nkeys.
trustedNkeys string
// trustedKeys is a whitespace separated array of trusted operator's public nkeys.
trustedKeys string
)
const (

View File

@@ -75,6 +75,9 @@ var (
// ErrAccountValidation is returned when an account has failed validation.
ErrAccountValidation = errors.New("Account Validation Failed")
// ErrAccountExpired is returned when an account has expired.
ErrAccountExpired = errors.New("Account Expired")
// ErrNoAccountResolver is returned when we attempt an update but do not have an account resolver.
ErrNoAccountResolver = errors.New("Account Resolver Missing")

View File

@@ -28,10 +28,13 @@ const (
disconnectEventSubj = "$SYS.ACCOUNT.%s.DISCONNECT"
accConnsEventSubj = "$SYS.SERVER.ACCOUNT.%s.CONNS"
accConnsReqSubj = "$SYS.REQ.ACCOUNT.%s.CONNS"
accUpdateEventSubj = "$SYS.ACCOUNT.%s.CLAIMS.UPDATE"
connsRespSubj = "$SYS._INBOX_.%s"
shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN"
shutdownEventTokens = 4
serverSubjectIndex = 2
accUpdateTokens = 5
accUpdateAccIndex = 2
)
// Used to send and receive messages from inside the server.
@@ -279,6 +282,28 @@ func (s *Server) initEventTracking() {
if _, err := s.sysSubscribe(subject, s.remoteServerShutdown); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}
// Listen for account claims updates.
subject = fmt.Sprintf(accUpdateEventSubj, "*")
if _, err := s.sysSubscribe(subject, s.accountClaimUpdate); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}
}
// accountClaimUpdate will receive claim updates for accounts.
func (s *Server) accountClaimUpdate(sub *subscription, subject, reply string, msg []byte) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.eventsEnabled() {
return
}
toks := strings.Split(subject, tsep)
if len(toks) < accUpdateTokens {
s.Debugf("Received account claims update on bad subject %q", subject)
return
}
accName := toks[accUpdateAccIndex]
s.updateAccountWithClaimJWT(s.accounts[accName], string(msg))
}
// processRemoteServerShutdown will update any affected accounts.
@@ -393,7 +418,7 @@ func (s *Server) connsRequest(sub *subscription, subject, reply string, msg []by
if acc == nil {
return
}
if nlc := acc.NumLocalClients(); nlc > 0 {
if nlc := acc.NumLocalConnections(); nlc > 0 {
s.sendAccConnsUpdate(acc, reply)
}
}

View File

@@ -17,6 +17,9 @@ import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
"time"
@@ -60,8 +63,8 @@ func runTrustedServer(t *testing.T) (*Server, *Options) {
opts := DefaultOptions()
kp, _ := nkeys.FromSeed(oSeed)
pub, _ := kp.PublicKey()
opts.TrustedNkeys = []string{pub}
opts.accResolver = &MemAccResolver{}
opts.TrustedKeys = []string{pub}
opts.AccountResolver = &MemAccResolver{}
s := RunServer(opts)
return s, opts
}
@@ -87,8 +90,8 @@ func runTrustedCluster(t *testing.T) (*Server, *Options, *Server, *Options) {
optsA := DefaultOptions()
optsA.Cluster.Host = "127.0.0.1"
optsA.TrustedNkeys = []string{pub}
optsA.accResolver = mr
optsA.TrustedKeys = []string{pub}
optsA.AccountResolver = mr
optsA.SystemAccount = apub
sa := RunServer(optsA)
@@ -405,7 +408,7 @@ func TestSystemAccountConnectionLimitsServersStaggered(t *testing.T) {
}
// Restart server B.
optsB.accResolver = sa.accResolver
optsB.AccountResolver = sa.accResolver
optsB.SystemAccount = sa.systemAccount().Name
sb = RunServer(optsB)
defer sb.Shutdown()
@@ -506,7 +509,7 @@ func TestSystemAccountConnectionLimitsServerShutdownForced(t *testing.T) {
defer c.Close()
}
// Now shutdown Server B. Do so such that now communications go out.
// Now shutdown Server B. Do so such that no communications go out.
sb.mu.Lock()
sb.sys = nil
sb.mu.Unlock()
@@ -533,3 +536,197 @@ func TestSystemAccountConnectionLimitsServerShutdownForced(t *testing.T) {
return nil
})
}
func TestSystemAccountFromConfig(t *testing.T) {
kp, _ := nkeys.FromSeed(oSeed)
opub, _ := kp.PublicKey()
akp, _ := nkeys.CreateAccount()
apub, _ := akp.PublicKey()
nac := jwt.NewAccountClaims(apub)
ajwt, err := nac.Encode(kp)
if err != nil {
t.Fatalf("Error generating account JWT: %v", err)
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(ajwt))
}))
defer ts.Close()
confTemplate := `
listen: -1
trusted: %s
system_account: %s
resolver: URL("%s/jwt/v1/accounts/")
`
conf := createConfFile(t, []byte(fmt.Sprintf(confTemplate, opub, apub, ts.URL)))
defer os.Remove(conf)
s, _ := RunServerWithConfig(conf)
defer s.Shutdown()
if acc := s.SystemAccount(); acc == nil || acc.Name != apub {
t.Fatalf("System Account not properly set")
}
}
func TestAccountClaimsUpdates(t *testing.T) {
s, opts := runTrustedServer(t)
defer s.Shutdown()
sacc, sakp := createAccount(s)
s.setSystemAccount(sacc)
// Let's create a normal account with limits we can update.
okp, _ := nkeys.FromSeed(oSeed)
akp, _ := nkeys.CreateAccount()
pub, _ := akp.PublicKey()
nac := jwt.NewAccountClaims(pub)
nac.Limits.Conn = 4
ajwt, _ := nac.Encode(okp)
addAccountToMemResolver(s, pub, ajwt)
acc := s.LookupAccount(pub)
if acc.MaxActiveConnections() != 4 {
t.Fatalf("Expected to see a limit of 4 connections")
}
// Simulate a systems publisher so we can do an account claims update.
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
nc, err := nats.Connect(url, createUserCreds(t, s, sakp))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
// Update the account
nac = jwt.NewAccountClaims(pub)
nac.Limits.Conn = 8
issAt := time.Now().Add(-30 * time.Second).Unix()
nac.IssuedAt = issAt
expires := time.Now().Add(2 * time.Second).Unix()
nac.Expires = expires
ajwt, _ = nac.Encode(okp)
// Publish to the system update subject.
claimUpdateSubj := fmt.Sprintf(accUpdateEventSubj, pub)
nc.Publish(claimUpdateSubj, []byte(ajwt))
nc.Flush()
acc = s.LookupAccount(pub)
if acc.MaxActiveConnections() != 8 {
t.Fatalf("Account was not updated")
}
}
func TestAccountConnsLimitExceededAfterUpdate(t *testing.T) {
s, opts := runTrustedServer(t)
defer s.Shutdown()
sacc, _ := createAccount(s)
s.setSystemAccount(sacc)
// Let's create a normal account with limits we can update.
okp, _ := nkeys.FromSeed(oSeed)
akp, _ := nkeys.CreateAccount()
pub, _ := akp.PublicKey()
nac := jwt.NewAccountClaims(pub)
nac.Limits.Conn = 10
ajwt, _ := nac.Encode(okp)
addAccountToMemResolver(s, pub, ajwt)
acc := s.LookupAccount(pub)
// Now create the max connections.
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
for {
nc, err := nats.Connect(url, createUserCreds(t, s, akp))
if err != nil {
break
}
defer nc.Close()
}
// We should have max here.
if total := s.NumClients(); total != acc.MaxActiveConnections() {
t.Fatalf("Expected %d connections, got %d", acc.MaxActiveConnections(), total)
}
// Now change limits to make current connections over the limit.
nac = jwt.NewAccountClaims(pub)
nac.Limits.Conn = 2
ajwt, _ = nac.Encode(okp)
s.updateAccountWithClaimJWT(acc, ajwt)
if acc.MaxActiveConnections() != 2 {
t.Fatalf("Expected max connections to be set to 2, got %d", acc.MaxActiveConnections())
}
// We should have closed the excess connections.
if total := s.NumClients(); total != acc.MaxActiveConnections() {
t.Fatalf("Expected %d connections, got %d", acc.MaxActiveConnections(), total)
}
}
func TestAccountConnsLimitExceededAfterUpdateDisconnectNewOnly(t *testing.T) {
s, opts := runTrustedServer(t)
defer s.Shutdown()
sacc, _ := createAccount(s)
s.setSystemAccount(sacc)
// Let's create a normal account with limits we can update.
okp, _ := nkeys.FromSeed(oSeed)
akp, _ := nkeys.CreateAccount()
pub, _ := akp.PublicKey()
nac := jwt.NewAccountClaims(pub)
nac.Limits.Conn = 10
ajwt, _ := nac.Encode(okp)
addAccountToMemResolver(s, pub, ajwt)
acc := s.LookupAccount(pub)
// Now create the max connections.
// We create half then we will wait and then create the rest.
// Will test that we disconnect the newest ones.
newConns := make([]*nats.Conn, 0, 5)
url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
for i := 0; i < 5; i++ {
nats.Connect(url, nats.NoReconnect(), createUserCreds(t, s, akp))
}
time.Sleep(500 * time.Millisecond)
for i := 0; i < 5; i++ {
nc, _ := nats.Connect(url, nats.NoReconnect(), createUserCreds(t, s, akp))
newConns = append(newConns, nc)
}
// We should have max here.
if total := s.NumClients(); total != acc.MaxActiveConnections() {
t.Fatalf("Expected %d connections, got %d", acc.MaxActiveConnections(), total)
}
// Now change limits to make current connections over the limit.
nac = jwt.NewAccountClaims(pub)
nac.Limits.Conn = 5
ajwt, _ = nac.Encode(okp)
s.updateAccountWithClaimJWT(acc, ajwt)
if acc.MaxActiveConnections() != 5 {
t.Fatalf("Expected max connections to be set to 2, got %d", acc.MaxActiveConnections())
}
// We should have closed the excess connections.
if total := s.NumClients(); total != acc.MaxActiveConnections() {
t.Fatalf("Expected %d connections, got %d", acc.MaxActiveConnections(), total)
}
// Now make sure that only the new ones were closed.
var closed int
for _, nc := range newConns {
if !nc.IsClosed() {
closed++
}
}
if closed != 5 {
t.Fatalf("Expected all new clients to be closed, only got %d of 5", closed)
}
}

99
server/jwt.go Normal file
View File

@@ -0,0 +1,99 @@
// Copyright 2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"fmt"
"io/ioutil"
"regexp"
"github.com/nats-io/jwt"
"github.com/nats-io/nkeys"
)
var nscDecoratedRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[-]{3,}[^\n]*[-]{3,}[\n]*))`)
// readOperatorJWT
func readOperatorJWT(jwtfile string) (*jwt.OperatorClaims, error) {
contents, err := ioutil.ReadFile(jwtfile)
if err != nil {
return nil, err
}
defer wipeSlice(contents)
var claim string
items := nscDecoratedRe.FindAllSubmatch(contents, -1)
if len(items) == 0 {
claim = string(contents)
} else {
// First result should be the JWT.
// We copy here so that if the file contained a seed file too we wipe appropriately.
raw := items[0][1]
tmp := make([]byte, len(raw))
copy(tmp, raw)
claim = string(tmp)
}
opc, err := jwt.DecodeOperatorClaims(claim)
if err != nil {
return nil, err
}
return opc, nil
}
// Just wipe slice with 'x', for clearing contents of nkey seed file.
func wipeSlice(buf []byte) {
for i := range buf {
buf[i] = 'x'
}
}
// validateTrustedOperators will check that we do not have conflicts with
// assigned trusted keys and trusted operators. If operators are defined we
// will expand the trusted keys in options.
func validateTrustedOperators(o *Options) error {
if len(o.TrustedOperators) == 0 {
return nil
}
if o.AllowNewAccounts {
return fmt.Errorf("operators do not allow dynamic creation of new accounts")
}
if o.AccountResolver == nil {
return fmt.Errorf("operators require an account resolver to be configured")
}
if len(o.Accounts) > 0 {
return fmt.Errorf("operators do not allow Accounts to be configured directly")
}
if len(o.Users) > 0 || len(o.Nkeys) > 0 {
return fmt.Errorf("operators do not allow users to be configured directly")
}
if len(o.TrustedOperators) > 0 && len(o.TrustedKeys) > 0 {
return fmt.Errorf("conflicting options for 'TrustedKeys' and 'TrustedOperators'")
}
// If we have operators, fill in the trusted keys.
// FIXME(dlc) - We had TrustedKeys before TrsutedOperators. The jwt.OperatorClaims
// has a DidSign(). Use that longer term. For now we can expand in place.
for _, opc := range o.TrustedOperators {
if o.TrustedKeys == nil {
o.TrustedKeys = make([]string, 0, 4)
}
o.TrustedKeys = append(o.TrustedKeys, opc.Issuer)
o.TrustedKeys = append(o.TrustedKeys, opc.SigningKeys...)
}
for _, key := range o.TrustedKeys {
if !nkeys.IsValidPublicOperatorKey(key) {
return fmt.Errorf("trusted Keys %q are required to be a valid public operator nkey", key)
}
}
return nil
}

View File

@@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
"time"
@@ -29,7 +30,8 @@ import (
)
var (
oSeed = []byte("SOAL7GTNI66CTVVNXBNQMG6V2HTDRWC3HGEP7D2OUTWNWSNYZDXWFOX4SU")
// This matches ./configs/nkeys_jwts/test.seed
oSeed = []byte("SOAFYNORQLQFJYBYNUGC5D7SH2MXMUX5BFEWWGHN3EK4VGG5TPT5DZP7QU")
aSeed = []byte("SAANRM6JVDEYZTR6DXCWUSDDJHGOHAFITXEQBSEZSY5JENTDVRZ6WNKTTY")
)
@@ -37,7 +39,7 @@ func opTrustBasicSetup() *Server {
kp, _ := nkeys.FromSeed(oSeed)
pub, _ := kp.PublicKey()
opts := defaultServerOptions
opts.TrustedNkeys = []string{pub}
opts.TrustedKeys = []string{pub}
s, _, _, _ := rawSetup(opts)
return s
}
@@ -49,9 +51,9 @@ func buildMemAccResolver(s *Server) {
s.mu.Unlock()
}
func addAccountToMemResolver(s *Server, pub, jwt string) {
func addAccountToMemResolver(s *Server, pub, jwtclaim string) {
s.mu.Lock()
s.accResolver.(*MemAccResolver).Store(pub, jwt)
s.accResolver.Store(pub, jwtclaim)
s.mu.Unlock()
}
@@ -159,7 +161,7 @@ func TestJWTUserBadTrusted(t *testing.T) {
}
// Now place bad trusted key
s.mu.Lock()
s.trustedNkeys = []string{"bad"}
s.trustedKeys = []string{"bad"}
s.mu.Unlock()
buildMemAccResolver(s)
@@ -1376,3 +1378,80 @@ func TestJWTAccountServiceImportExpires(t *testing.T) {
parseAsyncB("PING\r\n")
expectPong(crb)
}
func TestAccountURLResolver(t *testing.T) {
kp, _ := nkeys.FromSeed(oSeed)
akp, _ := nkeys.CreateAccount()
apub, _ := akp.PublicKey()
nac := jwt.NewAccountClaims(apub)
ajwt, err := nac.Encode(kp)
if err != nil {
t.Fatalf("Error generating account JWT: %v", err)
}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(ajwt))
}))
defer ts.Close()
confTemplate := `
listen: -1
resolver: URL("%s/ngs/v1/accounts/jwt/")
`
conf := createConfFile(t, []byte(fmt.Sprintf(confTemplate, ts.URL)))
defer os.Remove(conf)
s, opts := RunServerWithConfig(conf)
pub, _ := kp.PublicKey()
opts.TrustedKeys = []string{pub}
defer s.Shutdown()
acc := s.LookupAccount(apub)
if acc == nil {
t.Fatalf("Expected to receive an account")
}
if acc.Name != apub {
t.Fatalf("Account name did not match claim key")
}
}
func TestAccountURLResolverTimeout(t *testing.T) {
kp, _ := nkeys.FromSeed(oSeed)
akp, _ := nkeys.CreateAccount()
apub, _ := akp.PublicKey()
nac := jwt.NewAccountClaims(apub)
ajwt, err := nac.Encode(kp)
if err != nil {
t.Fatalf("Error generating account JWT: %v", err)
}
basePath := "/ngs/v1/accounts/jwt/"
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == basePath {
w.Write([]byte("ok"))
return
}
// Purposely be slow on account lookup.
time.Sleep(2*time.Second + 200*time.Millisecond)
w.Write([]byte(ajwt))
}))
defer ts.Close()
confTemplate := `
listen: -1
resolver: URL("%s%s")
`
conf := createConfFile(t, []byte(fmt.Sprintf(confTemplate, ts.URL, basePath)))
defer os.Remove(conf)
s, opts := RunServerWithConfig(conf)
pub, _ := kp.PublicKey()
opts.TrustedKeys = []string{pub}
defer s.Shutdown()
acc := s.LookupAccount(apub)
if acc != nil {
t.Fatalf("Expected to not receive an account due to timeout")
}
}

View File

@@ -29,7 +29,7 @@ func (s *Server) nonceRequired() bool {
s.optsMu.RLock()
defer s.optsMu.RUnlock()
return len(s.opts.Nkeys) > 0 || len(s.opts.TrustedNkeys) > 0
return len(s.opts.Nkeys) > 0 || len(s.opts.TrustedKeys) > 0
}
// Generate a nonce for INFO challenge.

View File

@@ -24,11 +24,13 @@ import (
"net"
"net/url"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/nats-io/gnatsd/conf"
"github.com/nats-io/jwt"
"github.com/nats-io/nkeys"
)
@@ -129,7 +131,11 @@ type Options struct {
RQSubsSweep time.Duration `json:"-"` // Deprecated
MaxClosedClients int `json:"-"`
LameDuckDuration time.Duration `json:"-"`
TrustedNkeys []string `json:"-"`
// Operating a trusted NATS server
TrustedKeys []string `json:"-"`
TrustedOperators []*jwt.OperatorClaims `json:"-"`
AccountResolver AccountResolver `json:"-"`
CustomClientAuthentication Authentication `json:"-"`
CustomRouterAuthentication Authentication `json:"-"`
@@ -139,9 +145,6 @@ type Options struct {
// private fields, used for testing
gatewaysSolicitDelay time.Duration
// Used to spin up a memory account resolver for testing.
accResolver AccountResolver
}
type netResolver interface {
@@ -489,12 +492,71 @@ func (o *Options) ProcessConfigFile(configFile string) error {
continue
}
o.LameDuckDuration = dur
case "trusted":
case "operator", "operators", "roots", "root", "root_operators", "root_operator":
opFiles := []string{}
switch v.(type) {
case string:
o.TrustedNkeys = []string{v.(string)}
opFiles = append(opFiles, v.(string))
case []string:
o.TrustedNkeys = v.([]string)
opFiles = append(opFiles, v.([]string)...)
default:
err := &configErr{tk, fmt.Sprintf("error parsing operators: unsupported type %T", v)}
errors = append(errors, err)
}
// Assume for now these are file names.
// TODO(dlc) - If we try to read the file and it fails we could treat the string
// as the JWT itself.
o.TrustedOperators = make([]*jwt.OperatorClaims, 0, len(opFiles))
for _, fname := range opFiles {
opc, err := readOperatorJWT(fname)
if err != nil {
err := &configErr{tk, fmt.Sprintf("error parsing operator JWT: %v", err)}
errors = append(errors, err)
continue
}
o.TrustedOperators = append(o.TrustedOperators, opc)
}
case "resolver", "account_resolver", "accounts_resolver":
var memResolverRe = regexp.MustCompile(`(MEM|MEMORY|mem|memory)\s*`)
var resolverRe = regexp.MustCompile(`(?:URL|url){1}(?:\({1}\s*"?([^\s"]*)"?\s*\){1})?\s*`)
str, ok := v.(string)
if !ok {
err := &configErr{tk, fmt.Sprintf("error parsing operator resolver, wrong type %T", v)}
errors = append(errors, err)
continue
}
if memResolverRe.MatchString(str) {
o.AccountResolver = &MemAccResolver{}
} else {
items := resolverRe.FindStringSubmatch(str)
if len(items) == 2 {
url := items[1]
if ur, err := NewURLAccResolver(url); err != nil {
err := &configErr{tk, fmt.Sprintf("URL account resolver error: %v", err)}
errors = append(errors, err)
continue
} else {
o.AccountResolver = ur
}
}
}
if o.AccountResolver == nil {
err := &configErr{tk, fmt.Sprintf("error parsing account resolver, should be MEM or URL(\"url\")")}
errors = append(errors, err)
}
case "system_account", "system":
if sa, ok := v.(string); !ok {
err := &configErr{tk, fmt.Sprintf("system account name must be a string")}
errors = append(errors, err)
} else {
o.SystemAccount = sa
}
case "trusted", "trusted_keys":
switch v.(type) {
case string:
o.TrustedKeys = []string{v.(string)}
case []string:
o.TrustedKeys = v.([]string)
case []interface{}:
keys := make([]string, 0, len(v.([]interface{})))
for _, mv := range v.([]interface{}) {
@@ -507,13 +569,13 @@ func (o *Options) ProcessConfigFile(configFile string) error {
continue
}
}
o.TrustedNkeys = keys
o.TrustedKeys = keys
default:
err := &configErr{tk, fmt.Sprintf("error parsing trusted: unsupported type %T", v)}
errors = append(errors, err)
}
// Do a quick sanity check on keys
for _, key := range o.TrustedNkeys {
for _, key := range o.TrustedKeys {
if !nkeys.IsValidPublicOperatorKey(key) {
err := &configErr{tk, fmt.Sprintf("trust key %q required to be a valid public operator nkey", key)}
errors = append(errors, err)

View File

@@ -161,7 +161,7 @@ type Server struct {
ldmCh chan bool
// Trusted public operator keys.
trustedNkeys []string
trustedKeys []string
}
// Make sure all are 64bits for atomic use
@@ -173,8 +173,16 @@ type stats struct {
slowConsumers int64
}
// DEPRECATED: Use NewServer(opts)
// New will setup a new server struct after parsing the options.
func New(opts *Options) *Server {
s, _ := NewServer(opts)
return s
}
// NewServer will setup a new server struct after parsing the options.
// Could return an error if options can not be validated.
func NewServer(opts *Options) (*Server, error) {
setBaselineOptions(opts)
// Process TLS options, including whether we require client certificates.
@@ -189,10 +197,8 @@ func New(opts *Options) *Server {
// server will always be started with configuration parsing (that could
// report issues). Its options can be (incorrectly) set by hand when
// server is embedded. If there is an error, return nil.
// TODO(ik): Should probably have a new NewServer() API that returns (*Server, error)
// so user can know what's wrong.
if err := validateOptions(opts); err != nil {
return nil
return nil, err
}
info := Info{
@@ -221,9 +227,9 @@ func New(opts *Options) *Server {
configTime: now,
}
// Trusted root keys.
if !s.processTrustedNkeys() {
return nil
// Trusted root operator keys.
if !s.processTrustedKeys() {
return nil, fmt.Errorf("Error processing trusted operator keys")
}
s.mu.Lock()
@@ -245,7 +251,7 @@ func New(opts *Options) *Server {
// may try to send things to gateways.
gws, err := newGateway(opts)
if err != nil {
return nil
return nil, err
}
s.gateway = gws
@@ -269,7 +275,7 @@ func New(opts *Options) *Server {
// For tracking accounts
if err := s.configureAccounts(); err != nil {
return nil
return nil, err
}
// Used to setup Authorization.
@@ -278,10 +284,14 @@ func New(opts *Options) *Server {
// Start signal handler
s.handleSignals()
return s
return s, nil
}
func validateOptions(o *Options) error {
// Check that the trust configuration is correct.
if err := validateTrustedOperators(o); err != nil {
return err
}
// Check that gateway is properly configured. Returns no error
// if there is no gateway defined.
return validateGatewayOptions(o)
@@ -318,11 +328,11 @@ func (s *Server) configureAccounts() error {
s.registerAccount(acc)
}
// Check for configured account resolvers.
if opts.accResolver != nil {
s.accResolver = opts.accResolver
if opts.AccountResolver != nil {
s.accResolver = opts.AccountResolver
}
// Check that if we have a SystemAccount it can
// be properly resolved.
// Set the system account if it was configured.
if opts.SystemAccount != _EMPTY_ {
if acc := s.lookupAccount(opts.SystemAccount); acc == nil {
return ErrMissingAccount
@@ -347,7 +357,7 @@ func (s *Server) generateRouteInfoJSON() {
func (s *Server) isTrustedIssuer(issuer string) bool {
s.mu.Lock()
defer s.mu.Unlock()
for _, tk := range s.trustedNkeys {
for _, tk := range s.trustedKeys {
if tk == issuer {
return true
}
@@ -355,25 +365,25 @@ func (s *Server) isTrustedIssuer(issuer string) bool {
return false
}
// processTrustedNkeys will process stamped and option based
// processTrustedKeys will process stamped and option based
// trusted nkeys. Returns success.
func (s *Server) processTrustedNkeys() bool {
if trustedNkeys != "" && !s.initStampedTrustedNkeys() {
func (s *Server) processTrustedKeys() bool {
if trustedKeys != "" && !s.initStampedTrustedKeys() {
return false
} else if s.opts.TrustedNkeys != nil {
for _, key := range s.opts.TrustedNkeys {
} else if s.opts.TrustedKeys != nil {
for _, key := range s.opts.TrustedKeys {
if !nkeys.IsValidPublicOperatorKey(key) {
return false
}
}
s.trustedNkeys = s.opts.TrustedNkeys
s.trustedKeys = s.opts.TrustedKeys
}
return true
}
// checkTrustedNkeyString will check that the string is a valid array
// checkTrustedKeyString will check that the string is a valid array
// of public operator nkeys.
func checkTrustedNkeyString(keys string) []string {
func checkTrustedKeyString(keys string) []string {
tks := strings.Fields(keys)
if len(tks) == 0 {
return nil
@@ -387,19 +397,19 @@ func checkTrustedNkeyString(keys string) []string {
return tks
}
// initStampedTrustedNkeys will check the stamped trusted keys
// and will set the server field 'trustedNkeys'. Returns whether
// initStampedTrustedKeys will check the stamped trusted keys
// and will set the server field 'trustedKeys'. Returns whether
// it succeeded or not.
func (s *Server) initStampedTrustedNkeys() bool {
func (s *Server) initStampedTrustedKeys() bool {
// Check to see if we have an override in options, which will cause us to fail.
if len(s.opts.TrustedNkeys) > 0 {
if len(s.opts.TrustedKeys) > 0 {
return false
}
tks := checkTrustedNkeyString(trustedNkeys)
tks := checkTrustedKeyString(trustedKeys)
if len(tks) == 0 {
return false
}
s.trustedNkeys = tks
s.trustedKeys = tks
return true
}
@@ -509,20 +519,56 @@ func (s *Server) RegisterAccount(name string) (*Account, error) {
return acc, nil
}
// SetSystemAccount will set the internal system account.
// If root operators are present it will also check validity.
func (s *Server) SetSystemAccount(accName string) error {
s.mu.Lock()
if acc := s.accounts[accName]; acc != nil {
s.mu.Unlock()
return s.setSystemAccount(acc)
}
// If we are here we do not have local knowledge of this account.
// Do this one by hand to return more useful error.
ac, jwt, err := s.fetchAccountClaims(accName)
if err != nil {
s.mu.Unlock()
return err
}
acc := s.buildInternalAccount(ac)
acc.claimJWT = jwt
s.registerAccount(acc)
s.mu.Unlock()
return s.setSystemAccount(acc)
}
// SystemAccount returns the system account if set.
func (s *Server) SystemAccount() *Account {
s.mu.Lock()
defer s.mu.Unlock()
if s.sys != nil {
return s.sys.account
}
return nil
}
// Assign an system account. Should only be called once.
// This sets up a server to send and receive messages from inside
// the server itself.
func (s *Server) setSystemAccount(acc *Account) error {
if acc == nil {
return fmt.Errorf("system account is nil")
return ErrMissingAccount
}
// Don't try to fix this here.
if acc.IsExpired() {
return ErrAccountExpired
}
if !s.isTrustedIssuer(acc.Issuer) {
return fmt.Errorf("system account not a trusted account")
return ErrAccountValidation
}
s.mu.Lock()
if s.sys != nil {
s.mu.Unlock()
return fmt.Errorf("system account already exists")
return ErrAccountExists
}
s.sys = &internal{
@@ -602,7 +648,7 @@ func (s *Server) registerAccount(acc *Account) {
// lookupAccount is a function to return the account structure
// associated with an account name.
// Lock shiould be held on entry.
// Lock should be held on entry.
func (s *Server) lookupAccount(name string) *Account {
acc := s.accounts[name]
if acc != nil {
@@ -651,6 +697,26 @@ func (s *Server) updateAccount(acc *Account) bool {
return false
}
// updateAccountWithClaimJWT will chack and apply the claim update.
func (s *Server) updateAccountWithClaimJWT(acc *Account, claimJWT string) bool {
if acc == nil {
return false
}
acc.updated = time.Now()
if acc.claimJWT != "" && acc.claimJWT == claimJWT {
s.Debugf("Requested account update for [%s], same claims detected", acc.Name)
return false
}
accClaims, _, err := s.verifyAccountClaims(claimJWT)
if err == nil && accClaims != nil {
acc.claimJWT = claimJWT
s.updateAccountClaims(acc, accClaims)
return true
}
return false
}
// fetchRawAccountClaims will grab raw account claims iff we have a resolver.
// Lock is held upon entry.
func (s *Server) fetchRawAccountClaims(name string) (string, error) {
@@ -660,8 +726,14 @@ func (s *Server) fetchRawAccountClaims(name string) (string, error) {
}
// Need to do actual Fetch without the lock.
s.mu.Unlock()
start := time.Now()
claimJWT, err := accResolver.Fetch(name)
fetchTime := time.Since(start)
s.mu.Lock()
s.Debugf("Account resolver fetch time was %v\n", fetchTime)
if fetchTime > time.Second {
s.Warnf("Account resolver took %v to fetch account", fetchTime)
}
if err != nil {
return "", err
}
@@ -669,6 +741,7 @@ func (s *Server) fetchRawAccountClaims(name string) (string, error) {
}
// fetchAccountClaims will attempt to fetch new claims if a resolver is present.
// Lock is held upon entry.
func (s *Server) fetchAccountClaims(name string) (*jwt.AccountClaims, string, error) {
claimJWT, err := s.fetchRawAccountClaims(name)
if err != nil {
@@ -695,11 +768,10 @@ func (s *Server) verifyAccountClaims(claimJWT string) (*jwt.AccountClaims, strin
// Lock should be held upon entry.
func (s *Server) fetchAccount(name string) *Account {
if accClaims, claimJWT, _ := s.fetchAccountClaims(name); accClaims != nil {
if acc := s.buildInternalAccount(accClaims); acc != nil {
acc.claimJWT = claimJWT
s.registerAccount(acc)
return acc
}
acc := s.buildInternalAccount(accClaims)
acc.claimJWT = claimJWT
s.registerAccount(acc)
return acc
}
return nil
}
@@ -730,6 +802,20 @@ func (s *Server) Start() {
// Snapshot server options.
opts := s.getOpts()
hasOperators := len(opts.TrustedOperators) > 0
if hasOperators {
s.Noticef("Trusted Operators")
}
for _, opc := range opts.TrustedOperators {
s.Noticef(" System : %q", opc.Audience)
s.Noticef(" Operator: %q", opc.Name)
s.Noticef(" Issued : %v", time.Unix(opc.IssuedAt, 0))
s.Noticef(" Expires : %v", time.Unix(opc.Expires, 0))
}
if hasOperators && opts.SystemAccount == _EMPTY_ {
s.Warnf("Trusted Operators should utilize a System Account")
}
// Log the pid to a file
if opts.PidFile != _EMPTY_ {
if err := s.logPid(); err != nil {
@@ -745,7 +831,10 @@ func (s *Server) Start() {
// Setup system account which will start eventing stack.
if sa := opts.SystemAccount; sa != _EMPTY_ {
s.setSystemAccount(s.lookupAccount(sa))
if err := s.SetSystemAccount(sa); err != nil {
s.Fatalf("Can't set system account: %v", err)
return
}
}
// Start up gateway if needed. Do this before starting the routes, because

View File

@@ -25,60 +25,60 @@ const (
t2 = "OAHC7NGAHG3YVPTD6QOUFZGPM2OMU6EOS67O2VHBUOA6BJLPTWFHGLKU"
)
func TestStampedTrustedNkeys(t *testing.T) {
func TestStampedTrustedKeys(t *testing.T) {
opts := DefaultOptions()
defer func() { trustedNkeys = "" }()
defer func() { trustedKeys = "" }()
// Set this to a bad key. We require valid operator public keys.
trustedNkeys = "bad"
trustedKeys = "bad"
if s := New(opts); s != nil {
s.Shutdown()
t.Fatalf("Expected a bad trustedNkeys to return nil server")
t.Fatalf("Expected a bad trustedKeys to return nil server")
}
trustedNkeys = t1
trustedKeys = t1
s := New(opts)
if s == nil {
t.Fatalf("Expected non-nil server")
}
if len(s.trustedNkeys) != 1 || s.trustedNkeys[0] != t1 {
if len(s.trustedKeys) != 1 || s.trustedKeys[0] != t1 {
t.Fatalf("Trusted Nkeys not setup properly")
}
trustedNkeys = strings.Join([]string{t1, t2}, " ")
trustedKeys = strings.Join([]string{t1, t2}, " ")
if s = New(opts); s == nil {
t.Fatalf("Expected non-nil server")
}
if len(s.trustedNkeys) != 2 || s.trustedNkeys[0] != t1 || s.trustedNkeys[1] != t2 {
if len(s.trustedKeys) != 2 || s.trustedKeys[0] != t1 || s.trustedKeys[1] != t2 {
t.Fatalf("Trusted Nkeys not setup properly")
}
opts.TrustedNkeys = []string{"OVERRIDE ME"}
opts.TrustedKeys = []string{"OVERRIDE ME"}
if s = New(opts); s != nil {
t.Fatalf("Expected opts.TrustedNkeys to return nil server")
t.Fatalf("Expected opts.TrustedKeys to return nil server")
}
}
func TestTrustedKeysOptions(t *testing.T) {
trustedNkeys = ""
trustedKeys = ""
opts := DefaultOptions()
opts.TrustedNkeys = []string{"bad"}
opts.TrustedKeys = []string{"bad"}
if s := New(opts); s != nil {
s.Shutdown()
t.Fatalf("Expected a bad opts.TrustedNkeys to return nil server")
t.Fatalf("Expected a bad opts.TrustedKeys to return nil server")
}
opts.TrustedNkeys = []string{t1}
opts.TrustedKeys = []string{t1}
s := New(opts)
if s == nil {
t.Fatalf("Expected non-nil server")
}
if len(s.trustedNkeys) != 1 || s.trustedNkeys[0] != t1 {
if len(s.trustedKeys) != 1 || s.trustedKeys[0] != t1 {
t.Fatalf("Trusted Nkeys not setup properly via options")
}
opts.TrustedNkeys = []string{t1, t2}
opts.TrustedKeys = []string{t1, t2}
if s = New(opts); s == nil {
t.Fatalf("Expected non-nil server")
}
if len(s.trustedNkeys) != 2 || s.trustedNkeys[0] != t1 || s.trustedNkeys[1] != t2 {
if len(s.trustedKeys) != 2 || s.trustedKeys[0] != t1 || s.trustedKeys[1] != t2 {
t.Fatalf("Trusted Nkeys not setup properly via options")
}
}
@@ -90,11 +90,11 @@ func TestTrustConfigOption(t *testing.T) {
if err != nil {
t.Fatalf("Error parsing config: %v", err)
}
if l := len(opts.TrustedNkeys); l != 1 {
if l := len(opts.TrustedKeys); l != 1 {
t.Fatalf("Expected 1 trusted key, got %d", l)
}
if opts.TrustedNkeys[0] != t1 {
t.Fatalf("Expected trusted key to be %q, got %q", t1, opts.TrustedNkeys[0])
if opts.TrustedKeys[0] != t1 {
t.Fatalf("Expected trusted key to be %q, got %q", t1, opts.TrustedKeys[0])
}
confFileName = createConfFile(t, []byte(fmt.Sprintf("trusted = [%q, %q]", t1, t2)))
@@ -103,14 +103,14 @@ func TestTrustConfigOption(t *testing.T) {
if err != nil {
t.Fatalf("Error parsing config: %v", err)
}
if l := len(opts.TrustedNkeys); l != 2 {
if l := len(opts.TrustedKeys); l != 2 {
t.Fatalf("Expected 2 trusted key, got %d", l)
}
if opts.TrustedNkeys[0] != t1 {
t.Fatalf("Expected trusted key to be %q, got %q", t1, opts.TrustedNkeys[0])
if opts.TrustedKeys[0] != t1 {
t.Fatalf("Expected trusted key to be %q, got %q", t1, opts.TrustedKeys[0])
}
if opts.TrustedNkeys[1] != t2 {
t.Fatalf("Expected trusted key to be %q, got %q", t2, opts.TrustedNkeys[1])
if opts.TrustedKeys[1] != t2 {
t.Fatalf("Expected trusted key to be %q, got %q", t2, opts.TrustedKeys[1])
}
// Now do a bad one.