mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 11:24:44 -07:00
Merge pull request #1143 from nats-io/q-sub-perms
Add QueueSubscribe permissions
This commit is contained in:
127
server/client.go
127
server/client.go
@@ -612,6 +612,18 @@ func (c *client) RegisterNkeyUser(user *NkeyUser) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func splitSubjectQueue(sq string) ([]byte, []byte, error) {
|
||||
vals := strings.Fields(strings.TrimSpace(sq))
|
||||
s := []byte(vals[0])
|
||||
var q []byte
|
||||
if len(vals) == 2 {
|
||||
q = []byte(vals[1])
|
||||
} else if len(vals) > 2 {
|
||||
return nil, nil, fmt.Errorf("invalid subject-queue %q", sq)
|
||||
}
|
||||
return s, q, nil
|
||||
}
|
||||
|
||||
// Initializes client.perms structure.
|
||||
// Lock is held on entry.
|
||||
func (c *client) setPermissions(perms *Permissions) {
|
||||
@@ -648,11 +660,17 @@ func (c *client) setPermissions(perms *Permissions) {
|
||||
|
||||
// Loop over subscribe permissions
|
||||
if perms.Subscribe != nil {
|
||||
var err error
|
||||
if len(perms.Subscribe.Allow) > 0 {
|
||||
c.perms.sub.allow = NewSublistWithCache()
|
||||
}
|
||||
for _, subSubject := range perms.Subscribe.Allow {
|
||||
sub := &subscription{subject: []byte(subSubject)}
|
||||
sub := &subscription{}
|
||||
sub.subject, sub.queue, err = splitSubjectQueue(subSubject)
|
||||
if err != nil {
|
||||
c.Errorf("%s", err.Error())
|
||||
continue
|
||||
}
|
||||
c.perms.sub.allow.Insert(sub)
|
||||
}
|
||||
if len(perms.Subscribe.Deny) > 0 {
|
||||
@@ -661,7 +679,12 @@ func (c *client) setPermissions(perms *Permissions) {
|
||||
c.darray = perms.Subscribe.Deny
|
||||
}
|
||||
for _, subSubject := range perms.Subscribe.Deny {
|
||||
sub := &subscription{subject: []byte(subSubject)}
|
||||
sub := &subscription{}
|
||||
sub.subject, sub.queue, err = splitSubjectQueue(subSubject)
|
||||
if err != nil {
|
||||
c.Errorf("%s", err.Error())
|
||||
continue
|
||||
}
|
||||
c.perms.sub.deny.Insert(sub)
|
||||
}
|
||||
}
|
||||
@@ -1744,14 +1767,26 @@ func (c *client) processSub(argo []byte, noForward bool) (*subscription, error)
|
||||
}
|
||||
|
||||
// Check permissions if applicable.
|
||||
if kind == CLIENT && !c.canSubscribe(string(sub.subject)) {
|
||||
c.mu.Unlock()
|
||||
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
|
||||
c.Errorf("Subscription Violation - %s, Subject %q, SID %s",
|
||||
c.getAuthUser(), sub.subject, sub.sid)
|
||||
return nil, nil
|
||||
if kind == CLIENT {
|
||||
// First do a pass whether queue subscription is valid. This does not necessarily
|
||||
// mean that it will not be able to plain subscribe.
|
||||
//
|
||||
// allow = ["foo"] -> can subscribe or queue subscribe to foo using any queue
|
||||
// allow = ["foo v1"] -> can only queue subscribe to 'foo v1', no plain subs allowed.
|
||||
// allow = ["foo", "foo v1"] -> can subscribe to 'foo' but can only queue subscribe to 'foo v1'
|
||||
//
|
||||
if sub.queue != nil {
|
||||
if !c.canQueueSubscribe(string(sub.subject), string(sub.queue)) {
|
||||
c.mu.Unlock()
|
||||
c.subPermissionViolation(sub)
|
||||
return nil, nil
|
||||
}
|
||||
} else if !c.canSubscribe(string(sub.subject)) {
|
||||
c.mu.Unlock()
|
||||
c.subPermissionViolation(sub)
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we have a maximum on the number of subscriptions.
|
||||
if c.subsAtLimit() {
|
||||
c.mu.Unlock()
|
||||
@@ -1966,6 +2001,60 @@ func (c *client) canSubscribe(subject string) bool {
|
||||
return allowed
|
||||
}
|
||||
|
||||
func queueMatches(queue string, qsubs [][]*subscription) bool {
|
||||
if len(qsubs) == 0 {
|
||||
return true
|
||||
}
|
||||
for _, qsub := range qsubs {
|
||||
qs := qsub[0]
|
||||
qname := string(qs.queue)
|
||||
|
||||
// NOTE: '*' and '>' tokens can also be valid
|
||||
// queue names so we first check against the
|
||||
// literal name. e.g. v1.* == v1.*
|
||||
if queue == qname || (subjectHasWildcard(qname) && subjectIsSubsetMatch(queue, qname)) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *client) canQueueSubscribe(subject, queue string) bool {
|
||||
if c.perms == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
allowed := true
|
||||
|
||||
if c.perms.sub.allow != nil {
|
||||
r := c.perms.sub.allow.Match(subject)
|
||||
|
||||
// If perms DO NOT have queue name, then psubs will be greater than
|
||||
// zero. If perms DO have queue name, then qsubs will be greater than
|
||||
// zero.
|
||||
allowed = len(r.psubs) > 0
|
||||
if len(r.qsubs) > 0 {
|
||||
// If the queue appears in the allow list, then DO allow.
|
||||
allowed = queueMatches(queue, r.qsubs)
|
||||
}
|
||||
}
|
||||
|
||||
if allowed && c.perms.sub.deny != nil {
|
||||
r := c.perms.sub.deny.Match(subject)
|
||||
|
||||
// If perms DO NOT have queue name, then psubs will be greater than
|
||||
// zero. If perms DO have queue name, then qsubs will be greater than
|
||||
// zero.
|
||||
allowed = len(r.psubs) == 0
|
||||
if len(r.qsubs) > 0 {
|
||||
// If the queue appears in the deny list, then DO NOT allow.
|
||||
allowed = !queueMatches(queue, r.qsubs)
|
||||
}
|
||||
}
|
||||
|
||||
return allowed
|
||||
}
|
||||
|
||||
// Low level unsubscribe for a given client.
|
||||
func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool) {
|
||||
c.mu.Lock()
|
||||
@@ -2923,6 +3012,21 @@ func (c *client) pubPermissionViolation(subject []byte) {
|
||||
c.Errorf("Publish Violation - %s, Subject %q", c.getAuthUser(), subject)
|
||||
}
|
||||
|
||||
func (c *client) subPermissionViolation(sub *subscription) {
|
||||
errTxt := fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject)
|
||||
logTxt := fmt.Sprintf("Subscription Violation - %s, Subject %q, SID %s",
|
||||
c.getAuthUser(), sub.subject, sub.sid)
|
||||
|
||||
if sub.queue != nil {
|
||||
errTxt = fmt.Sprintf("Permissions Violation for Subscription to %q using queue %q", sub.subject, sub.queue)
|
||||
logTxt = fmt.Sprintf("Subscription Violation - %s, Subject %q, Queue: %q, SID %s",
|
||||
c.getAuthUser(), sub.subject, sub.queue, sub.sid)
|
||||
}
|
||||
|
||||
c.sendErr(errTxt)
|
||||
c.Errorf(logTxt)
|
||||
}
|
||||
|
||||
func (c *client) replySubjectViolation(reply []byte) {
|
||||
c.sendErr(fmt.Sprintf("Permissions Violation for Publish with Reply of %q", reply))
|
||||
c.Errorf("Publish Violation - %s, Reply %q", c.getAuthUser(), reply)
|
||||
@@ -3118,7 +3222,10 @@ func (c *client) processSubsOnConfigReload(awcsti map[string]struct{}) {
|
||||
for _, sub := range c.subs {
|
||||
// Just checking to rebuild mperms under the lock, will collect removed though here.
|
||||
// Only collect under subs array of canSubscribe and checkAcc true.
|
||||
if !c.canSubscribe(string(sub.subject)) {
|
||||
canSub := c.canSubscribe(string(sub.subject))
|
||||
canQSub := sub.queue != nil && c.canQueueSubscribe(string(sub.subject), string(sub.queue))
|
||||
|
||||
if !canSub && !canQSub {
|
||||
removed = append(removed, sub)
|
||||
} else if checkAcc {
|
||||
subs = append(subs, sub)
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"net"
|
||||
"reflect"
|
||||
@@ -485,6 +486,189 @@ func TestClientPubWithQueueSub(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSplitSubjectQueue(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
sq string
|
||||
wantSubject []byte
|
||||
wantQueue []byte
|
||||
wantErr bool
|
||||
}{
|
||||
{name: "single subject",
|
||||
sq: "foo", wantSubject: []byte("foo"), wantQueue: nil},
|
||||
{name: "subject and queue",
|
||||
sq: "foo bar", wantSubject: []byte("foo"), wantQueue: []byte("bar")},
|
||||
{name: "subject and queue with surrounding spaces",
|
||||
sq: " foo bar ", wantSubject: []byte("foo"), wantQueue: []byte("bar")},
|
||||
{name: "subject and queue with extra spaces in the middle",
|
||||
sq: "foo bar", wantSubject: []byte("foo"), wantQueue: []byte("bar")},
|
||||
{name: "subject, queue, and extra token",
|
||||
sq: "foo bar fizz", wantSubject: []byte(nil), wantQueue: []byte(nil), wantErr: true},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
sub, que, err := splitSubjectQueue(c.sq)
|
||||
if err == nil && c.wantErr {
|
||||
t.Fatal("Expected error, but got nil")
|
||||
}
|
||||
if err != nil && !c.wantErr {
|
||||
t.Fatalf("Expected nil error, but got %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(sub, c.wantSubject) {
|
||||
t.Fatalf("Expected to get subject %#v, but instead got %#v", c.wantSubject, sub)
|
||||
}
|
||||
if !reflect.DeepEqual(que, c.wantQueue) {
|
||||
t.Fatalf("Expected to get queue %#v, but instead got %#v", c.wantQueue, que)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueueSubscribePermissions(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
perms *SubjectPermission
|
||||
subject string
|
||||
queue string
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "plain subscription on foo",
|
||||
perms: &SubjectPermission{Allow: []string{"foo"}},
|
||||
subject: "foo",
|
||||
want: "+OK\r\n",
|
||||
},
|
||||
{
|
||||
name: "queue subscribe with allowed group",
|
||||
perms: &SubjectPermission{Allow: []string{"foo bar"}},
|
||||
subject: "foo",
|
||||
queue: "bar",
|
||||
want: "+OK\r\n",
|
||||
},
|
||||
{
|
||||
name: "queue subscribe with wildcard allowed group",
|
||||
perms: &SubjectPermission{Allow: []string{"foo bar.*"}},
|
||||
subject: "foo",
|
||||
queue: "bar.fizz",
|
||||
want: "+OK\r\n",
|
||||
},
|
||||
{
|
||||
name: "queue subscribe with full wildcard subject and subgroup",
|
||||
perms: &SubjectPermission{Allow: []string{"> bar.>"}},
|
||||
subject: "whizz",
|
||||
queue: "bar.bang",
|
||||
want: "+OK\r\n",
|
||||
},
|
||||
{
|
||||
name: "plain subscribe with full wildcard subject and subgroup",
|
||||
perms: &SubjectPermission{Allow: []string{"> bar.>"}},
|
||||
subject: "whizz",
|
||||
want: "-ERR 'Permissions Violation for Subscription to \"whizz\"'\r\n",
|
||||
},
|
||||
{
|
||||
name: "deny plain subscription on foo",
|
||||
perms: &SubjectPermission{Allow: []string{">"}, Deny: []string{"foo"}},
|
||||
subject: "foo",
|
||||
queue: "bar",
|
||||
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"bar\"'\r\n",
|
||||
},
|
||||
{
|
||||
name: "allow plain subscription, except foo",
|
||||
perms: &SubjectPermission{Allow: []string{">"}, Deny: []string{"foo"}},
|
||||
subject: "bar",
|
||||
want: "+OK\r\n",
|
||||
},
|
||||
{
|
||||
name: "deny everything",
|
||||
perms: &SubjectPermission{Allow: []string{">"}, Deny: []string{">"}},
|
||||
subject: "foo",
|
||||
queue: "bar",
|
||||
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"bar\"'\r\n",
|
||||
},
|
||||
{
|
||||
name: "can only subscribe to queues v1",
|
||||
perms: &SubjectPermission{Allow: []string{"> v1.>"}},
|
||||
subject: "foo",
|
||||
queue: "v1.prod",
|
||||
want: "+OK\r\n",
|
||||
},
|
||||
{
|
||||
name: "cannot subscribe to queues, plain subscribe ok",
|
||||
perms: &SubjectPermission{Allow: []string{">"}, Deny: []string{"> >"}},
|
||||
subject: "foo",
|
||||
want: "+OK\r\n",
|
||||
},
|
||||
{
|
||||
name: "cannot subscribe to queues, queue subscribe not ok",
|
||||
perms: &SubjectPermission{Deny: []string{"> >"}},
|
||||
subject: "foo",
|
||||
queue: "bar",
|
||||
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"bar\"'\r\n",
|
||||
},
|
||||
{
|
||||
name: "deny all queue subscriptions on dev or stg only",
|
||||
perms: &SubjectPermission{Deny: []string{"> *.dev", "> *.stg"}},
|
||||
subject: "foo",
|
||||
queue: "bar",
|
||||
want: "+OK\r\n",
|
||||
},
|
||||
{
|
||||
name: "allow only queue subscription on dev or stg",
|
||||
perms: &SubjectPermission{Allow: []string{"> *.dev", "> *.stg"}},
|
||||
subject: "foo",
|
||||
queue: "bar",
|
||||
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"bar\"'\r\n",
|
||||
},
|
||||
{
|
||||
name: "deny queue subscriptions with subject foo",
|
||||
perms: &SubjectPermission{Deny: []string{"foo >"}},
|
||||
subject: "foo",
|
||||
queue: "bar",
|
||||
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"bar\"'\r\n",
|
||||
},
|
||||
{
|
||||
name: "plain sub is allowed, but queue subscribe with queue not in list",
|
||||
perms: &SubjectPermission{Allow: []string{"foo bar"}},
|
||||
subject: "foo",
|
||||
queue: "fizz",
|
||||
want: "-ERR 'Permissions Violation for Subscription to \"foo\" using queue \"fizz\"'\r\n",
|
||||
},
|
||||
{
|
||||
name: "allow plain sub, but do queue subscribe",
|
||||
perms: &SubjectPermission{Allow: []string{"foo"}},
|
||||
subject: "foo",
|
||||
queue: "bar",
|
||||
want: "+OK\r\n",
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
_, client, r := setupClient()
|
||||
|
||||
client.RegisterUser(&User{
|
||||
Permissions: &Permissions{Subscribe: c.perms},
|
||||
})
|
||||
connect := []byte("CONNECT {\"verbose\":true}\r\n")
|
||||
qsub := []byte(fmt.Sprintf("SUB %s %s 1\r\n", c.subject, c.queue))
|
||||
|
||||
go client.parseFlushAndClose(append(connect, qsub...))
|
||||
|
||||
var buf bytes.Buffer
|
||||
if _, err := io.Copy(&buf, r); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Extra OK is from the successful CONNECT.
|
||||
want := "+OK\r\n" + c.want
|
||||
if got := buf.String(); got != want {
|
||||
t.Fatalf("Expected to receive %q, but instead received %q", want, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientPubWithQueueSubNoEcho(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
s := RunServer(opts)
|
||||
|
||||
Reference in New Issue
Block a user