mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 02:07:59 -07:00
@@ -1,3 +1,7 @@
|
||||
# Tests are starting to consume enough memory during --race tests to exceed container env on Travis of 4GB.
|
||||
sudo: required
|
||||
dist: trusty
|
||||
|
||||
language: go
|
||||
go:
|
||||
- 1.9.x
|
||||
|
||||
@@ -151,6 +151,22 @@ func (s *Server) checkAuthforWarnings() {
|
||||
}
|
||||
}
|
||||
|
||||
// If opts.Users or opts.Nkeys have definitions without an account
|
||||
// defined assign them to the default global account.
|
||||
// Lock should be held.
|
||||
func (s *Server) assignGlobalAccountToOrphanUsers() {
|
||||
for _, u := range s.users {
|
||||
if u.Account == nil {
|
||||
u.Account = s.gacc
|
||||
}
|
||||
}
|
||||
for _, u := range s.nkeys {
|
||||
if u.Account == nil {
|
||||
u.Account = s.gacc
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// configureAuthorization will do any setup needed for authorization.
|
||||
// Lock is assumed held.
|
||||
func (s *Server) configureAuthorization() {
|
||||
@@ -179,6 +195,7 @@ func (s *Server) configureAuthorization() {
|
||||
s.users[u.Username] = u
|
||||
}
|
||||
}
|
||||
s.assignGlobalAccountToOrphanUsers()
|
||||
s.info.AuthRequired = true
|
||||
} else if opts.Username != "" || opts.Authorization != "" {
|
||||
s.info.AuthRequired = true
|
||||
|
||||
190
server/client.go
190
server/client.go
@@ -133,27 +133,29 @@ const (
|
||||
type client struct {
|
||||
// Here first because of use of atomics, and memory alignment.
|
||||
stats
|
||||
mpay int64
|
||||
msubs int
|
||||
mu sync.Mutex
|
||||
typ int
|
||||
cid uint64
|
||||
opts clientOpts
|
||||
start time.Time
|
||||
nonce []byte
|
||||
nc net.Conn
|
||||
ncs string
|
||||
out outbound
|
||||
srv *Server
|
||||
acc *Account
|
||||
subs map[string]*subscription
|
||||
perms *permissions
|
||||
in readCache
|
||||
pcd map[*client]struct{}
|
||||
atmr *time.Timer
|
||||
ping pinfo
|
||||
msgb [msgScratchSize]byte
|
||||
last time.Time
|
||||
mpay int64
|
||||
msubs int
|
||||
mu sync.Mutex
|
||||
typ int
|
||||
cid uint64
|
||||
opts clientOpts
|
||||
start time.Time
|
||||
nonce []byte
|
||||
nc net.Conn
|
||||
ncs string
|
||||
out outbound
|
||||
srv *Server
|
||||
acc *Account
|
||||
subs map[string]*subscription
|
||||
perms *permissions
|
||||
mperms *msgDeny
|
||||
darray []string
|
||||
in readCache
|
||||
pcd map[*client]struct{}
|
||||
atmr *time.Timer
|
||||
ping pinfo
|
||||
msgb [msgScratchSize]byte
|
||||
last time.Time
|
||||
parseState
|
||||
|
||||
rtt time.Duration
|
||||
@@ -200,10 +202,20 @@ type permissions struct {
|
||||
pcache map[string]bool
|
||||
}
|
||||
|
||||
// msgDeny is used when a user permission for subscriptions has a deny
|
||||
// clause but a subscription could be made that is of broader scope.
|
||||
// e.g. deny = "foo", but user subscribes to "*". That subscription should
|
||||
// succeed but no message sent on foo should be delivered.
|
||||
type msgDeny struct {
|
||||
deny *Sublist
|
||||
dcache map[string]bool
|
||||
}
|
||||
|
||||
const (
|
||||
maxResultCacheSize = 512
|
||||
maxPermCacheSize = 32
|
||||
pruneSize = 16
|
||||
maxResultCacheSize = 512
|
||||
maxDenyPermCacheSize = 256
|
||||
maxPermCacheSize = 128
|
||||
pruneSize = 32
|
||||
)
|
||||
|
||||
// Used in readloop to cache hot subject lookups and group statistics.
|
||||
@@ -376,6 +388,7 @@ func (c *client) RegisterUser(user *User) {
|
||||
if user.Permissions == nil {
|
||||
// Reset perms to nil in case client previously had them.
|
||||
c.perms = nil
|
||||
c.mperms = nil
|
||||
return
|
||||
}
|
||||
|
||||
@@ -398,6 +411,7 @@ func (c *client) RegisterNkeyUser(user *NkeyUser) {
|
||||
if user.Permissions == nil {
|
||||
// Reset perms to nil in case client previously had them.
|
||||
c.perms = nil
|
||||
c.mperms = nil
|
||||
return
|
||||
}
|
||||
|
||||
@@ -442,6 +456,8 @@ func (c *client) setPermissions(perms *Permissions) {
|
||||
}
|
||||
if len(perms.Subscribe.Deny) > 0 {
|
||||
c.perms.sub.deny = NewSublist()
|
||||
// Also hold onto this array for later.
|
||||
c.darray = perms.Subscribe.Deny
|
||||
}
|
||||
for _, subSubject := range perms.Subscribe.Deny {
|
||||
sub := &subscription{subject: []byte(subSubject)}
|
||||
@@ -450,6 +466,16 @@ func (c *client) setPermissions(perms *Permissions) {
|
||||
}
|
||||
}
|
||||
|
||||
// This will load up the deny structure used for filtering delivered
|
||||
// messages based on a deny clause for subscriptions.
|
||||
// Lock should be held.
|
||||
func (c *client) loadMsgDenyFilter() {
|
||||
c.mperms = &msgDeny{NewSublist(), make(map[string]bool)}
|
||||
for _, sub := range c.darray {
|
||||
c.mperms.deny.Insert(&subscription{subject: []byte(sub)})
|
||||
}
|
||||
}
|
||||
|
||||
// writeLoop is the main socket write functionality.
|
||||
// Runs in its own Go routine.
|
||||
func (c *client) writeLoop() {
|
||||
@@ -1298,11 +1324,11 @@ func (c *client) processSub(argo []byte) (err error) {
|
||||
|
||||
// Check permissions if applicable.
|
||||
if ctype == ROUTER {
|
||||
if !c.canExport(sub.subject) {
|
||||
if !c.canExport(string(sub.subject)) {
|
||||
c.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
} else if !c.canSubscribe(sub.subject) {
|
||||
} else if !c.canSubscribe(string(sub.subject)) {
|
||||
c.mu.Unlock()
|
||||
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
|
||||
c.Errorf("Subscription Violation - User %q, Subject %q, SID %s",
|
||||
@@ -1419,7 +1445,7 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error {
|
||||
|
||||
// canSubscribe determines if the client is authorized to subscribe to the
|
||||
// given subject. Assumes caller is holding lock.
|
||||
func (c *client) canSubscribe(subject []byte) bool {
|
||||
func (c *client) canSubscribe(subject string) bool {
|
||||
if c.perms == nil {
|
||||
return true
|
||||
}
|
||||
@@ -1428,13 +1454,28 @@ func (c *client) canSubscribe(subject []byte) bool {
|
||||
|
||||
// Check allow list. If no allow list that means all are allowed. Deny can overrule.
|
||||
if c.perms.sub.allow != nil {
|
||||
r := c.perms.sub.allow.Match(string(subject))
|
||||
r := c.perms.sub.allow.Match(subject)
|
||||
allowed = len(r.psubs) != 0
|
||||
}
|
||||
// If we have a deny list and we think we are allowed, check that as well.
|
||||
if allowed && c.perms.sub.deny != nil {
|
||||
r := c.perms.sub.deny.Match(string(subject))
|
||||
r := c.perms.sub.deny.Match(subject)
|
||||
allowed = len(r.psubs) == 0
|
||||
|
||||
// We use the actual subscription to signal us to spin up the deny mperms
|
||||
// and cache. We check if the subject is a wildcard that contains any of
|
||||
// the deny clauses.
|
||||
// FIXME(dlc) - We could be smarter and track when these go away and remove.
|
||||
if allowed && c.mperms == nil && subjectHasWildcard(subject) {
|
||||
// Whip through the deny array and check if this wildcard subject is within scope.
|
||||
for _, sub := range c.darray {
|
||||
tokens := strings.Split(sub, tsep)
|
||||
if isSubsetMatch(tokens, sub) {
|
||||
c.loadMsgDenyFilter()
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return allowed
|
||||
}
|
||||
@@ -1526,6 +1567,25 @@ func (c *client) processUnsub(arg []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkDenySub will check if we are allowed to deliver this message in the
|
||||
// presence of deny clauses for subscriptions. Deny clauses will not prevent
|
||||
// larger scoped wildcard subscriptions, so we need to check at delivery time.
|
||||
// Lock should be held.
|
||||
func (c *client) checkDenySub(subject string) bool {
|
||||
if denied, ok := c.mperms.dcache[subject]; ok {
|
||||
return denied
|
||||
} else if r := c.mperms.deny.Match(subject); len(r.psubs) != 0 {
|
||||
c.mperms.dcache[subject] = true
|
||||
return true
|
||||
} else {
|
||||
c.mperms.dcache[subject] = false
|
||||
}
|
||||
if len(c.mperms.dcache) > maxDenyPermCacheSize {
|
||||
c.pruneDenyCache()
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *client) msgHeader(mh []byte, sub *subscription, reply []byte) []byte {
|
||||
if len(sub.sid) > 0 {
|
||||
mh = append(mh, sub.sid...)
|
||||
@@ -1556,6 +1616,13 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if we have a subscribe deny clause. This will trigger us to check the subject
|
||||
// for a match against the denied subjects.
|
||||
if client.mperms != nil && client.checkDenySub(string(c.pa.subject)) {
|
||||
client.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
srv := client.srv
|
||||
|
||||
sub.nm++
|
||||
@@ -1637,7 +1704,21 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// pruneCache will prune the cache via randomly
|
||||
// pruneDenyCache will prune the deny cache via randomly
|
||||
// deleting items. Doing so pruneSize items at a time.
|
||||
// Lock must be held for this one since it is shared under
|
||||
// deliverMsg.
|
||||
func (c *client) pruneDenyCache() {
|
||||
r := 0
|
||||
for subject := range c.mperms.dcache {
|
||||
delete(c.mperms.dcache, subject)
|
||||
if r++; r > pruneSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// prunePubPermsCache will prune the cache via randomly
|
||||
// deleting items. Doing so pruneSize items at a time.
|
||||
func (c *client) prunePubPermsCache() {
|
||||
r := 0
|
||||
@@ -1650,18 +1731,18 @@ func (c *client) prunePubPermsCache() {
|
||||
}
|
||||
|
||||
// pubAllowed checks on publish permissioning.
|
||||
func (c *client) pubAllowed(subject []byte) bool {
|
||||
if c.perms == nil {
|
||||
func (c *client) pubAllowed(subject string) bool {
|
||||
if c.perms == nil || (c.perms.pub.allow == nil && c.perms.pub.deny == nil) {
|
||||
return true
|
||||
}
|
||||
// Check if published subject is allowed if we have permissions in place.
|
||||
allowed, ok := c.perms.pcache[string(subject)]
|
||||
allowed, ok := c.perms.pcache[subject]
|
||||
if ok {
|
||||
return allowed
|
||||
}
|
||||
// Cache miss, check allow then deny as needed.
|
||||
if c.perms.pub.allow != nil {
|
||||
r := c.perms.pub.allow.Match(string(subject))
|
||||
r := c.perms.pub.allow.Match(subject)
|
||||
allowed = len(r.psubs) != 0
|
||||
} else {
|
||||
// No entries means all are allowed. Deny will overrule as needed.
|
||||
@@ -1669,7 +1750,7 @@ func (c *client) pubAllowed(subject []byte) bool {
|
||||
}
|
||||
// If we have a deny list and are currently allowed, check that as well.
|
||||
if allowed && c.perms.pub.deny != nil {
|
||||
r := c.perms.pub.deny.Match(string(subject))
|
||||
r := c.perms.pub.deny.Match(subject)
|
||||
allowed = len(r.psubs) == 0
|
||||
}
|
||||
// Update our cache here.
|
||||
@@ -1733,7 +1814,7 @@ func (c *client) processInboundClientMsg(msg []byte) {
|
||||
}
|
||||
|
||||
// Check pub permissions
|
||||
if c.perms != nil && !c.pubAllowed(c.pa.subject) {
|
||||
if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) && !c.pubAllowed(string(c.pa.subject)) {
|
||||
c.pubPermissionViolation(c.pa.subject)
|
||||
return
|
||||
}
|
||||
@@ -2162,33 +2243,36 @@ func (c *client) processSubsOnConfigReload(awcsti map[string]struct{}) {
|
||||
checkAcc = false
|
||||
}
|
||||
}
|
||||
// We will clear any mperms we have here. It will rebuild on the fly with canSubscribe,
|
||||
// so we do that here as we collect them. We will check result down below.
|
||||
c.mperms = nil
|
||||
// Collect client's subs under the lock
|
||||
for _, sub := range c.subs {
|
||||
subs = append(subs, sub)
|
||||
// Just checking to rebuild mperms under the lock, will collect removed though here.
|
||||
// Only collect under subs array of canSubscribe and checkAcc true.
|
||||
if !c.canSubscribe(string(sub.subject)) {
|
||||
removed = append(removed, sub)
|
||||
} else if checkAcc {
|
||||
subs = append(subs, sub)
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
// We can call canSubscribe() without locking since the permissions are updated
|
||||
// from config reload code prior to calling this function. So there is no risk
|
||||
// of concurrent access to c.perms.
|
||||
// This list is all subs who are allowed and we need to check accounts.
|
||||
for _, sub := range subs {
|
||||
if checkPerms && !c.canSubscribe(sub.subject) {
|
||||
removed = append(removed, sub)
|
||||
c.unsubscribe(acc, sub, true)
|
||||
} else if checkAcc {
|
||||
c.mu.Lock()
|
||||
oldShadows := sub.shadow
|
||||
sub.shadow = nil
|
||||
c.mu.Unlock()
|
||||
c.addShadowSubscriptions(acc, sub)
|
||||
for _, nsub := range oldShadows {
|
||||
nsub.im.acc.sl.Remove(nsub)
|
||||
}
|
||||
c.mu.Lock()
|
||||
oldShadows := sub.shadow
|
||||
sub.shadow = nil
|
||||
c.mu.Unlock()
|
||||
c.addShadowSubscriptions(acc, sub)
|
||||
for _, nsub := range oldShadows {
|
||||
nsub.im.acc.sl.Remove(nsub)
|
||||
}
|
||||
}
|
||||
|
||||
// Report back to client and logs.
|
||||
// Unsubscribe all that need to be removed and report back to client and logs.
|
||||
for _, sub := range removed {
|
||||
c.unsubscribe(acc, sub, true)
|
||||
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %q)",
|
||||
sub.subject, sub.sid))
|
||||
srv.Noticef("Removed sub %q (sid %q) for user %q - not authorized",
|
||||
|
||||
@@ -20,7 +20,10 @@ authorization {
|
||||
# Setup a default user that can subscribe to anything, but has
|
||||
# no publish capabilities.
|
||||
default_user = {
|
||||
subscribe = "PUBLIC.>"
|
||||
subscribe = {
|
||||
allow: ["PUBLIC.>", "foo.*"]
|
||||
deny: "foo.bar"
|
||||
}
|
||||
}
|
||||
|
||||
# Default permissions if none presented. e.g. susan below.
|
||||
|
||||
@@ -20,7 +20,10 @@ authorization {
|
||||
# Setup a default user that can subscribe to anything, but has
|
||||
# no publish capabilities.
|
||||
default_user = {
|
||||
subscribe = "PUBLIC.>"
|
||||
subscribe = {
|
||||
allow: ["PUBLIC.>", "foo.*"]
|
||||
deny: ["PUBLIC.foo"]
|
||||
}
|
||||
}
|
||||
|
||||
# Default permissions if none presented. e.g. susan below.
|
||||
|
||||
@@ -686,6 +686,7 @@ func (s *Server) applyOptions(opts []option) {
|
||||
// unauthorized subscriptions.
|
||||
func (s *Server) reloadAuthorization() {
|
||||
s.mu.Lock()
|
||||
|
||||
s.configureAuthorization()
|
||||
|
||||
// This map will contain the names of accounts that have their streams
|
||||
@@ -870,8 +871,9 @@ func (s *Server) reloadClusterPermissions() {
|
||||
// Go through all local subscriptions
|
||||
for _, sub := range localSubs {
|
||||
// Get all subs that can now be imported
|
||||
couldImportThen := oldPermsTester.canImport(sub.subject)
|
||||
canImportNow := newPermsTester.canImport(sub.subject)
|
||||
subj := string(sub.subject)
|
||||
couldImportThen := oldPermsTester.canImport(subj)
|
||||
canImportNow := newPermsTester.canImport(subj)
|
||||
if canImportNow {
|
||||
// If we could not before, then will need to send a SUB protocol.
|
||||
if !couldImportThen {
|
||||
@@ -896,7 +898,8 @@ func (s *Server) reloadClusterPermissions() {
|
||||
for _, sub := range route.subs {
|
||||
// If we can't export, we need to drop the subscriptions that
|
||||
// we have on behalf of this route.
|
||||
if !route.canExport(sub.subject) {
|
||||
subj := string(sub.subject)
|
||||
if !route.canExport(subj) {
|
||||
delete(route.subs, string(sub.sid))
|
||||
deleteRoutedSubs = append(deleteRoutedSubs, sub)
|
||||
}
|
||||
|
||||
@@ -1071,14 +1071,72 @@ func TestConfigReloadChangePermissions(t *testing.T) {
|
||||
t.Fatalf("Msg is incorrect.\nexpected: %+v\ngot: %+v", []byte("world"), msg.Data)
|
||||
}
|
||||
|
||||
// Susan will subscribe to two subjects, both will succeed but a send to foo.bar should not succeed
|
||||
// however PUBLIC.foo should.
|
||||
sconn, err := nats.Connect(addr, nats.UserInfo("susan", "baz"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
defer sconn.Close()
|
||||
|
||||
asyncErr2 := make(chan error, 1)
|
||||
sconn.SetErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
|
||||
asyncErr2 <- err
|
||||
})
|
||||
|
||||
fooSub, err := sconn.SubscribeSync("foo.*")
|
||||
if err != nil {
|
||||
t.Fatalf("Error subscribing: %v", err)
|
||||
}
|
||||
sconn.Flush()
|
||||
|
||||
// Publishing from bob on foo.bar should not come through.
|
||||
if err := conn.Publish("foo.bar", []byte("hello")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
conn.Flush()
|
||||
|
||||
_, err = fooSub.NextMsg(100 * time.Millisecond)
|
||||
if err != nats.ErrTimeout {
|
||||
t.Fatalf("Received a message we shouldn't have")
|
||||
}
|
||||
|
||||
pubSub, err := sconn.SubscribeSync("PUBLIC.*")
|
||||
if err != nil {
|
||||
t.Fatalf("Error subscribing: %v", err)
|
||||
}
|
||||
sconn.Flush()
|
||||
|
||||
select {
|
||||
case err := <-asyncErr2:
|
||||
t.Fatalf("Received unexpected error for susan: %v", err)
|
||||
default:
|
||||
}
|
||||
|
||||
// This should work ok with original config.
|
||||
if err := conn.Publish("PUBLIC.foo", []byte("hello monkey")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
conn.Flush()
|
||||
|
||||
msg, err = pubSub.NextMsg(2 * time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error receiving msg: %v", err)
|
||||
}
|
||||
if string(msg.Data) != "hello monkey" {
|
||||
t.Fatalf("Msg is incorrect.\nexpected: %q\ngot: %q", "hello monkey", msg.Data)
|
||||
}
|
||||
|
||||
///////////////////////////////////////////
|
||||
// Change permissions.
|
||||
///////////////////////////////////////////
|
||||
|
||||
changeCurrentConfigContent(t, config, "./configs/reload/authorization_2.conf")
|
||||
if err := server.Reload(); err != nil {
|
||||
t.Fatalf("Error reloading config: %v", err)
|
||||
}
|
||||
|
||||
// Ensure we receive an error for the subscription that is no longer
|
||||
// authorized.
|
||||
// Ensure we receive an error for the subscription that is no longer authorized.
|
||||
// In this test, since connection is not closed by the server,
|
||||
// the client must receive an -ERR
|
||||
select {
|
||||
@@ -1149,6 +1207,42 @@ func TestConfigReloadChangePermissions(t *testing.T) {
|
||||
t.Fatalf("Received unexpected error: %v", err)
|
||||
default:
|
||||
}
|
||||
|
||||
// Now check susan again.
|
||||
//
|
||||
// This worked ok with original config but should not deliver a message now.
|
||||
if err := conn.Publish("PUBLIC.foo", []byte("hello monkey")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
conn.Flush()
|
||||
|
||||
_, err = pubSub.NextMsg(100 * time.Millisecond)
|
||||
if err != nats.ErrTimeout {
|
||||
t.Fatalf("Received a message we shouldn't have")
|
||||
}
|
||||
|
||||
// Now check foo.bar, which did not work before but should work now..
|
||||
if err := conn.Publish("foo.bar", []byte("hello?")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
conn.Flush()
|
||||
|
||||
msg, err = fooSub.NextMsg(2 * time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error receiving msg: %v", err)
|
||||
}
|
||||
if string(msg.Data) != "hello?" {
|
||||
t.Fatalf("Msg is incorrect.\nexpected: %q\ngot: %q", "hello?", msg.Data)
|
||||
}
|
||||
|
||||
// Once last check for no errors.
|
||||
sconn.Flush()
|
||||
|
||||
select {
|
||||
case err := <-asyncErr2:
|
||||
t.Fatalf("Received unexpected error for susan: %v", err)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure Reload returns an error when attempting to change cluster address
|
||||
@@ -2740,7 +2834,9 @@ func TestConfigReloadAccountNKeyUsers(t *testing.T) {
|
||||
|
||||
s.mu.Lock()
|
||||
nkeys := s.nkeys
|
||||
globalAcc := s.gacc
|
||||
s.mu.Unlock()
|
||||
|
||||
if n := len(nkeys); n != 2 {
|
||||
t.Fatalf("NKeys map should have 2 users, got %v", n)
|
||||
}
|
||||
@@ -2755,7 +2851,7 @@ func TestConfigReloadAccountNKeyUsers(t *testing.T) {
|
||||
if ivan == nil {
|
||||
t.Fatal("NKey for user Ivan not found")
|
||||
}
|
||||
if ivan.Account != nil {
|
||||
if ivan.Account != globalAcc {
|
||||
t.Fatalf("Invalid account for user Ivan: %#v", ivan.Account)
|
||||
}
|
||||
if s.LookupAccount("synadia") != nil {
|
||||
|
||||
@@ -505,7 +505,7 @@ func (s *Server) updateRemoteRoutePerms(route *client, info *Info) {
|
||||
s.gacc.sl.localSubs(&localSubs)
|
||||
|
||||
route.sendRouteSubProtos(localSubs, false, func(sub *subscription) bool {
|
||||
subj := sub.subject
|
||||
subj := string(sub.subject)
|
||||
// If the remote can now export but could not before, and this server can import this
|
||||
// subject, then send SUB protocol.
|
||||
if newPermsTester.canExport(subj) && !oldPermsTester.canExport(subj) && route.canImport(subj) {
|
||||
@@ -613,7 +613,7 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) {
|
||||
// canImport is whether or not we will send a SUB for interest to the other side.
|
||||
// This is for ROUTER connections only.
|
||||
// Lock is held on entry.
|
||||
func (c *client) canImport(subject []byte) bool {
|
||||
func (c *client) canImport(subject string) bool {
|
||||
// Use pubAllowed() since this checks Publish permissions which
|
||||
// is what Import maps to.
|
||||
return c.pubAllowed(subject)
|
||||
@@ -622,7 +622,7 @@ func (c *client) canImport(subject []byte) bool {
|
||||
// canExport is whether or not we will accept a SUB from the remote for a given subject.
|
||||
// This is for ROUTER connections only.
|
||||
// Lock is held on entry
|
||||
func (c *client) canExport(subject []byte) bool {
|
||||
func (c *client) canExport(subject string) bool {
|
||||
// Use canSubscribe() since this checks Subscribe permissions which
|
||||
// is what Export maps to.
|
||||
return c.canSubscribe(subject)
|
||||
@@ -635,6 +635,7 @@ func (c *client) setRoutePermissions(perms *RoutePermissions) {
|
||||
// Reset if some were set
|
||||
if perms == nil {
|
||||
c.perms = nil
|
||||
c.mperms = nil
|
||||
return
|
||||
}
|
||||
// Convert route permissions to user permissions.
|
||||
@@ -795,7 +796,7 @@ func (c *client) processRemoteSub(argo []byte) (err error) {
|
||||
}
|
||||
|
||||
// Check permissions if applicable.
|
||||
if !c.canExport(sub.subject) {
|
||||
if !c.canExport(string(sub.subject)) {
|
||||
c.mu.Unlock()
|
||||
c.Debugf("Can not export %q, ignoring remote subscription request", sub.subject)
|
||||
return nil
|
||||
@@ -878,7 +879,7 @@ func (s *Server) sendSubsToRoute(route *client) {
|
||||
a.mu.RUnlock()
|
||||
|
||||
closed = route.sendRouteSubProtos(subs, false, func(sub *subscription) bool {
|
||||
return route.canImport(sub.subject)
|
||||
return route.canImport(string(sub.subject))
|
||||
})
|
||||
|
||||
if closed {
|
||||
@@ -1308,7 +1309,7 @@ func (s *Server) broadcastSubscribe(sub *subscription) {
|
||||
for _, route := range s.routes {
|
||||
route.mu.Lock()
|
||||
route.sendRouteSubProtos(subs, trace, func(sub *subscription) bool {
|
||||
return route.canImport(sub.subject)
|
||||
return route.canImport(string(sub.subject))
|
||||
})
|
||||
route.mu.Unlock()
|
||||
}
|
||||
@@ -1324,7 +1325,7 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) {
|
||||
for _, route := range s.routes {
|
||||
route.mu.Lock()
|
||||
route.sendRouteUnSubProtos(subs, trace, func(sub *subscription) bool {
|
||||
return route.canImport(sub.subject)
|
||||
return route.canImport(string(sub.subject))
|
||||
})
|
||||
route.mu.Unlock()
|
||||
}
|
||||
|
||||
@@ -731,6 +731,11 @@ func visitLevel(l *level, depth int) int {
|
||||
return maxDepth
|
||||
}
|
||||
|
||||
// Determine if a subject has any wildcard tokens.
|
||||
func subjectHasWildcard(subject string) bool {
|
||||
return !subjectIsLiteral(subject)
|
||||
}
|
||||
|
||||
// Determine if the subject has any wildcards. Fast version, does not check for
|
||||
// valid subject. Used in caching layer.
|
||||
func subjectIsLiteral(subject string) bool {
|
||||
|
||||
@@ -400,6 +400,80 @@ func Benchmark___PubEightQueueSub(b *testing.B) {
|
||||
s.Shutdown()
|
||||
}
|
||||
|
||||
func Benchmark__DenyMsgNoWCPubSub(b *testing.B) {
|
||||
s, opts := RunServerWithConfig("./configs/authorization.conf")
|
||||
defer s.Shutdown()
|
||||
|
||||
c := createClientConn(b, opts.Host, opts.Port)
|
||||
defer c.Close()
|
||||
|
||||
expectAuthRequired(b, c)
|
||||
cs := fmt.Sprintf("CONNECT {\"verbose\":false,\"pedantic\":false,\"user\":\"%s\",\"pass\":\"%s\"}\r\n", "bench-deny", DefaultPass)
|
||||
sendProto(b, c, cs)
|
||||
|
||||
sendProto(b, c, "SUB foo 1\r\n")
|
||||
bw := bufio.NewWriterSize(c, defaultSendBufSize)
|
||||
sendOp := []byte(fmt.Sprintf("PUB foo 2\r\nok\r\n"))
|
||||
ch := make(chan bool)
|
||||
expected := len("MSG foo 1 2\r\nok\r\n") * b.N
|
||||
go drainConnection(b, c, ch, expected)
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := bw.Write(sendOp)
|
||||
if err != nil {
|
||||
b.Errorf("Received error on PUB write: %v\n", err)
|
||||
}
|
||||
}
|
||||
err := bw.Flush()
|
||||
if err != nil {
|
||||
b.Errorf("Received error on FLUSH write: %v\n", err)
|
||||
}
|
||||
|
||||
// Wait for connection to be drained
|
||||
<-ch
|
||||
|
||||
// To not count defer cleanup of client and server.
|
||||
b.StopTimer()
|
||||
}
|
||||
|
||||
func Benchmark_DenyMsgYesWCPubSub(b *testing.B) {
|
||||
s, opts := RunServerWithConfig("./configs/authorization.conf")
|
||||
defer s.Shutdown()
|
||||
|
||||
c := createClientConn(b, opts.Host, opts.Port)
|
||||
defer c.Close()
|
||||
|
||||
expectAuthRequired(b, c)
|
||||
cs := fmt.Sprintf("CONNECT {\"verbose\":false,\"pedantic\":false,\"user\":\"%s\",\"pass\":\"%s\"}\r\n", "bench-deny", DefaultPass)
|
||||
sendProto(b, c, cs)
|
||||
|
||||
sendProto(b, c, "SUB * 1\r\n")
|
||||
bw := bufio.NewWriterSize(c, defaultSendBufSize)
|
||||
sendOp := []byte(fmt.Sprintf("PUB foo 2\r\nok\r\n"))
|
||||
ch := make(chan bool)
|
||||
expected := len("MSG foo 1 2\r\nok\r\n") * b.N
|
||||
go drainConnection(b, c, ch, expected)
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := bw.Write(sendOp)
|
||||
if err != nil {
|
||||
b.Errorf("Received error on PUB write: %v\n", err)
|
||||
}
|
||||
}
|
||||
err := bw.Flush()
|
||||
if err != nil {
|
||||
b.Errorf("Received error on FLUSH write: %v\n", err)
|
||||
}
|
||||
|
||||
// Wait for connection to be drained
|
||||
<-ch
|
||||
|
||||
// To not count defer cleanup of client and server.
|
||||
b.StopTimer()
|
||||
}
|
||||
|
||||
func routePubSub(b *testing.B, size int) {
|
||||
b.StopTimer()
|
||||
|
||||
|
||||
@@ -14,5 +14,7 @@ authorization {
|
||||
{user: bench, password: $PASS, permissions: $BENCH}
|
||||
{user: joe, password: $PASS}
|
||||
{user: ns, password: $PASS, permissions: $NEW_STYLE}
|
||||
{user: ns-pub, password: $PASS, permissions: $NS_PUB}
|
||||
{user: bench-deny, password: $PASS, permissions: $BENCH_DENY}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -27,7 +27,6 @@ BENCH = {
|
||||
publish = "a"
|
||||
}
|
||||
|
||||
|
||||
# New Style Permissions
|
||||
|
||||
NEW_STYLE = {
|
||||
@@ -36,7 +35,19 @@ NEW_STYLE = {
|
||||
deny = ["SYS.*", "bar.baz", "foo.*"]
|
||||
}
|
||||
subscribe = {
|
||||
allow = "foo.*"
|
||||
deny = "foo.baz"
|
||||
allow = ["foo.*", "SYS.TEST.>"]
|
||||
deny = ["foo.baz", "SYS.*"]
|
||||
}
|
||||
}
|
||||
|
||||
NS_PUB = {
|
||||
publish = "foo.baz"
|
||||
subscribe = "foo.baz"
|
||||
}
|
||||
|
||||
BENCH_DENY = {
|
||||
subscribe = {
|
||||
allow = ["foo", "*"]
|
||||
deny = "foo.bar"
|
||||
}
|
||||
}
|
||||
@@ -139,4 +139,85 @@ func TestUserAuthorizationProto(t *testing.T) {
|
||||
expectResult(t, c, permErrRe)
|
||||
sendProto(t, c, "SUB foo.baz 1\r\n")
|
||||
expectResult(t, c, permErrRe)
|
||||
|
||||
// Deny clauses for subscriptions need to be able to allow subscriptions
|
||||
// on larger scoped wildcards, but prevent delivery of a message whose
|
||||
// subject matches a deny clause.
|
||||
|
||||
// Clear old stuff
|
||||
c.Close()
|
||||
|
||||
c = createClientConn(t, opts.Host, opts.Port)
|
||||
defer c.Close()
|
||||
expectAuthRequired(t, c)
|
||||
doAuthConnect(t, c, "", "ns", DefaultPass)
|
||||
expectResult(t, c, okRe)
|
||||
|
||||
sendProto(t, c, "SUB foo.* 1\r\n")
|
||||
expectResult(t, c, okRe)
|
||||
|
||||
sendProto(t, c, "SUB foo.* bar 2\r\n")
|
||||
expectResult(t, c, okRe)
|
||||
|
||||
// Now send on foo.baz which should not be received on first client.
|
||||
// Joe is a default user
|
||||
nc := createClientConn(t, opts.Host, opts.Port)
|
||||
defer nc.Close()
|
||||
expectAuthRequired(t, nc)
|
||||
doAuthConnect(t, nc, "", "ns-pub", DefaultPass)
|
||||
expectResult(t, nc, okRe)
|
||||
|
||||
sendProto(t, nc, "PUB foo.baz 2\r\nok\r\n")
|
||||
expectResult(t, nc, okRe)
|
||||
|
||||
// Expect nothing from the wildcard subscription.
|
||||
expectNothing(t, c)
|
||||
|
||||
sendProto(t, c, "PING\r\n")
|
||||
expectResult(t, c, pongRe)
|
||||
|
||||
// Now create a queue sub on our ns-pub user. We want to test that
|
||||
// queue subscribers can be denied and delivery will route around.
|
||||
sendProto(t, nc, "SUB foo.baz bar 2\r\n")
|
||||
expectResult(t, nc, okRe)
|
||||
|
||||
// Make sure we always get the message on our queue subscriber.
|
||||
// Do this several times since we should select the other subscriber
|
||||
// but get permission denied..
|
||||
for i := 0; i < 20; i++ {
|
||||
sendProto(t, nc, "PUB foo.baz 2\r\nok\r\n")
|
||||
buf := expectResult(t, nc, okRe)
|
||||
if msgRe.Match(buf) {
|
||||
continue
|
||||
} else {
|
||||
expectResult(t, nc, msgRe)
|
||||
}
|
||||
}
|
||||
|
||||
// Clear old stuff
|
||||
c.Close()
|
||||
|
||||
c = createClientConn(t, opts.Host, opts.Port)
|
||||
defer c.Close()
|
||||
expectAuthRequired(t, c)
|
||||
doAuthConnect(t, c, "", "ns", DefaultPass)
|
||||
expectResult(t, c, okRe)
|
||||
|
||||
sendProto(t, c, "SUB foo.bar 1\r\n")
|
||||
expectResult(t, c, okRe)
|
||||
|
||||
sendProto(t, c, "SUB foo.bar.baz 2\r\n")
|
||||
expectResult(t, c, errRe)
|
||||
|
||||
sendProto(t, c, "SUB > 3\r\n")
|
||||
expectResult(t, c, errRe)
|
||||
|
||||
sendProto(t, c, "SUB SYS.> 4\r\n")
|
||||
expectResult(t, c, errRe)
|
||||
|
||||
sendProto(t, c, "SUB SYS.TEST.foo 5\r\n")
|
||||
expectResult(t, c, okRe)
|
||||
|
||||
sendProto(t, c, "SUB SYS.bar 5\r\n")
|
||||
expectResult(t, c, errRe)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user