mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Combined canSubscribe and canQueueSubscribe for consistency in specialized deny clause handling.
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -3380,3 +3380,38 @@ func TestAccountLimitsServerConfig(t *testing.T) {
|
||||
_, err = nats.Connect(s.ClientURL(), nats.UserInfo("derek", "foo"))
|
||||
require_Error(t, err)
|
||||
}
|
||||
|
||||
func TestAccountUserSubPermsWithQueueGroups(t *testing.T) {
|
||||
cf := createConfFile(t, []byte(`
|
||||
listen: 127.0.0.1:-1
|
||||
|
||||
authorization {
|
||||
users = [
|
||||
{ user: user, password: "pass",
|
||||
permissions: {
|
||||
publish: "foo.restricted"
|
||||
subscribe: { allow: "foo.>", deny: "foo.restricted" }
|
||||
allow_responses: { max: 1, ttl: 0s }
|
||||
}
|
||||
}
|
||||
]}
|
||||
`))
|
||||
defer removeFile(t, cf)
|
||||
|
||||
s, _ := RunServerWithConfig(cf)
|
||||
defer s.Shutdown()
|
||||
|
||||
nc, err := nats.Connect(s.ClientURL(), nats.UserInfo("user", "pass"))
|
||||
require_NoError(t, err)
|
||||
|
||||
// qsub solo.
|
||||
qsub, err := nc.QueueSubscribeSync("foo.>", "qg")
|
||||
require_NoError(t, err)
|
||||
|
||||
err = nc.Publish("foo.restricted", []byte("RESTRICTED"))
|
||||
require_NoError(t, err)
|
||||
nc.Flush()
|
||||
|
||||
// Expect no msgs.
|
||||
checkSubsPending(t, qsub, 0)
|
||||
}
|
||||
|
||||
@@ -2441,7 +2441,7 @@ func (c *client) processSubEx(subject, queue, bsid []byte, cb msgHandler, noForw
|
||||
// allow = ["foo", "foo v1"] -> can subscribe to 'foo' but can only queue subscribe to 'foo v1'
|
||||
//
|
||||
if sub.queue != nil {
|
||||
if !c.canQueueSubscribe(string(sub.subject), string(sub.queue)) {
|
||||
if !c.canSubscribe(string(sub.subject), string(sub.queue)) {
|
||||
c.mu.Unlock()
|
||||
c.subPermissionViolation(sub)
|
||||
return nil, ErrSubscribePermissionViolation
|
||||
@@ -2691,17 +2691,27 @@ func (c *client) addShadowSub(sub *subscription, ime *ime) (*subscription, error
|
||||
|
||||
// canSubscribe determines if the client is authorized to subscribe to the
|
||||
// given subject. Assumes caller is holding lock.
|
||||
func (c *client) canSubscribe(subject string) bool {
|
||||
func (c *client) canSubscribe(subject string, optQueue ...string) bool {
|
||||
if c.perms == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
allowed := true
|
||||
|
||||
// Optional queue group.
|
||||
var queue string
|
||||
if len(optQueue) > 0 {
|
||||
queue = optQueue[0]
|
||||
}
|
||||
|
||||
// Check allow list. If no allow list that means all are allowed. Deny can overrule.
|
||||
if c.perms.sub.allow != nil {
|
||||
r := c.perms.sub.allow.Match(subject)
|
||||
allowed = len(r.psubs) != 0
|
||||
allowed = len(r.psubs) > 0
|
||||
if queue != _EMPTY_ && len(r.qsubs) > 0 {
|
||||
// If the queue appears in the allow list, then DO allow.
|
||||
allowed = queueMatches(queue, r.qsubs)
|
||||
}
|
||||
// Leafnodes operate slightly differently in that they allow broader scoped subjects.
|
||||
// They will prune based on publish perms before sending to a leafnode client.
|
||||
if !allowed && c.kind == LEAF && subjectHasWildcard(subject) {
|
||||
@@ -2714,6 +2724,11 @@ func (c *client) canSubscribe(subject string) bool {
|
||||
r := c.perms.sub.deny.Match(subject)
|
||||
allowed = len(r.psubs) == 0
|
||||
|
||||
if queue != _EMPTY_ && len(r.qsubs) > 0 {
|
||||
// If the queue appears in the deny list, then DO NOT allow.
|
||||
allowed = !queueMatches(queue, r.qsubs)
|
||||
}
|
||||
|
||||
// We use the actual subscription to signal us to spin up the deny mperms
|
||||
// and cache. We check if the subject is a wildcard that contains any of
|
||||
// the deny clauses.
|
||||
@@ -2749,42 +2764,6 @@ func queueMatches(queue string, qsubs [][]*subscription) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *client) canQueueSubscribe(subject, queue string) bool {
|
||||
if c.perms == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
allowed := true
|
||||
|
||||
if c.perms.sub.allow != nil {
|
||||
r := c.perms.sub.allow.Match(subject)
|
||||
|
||||
// If perms DO NOT have queue name, then psubs will be greater than
|
||||
// zero. If perms DO have queue name, then qsubs will be greater than
|
||||
// zero.
|
||||
allowed = len(r.psubs) > 0
|
||||
if len(r.qsubs) > 0 {
|
||||
// If the queue appears in the allow list, then DO allow.
|
||||
allowed = queueMatches(queue, r.qsubs)
|
||||
}
|
||||
}
|
||||
|
||||
if allowed && c.perms.sub.deny != nil {
|
||||
r := c.perms.sub.deny.Match(subject)
|
||||
|
||||
// If perms DO NOT have queue name, then psubs will be greater than
|
||||
// zero. If perms DO have queue name, then qsubs will be greater than
|
||||
// zero.
|
||||
allowed = len(r.psubs) == 0
|
||||
if len(r.qsubs) > 0 {
|
||||
// If the queue appears in the deny list, then DO NOT allow.
|
||||
allowed = !queueMatches(queue, r.qsubs)
|
||||
}
|
||||
}
|
||||
|
||||
return allowed
|
||||
}
|
||||
|
||||
// Low level unsubscribe for a given client.
|
||||
func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool) {
|
||||
c.mu.Lock()
|
||||
@@ -4684,7 +4663,7 @@ func (c *client) processSubsOnConfigReload(awcsti map[string]struct{}) {
|
||||
// Just checking to rebuild mperms under the lock, will collect removed though here.
|
||||
// Only collect under subs array of canSubscribe and checkAcc true.
|
||||
canSub := c.canSubscribe(string(sub.subject))
|
||||
canQSub := sub.queue != nil && c.canQueueSubscribe(string(sub.subject), string(sub.queue))
|
||||
canQSub := sub.queue != nil && c.canSubscribe(string(sub.subject), string(sub.queue))
|
||||
|
||||
if !canSub && !canQSub {
|
||||
removed = append(removed, sub)
|
||||
|
||||
Reference in New Issue
Block a user