mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
@@ -14,10 +14,14 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/nats-io/nkeys"
|
||||
)
|
||||
|
||||
func simpleAccountServer(t *testing.T) (*Server, *Account, *Account) {
|
||||
@@ -45,9 +49,6 @@ func TestRegisterDuplicateAccounts(t *testing.T) {
|
||||
|
||||
func TestAccountIsolation(t *testing.T) {
|
||||
s, fooAcc, barAcc := simpleAccountServer(t)
|
||||
if fooAcc == nil || barAcc == nil {
|
||||
t.Fatalf("Error retrieving accounts for 'foo' and 'bar'")
|
||||
}
|
||||
cfoo, crFoo, _ := newClientForServer(s)
|
||||
if err := cfoo.registerWithAccount(fooAcc); err != nil {
|
||||
t.Fatalf("Error register client with 'foo' account: %v", err)
|
||||
@@ -135,10 +136,18 @@ func TestNewAccountsFromClients(t *testing.T) {
|
||||
opts.AllowNewAccounts = true
|
||||
s = New(&opts)
|
||||
|
||||
c, _, _ = newClientForServer(s)
|
||||
c, cr, _ = newClientForServer(s)
|
||||
err := c.parse(connectOp)
|
||||
if err != nil {
|
||||
t.Fatalf("Received an error trying to create an account: %v", err)
|
||||
t.Fatalf("Received an error trying to connect: %v", err)
|
||||
}
|
||||
go c.parse([]byte("PING\r\n"))
|
||||
l, err = cr.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading response for client from server: %v", err)
|
||||
}
|
||||
if !strings.HasPrefix(l, "PONG\r\n") {
|
||||
t.Fatalf("PONG response incorrect: %q", l)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -255,7 +264,6 @@ func TestAccountParseConfig(t *testing.T) {
|
||||
if u.Username == "derek" {
|
||||
if u.Account != natsAcc {
|
||||
t.Fatalf("Expected to see the 'nats.io' account, but received %+v", u.Account)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -302,8 +310,7 @@ func TestAccountParseConfigImportsExports(t *testing.T) {
|
||||
for _, acc := range opts.Accounts {
|
||||
if acc.Name == "nats.io" {
|
||||
natsAcc = acc
|
||||
}
|
||||
if acc.Name == "synadia" {
|
||||
} else if acc.Name == "synadia" {
|
||||
synAcc = acc
|
||||
}
|
||||
}
|
||||
@@ -420,7 +427,7 @@ func TestImportExportConfigFailures(t *testing.T) {
|
||||
cf = createConfFile(t, []byte(`
|
||||
accounts {
|
||||
nats.io {
|
||||
exports = [{service: {account: nats.io, subject:"foo.*"}]
|
||||
exports = [{service: {account: nats.io, subject:"foo.*"}}]
|
||||
}
|
||||
}
|
||||
`))
|
||||
@@ -490,6 +497,7 @@ func TestImportAuthorized(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSimpleMapping(t *testing.T) {
|
||||
t.Helper()
|
||||
s, fooAcc, barAcc := simpleAccountServer(t)
|
||||
defer s.Shutdown()
|
||||
|
||||
@@ -555,7 +563,7 @@ func TestSimpleMapping(t *testing.T) {
|
||||
|
||||
l, err = crBar.ReadString('\n')
|
||||
if err != nil {
|
||||
t.Fatalf("Error reading from client 'baz': %v", err)
|
||||
t.Fatalf("Error reading from client 'bar': %v", err)
|
||||
}
|
||||
checkMsg(l, "2")
|
||||
checkPayload(crBar, []byte("hello\r\n"), t)
|
||||
@@ -578,11 +586,11 @@ func TestNoPrefixWildcardMapping(t *testing.T) {
|
||||
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
||||
}
|
||||
|
||||
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined.
|
||||
t.Fatalf("Error adding account export to client foo: %v", err)
|
||||
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil {
|
||||
t.Fatalf("Error adding stream export to client foo: %v", err)
|
||||
}
|
||||
if err := cbar.acc.addStreamImport(fooAcc, "*", ""); err != nil {
|
||||
t.Fatalf("Error adding account import to client bar: %v", err)
|
||||
t.Fatalf("Error adding stream import to client bar: %v", err)
|
||||
}
|
||||
|
||||
// Normal Subscription on bar client for literal "foo".
|
||||
@@ -631,11 +639,12 @@ func TestPrefixWildcardMapping(t *testing.T) {
|
||||
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
||||
}
|
||||
|
||||
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined.
|
||||
t.Fatalf("Error adding account export to client foo: %v", err)
|
||||
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil {
|
||||
t.Fatalf("Error adding stream export to client foo: %v", err)
|
||||
}
|
||||
// Checking that trailing '.' is accepted, tested that it is auto added above.
|
||||
if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil {
|
||||
t.Fatalf("Error adding account import to client bar: %v", err)
|
||||
t.Fatalf("Error adding stream import to client bar: %v", err)
|
||||
}
|
||||
|
||||
// Normal Subscription on bar client for wildcard.
|
||||
@@ -684,11 +693,11 @@ func TestPrefixWildcardMappingWithLiteralSub(t *testing.T) {
|
||||
t.Fatalf("Error registering client with 'bar' account: %v", err)
|
||||
}
|
||||
|
||||
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil { // Public with no accounts defined.
|
||||
t.Fatalf("Error adding account export to client foo: %v", err)
|
||||
if err := cfoo.acc.addStreamExport(">", []*Account{barAcc}); err != nil {
|
||||
t.Fatalf("Error adding stream export to client foo: %v", err)
|
||||
}
|
||||
if err := cbar.acc.addStreamImport(fooAcc, "*", "pub.imports."); err != nil {
|
||||
t.Fatalf("Error adding account import to client bar: %v", err)
|
||||
t.Fatalf("Error adding stream import to client bar: %v", err)
|
||||
}
|
||||
|
||||
// Normal Subscription on bar client for wildcard.
|
||||
@@ -819,6 +828,133 @@ func TestCrossAccountRequestReply(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestAccountMapsUsers(t *testing.T) {
|
||||
// Used for the nkey users to properly sign.
|
||||
seed1 := "SUAPM67TC4RHQLKBX55NIQXSMATZDOZK6FNEOSS36CAYA7F7TY66LP4BOM"
|
||||
seed2 := "SUAIS5JPX4X4GJ7EIIJEQ56DH2GWPYJRPWN5XJEDENJOZHCBLI7SEPUQDE"
|
||||
|
||||
confFileName := createConfFile(t, []byte(`
|
||||
accounts {
|
||||
synadia {
|
||||
users = [
|
||||
{user: derek, password: foo},
|
||||
{nkey: UCNGL4W5QX66CFX6A6DCBVDH5VOHMI7B2UZZU7TXAUQQSI2JPHULCKBR}
|
||||
]
|
||||
}
|
||||
nats {
|
||||
users = [
|
||||
{user: ivan, password: bar},
|
||||
{nkey: UDPGQVFIWZ7Q5UH4I5E6DBCZULQS6VTVBG6CYBD7JV3G3N2GMQOMNAUH}
|
||||
]
|
||||
}
|
||||
}
|
||||
`))
|
||||
defer os.Remove(confFileName)
|
||||
opts, err := ProcessConfigFile(confFileName)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error parsing config file: %v", err)
|
||||
}
|
||||
s := New(opts)
|
||||
synadia := s.LookupAccount("synadia")
|
||||
nats := s.LookupAccount("nats")
|
||||
|
||||
if synadia == nil || nats == nil {
|
||||
t.Fatalf("Expected non nil accounts during lookup")
|
||||
}
|
||||
|
||||
// Make sure a normal log in maps the accounts correctly.
|
||||
c, _, _ := newClientForServer(s)
|
||||
connectOp := []byte("CONNECT {\"user\":\"derek\",\"pass\":\"foo\"}\r\n")
|
||||
c.parse(connectOp)
|
||||
if c.acc != synadia {
|
||||
t.Fatalf("Expected the client's account to match 'synadia', got %v", c.acc)
|
||||
}
|
||||
// Also test client sublist.
|
||||
if c.sl != synadia.sl {
|
||||
t.Fatalf("Expected the client's sublist to match 'synadia' account")
|
||||
}
|
||||
|
||||
c, _, _ = newClientForServer(s)
|
||||
connectOp = []byte("CONNECT {\"user\":\"ivan\",\"pass\":\"bar\"}\r\n")
|
||||
c.parse(connectOp)
|
||||
if c.acc != nats {
|
||||
t.Fatalf("Expected the client's account to match 'nats', got %v", c.acc)
|
||||
}
|
||||
// Also test client sublist.
|
||||
if c.sl != nats.sl {
|
||||
t.Fatalf("Expected the client's sublist to match 'nats' account")
|
||||
}
|
||||
|
||||
// Now test nkeys as well.
|
||||
kp, _ := nkeys.FromSeed(seed1)
|
||||
pubKey, _ := kp.PublicKey()
|
||||
|
||||
c, cr, l := newClientForServer(s)
|
||||
// Check for Nonce
|
||||
var info nonceInfo
|
||||
err = json.Unmarshal([]byte(l[5:]), &info)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not parse INFO json: %v\n", err)
|
||||
}
|
||||
if info.Nonce == "" {
|
||||
t.Fatalf("Expected a non-empty nonce with nkeys defined")
|
||||
}
|
||||
sigraw, err := kp.Sign([]byte(info.Nonce))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed signing nonce: %v", err)
|
||||
}
|
||||
sig := base64.StdEncoding.EncodeToString(sigraw)
|
||||
|
||||
// PING needed to flush the +OK to us.
|
||||
cs := fmt.Sprintf("CONNECT {\"nkey\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", pubKey, sig)
|
||||
go c.parse([]byte(cs))
|
||||
l, _ = cr.ReadString('\n')
|
||||
if !strings.HasPrefix(l, "+OK") {
|
||||
t.Fatalf("Expected an OK, got: %v", l)
|
||||
}
|
||||
if c.acc != synadia {
|
||||
t.Fatalf("Expected the nkey client's account to match 'synadia', got %v", c.acc)
|
||||
}
|
||||
// Also test client sublist.
|
||||
if c.sl != synadia.sl {
|
||||
t.Fatalf("Expected the client's sublist to match 'synadia' account")
|
||||
}
|
||||
|
||||
// Now nats account nkey user.
|
||||
kp, _ = nkeys.FromSeed(seed2)
|
||||
pubKey, _ = kp.PublicKey()
|
||||
|
||||
c, cr, l = newClientForServer(s)
|
||||
// Check for Nonce
|
||||
err = json.Unmarshal([]byte(l[5:]), &info)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not parse INFO json: %v\n", err)
|
||||
}
|
||||
if info.Nonce == "" {
|
||||
t.Fatalf("Expected a non-empty nonce with nkeys defined")
|
||||
}
|
||||
sigraw, err = kp.Sign([]byte(info.Nonce))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed signing nonce: %v", err)
|
||||
}
|
||||
sig = base64.StdEncoding.EncodeToString(sigraw)
|
||||
|
||||
// PING needed to flush the +OK to us.
|
||||
cs = fmt.Sprintf("CONNECT {\"nkey\":%q,\"sig\":\"%s\",\"verbose\":true,\"pedantic\":true}\r\nPING\r\n", pubKey, sig)
|
||||
go c.parse([]byte(cs))
|
||||
l, _ = cr.ReadString('\n')
|
||||
if !strings.HasPrefix(l, "+OK") {
|
||||
t.Fatalf("Expected an OK, got: %v", l)
|
||||
}
|
||||
if c.acc != nats {
|
||||
t.Fatalf("Expected the nkey client's account to match 'nats', got %v", c.acc)
|
||||
}
|
||||
// Also test client sublist.
|
||||
if c.sl != nats.sl {
|
||||
t.Fatalf("Expected the client's sublist to match 'nats' account")
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkNewRouteReply(b *testing.B) {
|
||||
opts := defaultServerOptions
|
||||
s := New(&opts)
|
||||
|
||||
@@ -64,11 +64,13 @@ type serviceImport struct {
|
||||
ae bool
|
||||
}
|
||||
|
||||
// importMap tracks the imported streams and services.
|
||||
type importMap struct {
|
||||
streams map[string]*streamImport
|
||||
services map[string]*serviceImport // TODO(dlc) sync.Map may be better.
|
||||
}
|
||||
|
||||
// exportMap tracks the exported streams and services.
|
||||
type exportMap struct {
|
||||
streams map[string]map[string]*Account
|
||||
services map[string]map[string]*Account
|
||||
@@ -364,7 +366,7 @@ func (s *Server) checkAuthforWarnings() {
|
||||
}
|
||||
if warn {
|
||||
// Warning about using plaintext passwords.
|
||||
s.Warnf("Plaintext passwords detected. Use Nkeys or Bcrypt passwords in config files.")
|
||||
s.Warnf("Plaintext passwords detected, use nkeys or bcrypt.")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -480,6 +482,7 @@ func (s *Server) isClientAuthorized(c *client) bool {
|
||||
if err := pub.Verify(c.nonce, sig); err != nil {
|
||||
return false
|
||||
}
|
||||
c.RegisterNkeyUser(nkey)
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
@@ -335,13 +335,42 @@ func (c *client) registerWithAccount(acc *Account) error {
|
||||
}
|
||||
|
||||
// RegisterUser allows auth to call back into a new client
|
||||
// with the authenticated user. This is used to map any permissions
|
||||
// into the client.
|
||||
// with the authenticated user. This is used to map
|
||||
// any permissions into the client and setup accounts.
|
||||
func (c *client) RegisterUser(user *User) {
|
||||
// Process Permissions and map into client connection structures.
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
// Register with proper account and sublist.
|
||||
if user.Account != nil {
|
||||
c.acc = user.Account
|
||||
c.sl = c.acc.sl
|
||||
}
|
||||
|
||||
// Assign permissions.
|
||||
if user.Permissions == nil {
|
||||
// Reset perms to nil in case client previously had them.
|
||||
c.perms = nil
|
||||
return
|
||||
}
|
||||
|
||||
c.setPermissions(user.Permissions)
|
||||
}
|
||||
|
||||
// RegisterNkey allows auth to call back into a new nkey
|
||||
// client with the authenticated user. This is used to map
|
||||
// any permissions into the client and setup accounts.
|
||||
func (c *client) RegisterNkeyUser(user *NkeyUser) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
// Register with proper account and sublist.
|
||||
if user.Account != nil {
|
||||
c.acc = user.Account
|
||||
c.sl = c.acc.sl
|
||||
}
|
||||
|
||||
// Assign permissions.
|
||||
if user.Permissions == nil {
|
||||
// Reset perms to nil in case client previously had them.
|
||||
c.perms = nil
|
||||
@@ -1339,7 +1368,6 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
|
||||
// Check to see if we need to create a shadow subscription due to imports
|
||||
// in other accounts.
|
||||
// Assume lock is held
|
||||
func (c *client) checkAccountImports(sub *subscription) error {
|
||||
c.mu.Lock()
|
||||
acc := c.acc
|
||||
@@ -1967,28 +1995,38 @@ func (c *client) typeString() string {
|
||||
// longer authorized, e.g. due to a config reload.
|
||||
func (c *client) removeUnauthorizedSubs() {
|
||||
c.mu.Lock()
|
||||
if c.perms == nil {
|
||||
if c.perms == nil || c.sl == nil {
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
srv := c.srv
|
||||
subs := make(map[string]*subscription, len(c.subs))
|
||||
for sid, sub := range c.subs {
|
||||
subs[sid] = sub
|
||||
|
||||
var subsa [32]*subscription
|
||||
subs := subsa[:0]
|
||||
for _, sub := range c.subs {
|
||||
subs = append(subs, sub)
|
||||
}
|
||||
|
||||
var removedSubs [32]*subscription
|
||||
removed := removedSubs[:0]
|
||||
|
||||
for _, sub := range subs {
|
||||
if !c.canSubscribe(sub.subject) {
|
||||
removed = append(removed, sub)
|
||||
delete(c.subs, string(sub.sid))
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
for sid, sub := range subs {
|
||||
if c.sl != nil && !c.canSubscribe(sub.subject) {
|
||||
_ = c.sl.Remove(sub)
|
||||
c.mu.Lock()
|
||||
delete(c.subs, sid)
|
||||
c.mu.Unlock()
|
||||
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)",
|
||||
sub.subject, sub.sid))
|
||||
srv.Noticef("Removed sub %q for user %q - not authorized",
|
||||
string(sub.subject), c.opts.Username)
|
||||
}
|
||||
// Remove unauthorized clients subscriptions.
|
||||
c.sl.RemoveBatch(removed)
|
||||
|
||||
// Report back to client and logs.
|
||||
for _, sub := range removed {
|
||||
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %s)",
|
||||
sub.subject, sub.sid))
|
||||
srv.Noticef("Removed sub %q for user %q - not authorized",
|
||||
string(sub.subject), c.opts.Username)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -915,7 +915,11 @@ func parseExportStreamOrService(v map[string]interface{}, pedantic bool) (*expor
|
||||
if curStream != nil {
|
||||
return nil, nil, fmt.Errorf("Detected service but already saw a stream: %+v", mv)
|
||||
}
|
||||
curService = &export{sub: mv.(string)}
|
||||
mvs, ok := mv.(string)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("Expected service to be string name, got %T", mv)
|
||||
}
|
||||
curService = &export{sub: mvs}
|
||||
if accounts != nil {
|
||||
curService.accs = accounts
|
||||
}
|
||||
|
||||
@@ -762,7 +762,8 @@ func (s *Server) reloadClusterPermissions() {
|
||||
subsNeedUNSUB []*subscription
|
||||
deleteRoutedSubs []*subscription
|
||||
)
|
||||
s.sl.localSubs(&localSubs)
|
||||
// FIXME(dlc) - Change for accounts.
|
||||
s.gsl.localSubs(&localSubs)
|
||||
|
||||
// Go through all local subscriptions
|
||||
for _, sub := range localSubs {
|
||||
@@ -810,7 +811,8 @@ func (s *Server) reloadClusterPermissions() {
|
||||
route.mu.Unlock()
|
||||
}
|
||||
// Remove as a batch all the subs that we have removed from each route.
|
||||
s.sl.RemoveBatch(deleteRoutedSubs)
|
||||
// FIXME(dlc) - Change for accounts.
|
||||
s.gsl.RemoveBatch(deleteRoutedSubs)
|
||||
}
|
||||
|
||||
// validateClusterOpts ensures the new ClusterOpts does not change host or
|
||||
|
||||
@@ -428,7 +428,7 @@ func (s *Server) updateRemoteRoutePerms(route *client, info *Info) {
|
||||
_localSubs [4096]*subscription
|
||||
localSubs = _localSubs[:0]
|
||||
)
|
||||
s.sl.localSubs(&localSubs)
|
||||
s.gsl.localSubs(&localSubs)
|
||||
|
||||
route.sendRouteSubProtos(localSubs, func(sub *subscription) bool {
|
||||
subj := sub.subject
|
||||
|
||||
@@ -305,7 +305,8 @@ func (s *Server) newAccountsAllowed() bool {
|
||||
return s.opts.AllowNewAccounts
|
||||
}
|
||||
|
||||
func (s *Server) LookupOrRegisterAccount(name string) (*Account, bool) {
|
||||
// LookupOrRegisterAccount will return the given account if known or create a new entry.
|
||||
func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew bool) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if acc, ok := s.accounts[name]; ok {
|
||||
@@ -835,7 +836,6 @@ func (s *Server) copyInfo() Info {
|
||||
|
||||
func (s *Server) createClient(conn net.Conn) *client {
|
||||
// Snapshot server options.
|
||||
// TODO(dlc) - This can get expensive.
|
||||
opts := s.getOpts()
|
||||
|
||||
max_pay := int64(opts.MaxPayload)
|
||||
|
||||
Reference in New Issue
Block a user