mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 03:38:42 -07:00
Added more tests, e.g. reload
Signed-off-by: Derek Collison <derek@nats.io>
This commit is contained in:
@@ -1466,7 +1466,7 @@ func (c *client) canSubscribe(subject string) bool {
|
||||
// and cache. We check if the subject is a wildcard that contains any of
|
||||
// the deny clauses.
|
||||
// FIXME(dlc) - We could be smarter and track when these go away and remove.
|
||||
if c.mperms == nil && subjectHasWildcard(subject) {
|
||||
if allowed && c.mperms == nil && subjectHasWildcard(subject) {
|
||||
// Whip through the deny array and check if this wildcard subject is within scope.
|
||||
for _, sub := range c.darray {
|
||||
tokens := strings.Split(sub, tsep)
|
||||
@@ -1618,11 +1618,9 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
|
||||
|
||||
// Check if we have a subscribe deny clause. This will trigger us to check the subject
|
||||
// for a match against the denied subjects.
|
||||
if client.mperms != nil {
|
||||
if client.checkDenySub(string(c.pa.subject)) {
|
||||
client.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
if client.mperms != nil && client.checkDenySub(string(c.pa.subject)) {
|
||||
client.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
srv := client.srv
|
||||
@@ -2245,9 +2243,14 @@ func (c *client) processSubsOnConfigReload(awcsti map[string]struct{}) {
|
||||
checkAcc = false
|
||||
}
|
||||
}
|
||||
// We will clear any mperms we have here. It will rebuild on the fly with canSubscribe,
|
||||
// so we do that here as we collect them. We will check result down below.
|
||||
c.mperms = nil
|
||||
// Collect client's subs under the lock
|
||||
for _, sub := range c.subs {
|
||||
subs = append(subs, sub)
|
||||
// Just checking to rebuild mperms under the lock.
|
||||
c.canSubscribe(string(sub.subject))
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
|
||||
@@ -20,7 +20,10 @@ authorization {
|
||||
# Setup a default user that can subscribe to anything, but has
|
||||
# no publish capabilities.
|
||||
default_user = {
|
||||
subscribe = "PUBLIC.>"
|
||||
subscribe = {
|
||||
allow: ["PUBLIC.>", "foo.*"]
|
||||
deny: "foo.bar"
|
||||
}
|
||||
}
|
||||
|
||||
# Default permissions if none presented. e.g. susan below.
|
||||
|
||||
@@ -20,7 +20,10 @@ authorization {
|
||||
# Setup a default user that can subscribe to anything, but has
|
||||
# no publish capabilities.
|
||||
default_user = {
|
||||
subscribe = "PUBLIC.>"
|
||||
subscribe = {
|
||||
allow: ["PUBLIC.>", "foo.*"]
|
||||
deny: ["PUBLIC.foo"]
|
||||
}
|
||||
}
|
||||
|
||||
# Default permissions if none presented. e.g. susan below.
|
||||
|
||||
@@ -681,12 +681,31 @@ func (s *Server) applyOptions(opts []option) {
|
||||
s.Noticef("Reloaded server configuration")
|
||||
}
|
||||
|
||||
// On reload we have clients that may exists that are connected
|
||||
// but are not assigned to an account. Check here and assign to
|
||||
// the global account.
|
||||
// Lock should be held.
|
||||
func (s *Server) assignGlobalAccountToOrphanUsers() {
|
||||
for _, u := range s.users {
|
||||
if u.Account == nil {
|
||||
u.Account = s.gacc
|
||||
}
|
||||
}
|
||||
for _, u := range s.nkeys {
|
||||
if u.Account == nil {
|
||||
u.Account = s.gacc
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// reloadAuthorization reconfigures the server authorization settings,
|
||||
// disconnects any clients who are no longer authorized, and removes any
|
||||
// unauthorized subscriptions.
|
||||
func (s *Server) reloadAuthorization() {
|
||||
s.mu.Lock()
|
||||
|
||||
s.configureAuthorization()
|
||||
s.assignGlobalAccountToOrphanUsers()
|
||||
|
||||
// This map will contain the names of accounts that have their streams
|
||||
// import configuration changed.
|
||||
|
||||
@@ -1071,14 +1071,72 @@ func TestConfigReloadChangePermissions(t *testing.T) {
|
||||
t.Fatalf("Msg is incorrect.\nexpected: %+v\ngot: %+v", []byte("world"), msg.Data)
|
||||
}
|
||||
|
||||
// Susan will subscribe to two subjects, both will succeed but a send to foo.bar should not succeed
|
||||
// however PUBLIC.foo should.
|
||||
sconn, err := nats.Connect(addr, nats.UserInfo("susan", "baz"))
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating client: %v", err)
|
||||
}
|
||||
defer sconn.Close()
|
||||
|
||||
asyncErr2 := make(chan error, 1)
|
||||
sconn.SetErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
|
||||
asyncErr2 <- err
|
||||
})
|
||||
|
||||
fooSub, err := sconn.SubscribeSync("foo.*")
|
||||
if err != nil {
|
||||
t.Fatalf("Error subscribing: %v", err)
|
||||
}
|
||||
sconn.Flush()
|
||||
|
||||
// Publishing from bob on foo.bar should not come through.
|
||||
if err := conn.Publish("foo.bar", []byte("hello")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
conn.Flush()
|
||||
|
||||
msg, err = fooSub.NextMsg(100 * time.Millisecond)
|
||||
if err != nats.ErrTimeout {
|
||||
t.Fatalf("Received a message we shouldn't have")
|
||||
}
|
||||
|
||||
pubSub, err := sconn.SubscribeSync("PUBLIC.*")
|
||||
if err != nil {
|
||||
t.Fatalf("Error subscribing: %v", err)
|
||||
}
|
||||
sconn.Flush()
|
||||
|
||||
select {
|
||||
case err := <-asyncErr2:
|
||||
t.Fatalf("Received unexpected error for susan: %v", err)
|
||||
default:
|
||||
}
|
||||
|
||||
// This should work ok with original config.
|
||||
if err := conn.Publish("PUBLIC.foo", []byte("hello monkey")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
conn.Flush()
|
||||
|
||||
msg, err = pubSub.NextMsg(2 * time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error receiving msg: %v", err)
|
||||
}
|
||||
if string(msg.Data) != "hello monkey" {
|
||||
t.Fatalf("Msg is incorrect.\nexpected: %q\ngot: %q", "hello monkey", msg.Data)
|
||||
}
|
||||
|
||||
///////////////////////////////////////////
|
||||
// Change permissions.
|
||||
///////////////////////////////////////////
|
||||
|
||||
changeCurrentConfigContent(t, config, "./configs/reload/authorization_2.conf")
|
||||
if err := server.Reload(); err != nil {
|
||||
t.Fatalf("Error reloading config: %v", err)
|
||||
}
|
||||
|
||||
// Ensure we receive an error for the subscription that is no longer
|
||||
// authorized.
|
||||
// Ensure we receive an error for the subscription that is no longer authorized.
|
||||
// In this test, since connection is not closed by the server,
|
||||
// the client must receive an -ERR
|
||||
select {
|
||||
@@ -1149,6 +1207,42 @@ func TestConfigReloadChangePermissions(t *testing.T) {
|
||||
t.Fatalf("Received unexpected error: %v", err)
|
||||
default:
|
||||
}
|
||||
|
||||
// Now check susan again.
|
||||
//
|
||||
// This worked ok with original config but should not deliver a message now.
|
||||
if err := conn.Publish("PUBLIC.foo", []byte("hello monkey")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
conn.Flush()
|
||||
|
||||
msg, err = pubSub.NextMsg(100 * time.Millisecond)
|
||||
if err != nats.ErrTimeout {
|
||||
t.Fatalf("Received a message we shouldn't have")
|
||||
}
|
||||
|
||||
// Now check foo.bar, which did not work before but should work now..
|
||||
if err := conn.Publish("foo.bar", []byte("hello?")); err != nil {
|
||||
t.Fatalf("Error publishing message: %v", err)
|
||||
}
|
||||
conn.Flush()
|
||||
|
||||
msg, err = fooSub.NextMsg(2 * time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Error receiving msg: %v", err)
|
||||
}
|
||||
if string(msg.Data) != "hello?" {
|
||||
t.Fatalf("Msg is incorrect.\nexpected: %q\ngot: %q", "hello?", msg.Data)
|
||||
}
|
||||
|
||||
// Once last check for no errors.
|
||||
sconn.Flush()
|
||||
|
||||
select {
|
||||
case err := <-asyncErr2:
|
||||
t.Fatalf("Received unexpected error for susan: %v", err)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure Reload returns an error when attempting to change cluster address
|
||||
@@ -2740,7 +2834,9 @@ func TestConfigReloadAccountNKeyUsers(t *testing.T) {
|
||||
|
||||
s.mu.Lock()
|
||||
nkeys := s.nkeys
|
||||
globalAcc := s.gacc
|
||||
s.mu.Unlock()
|
||||
|
||||
if n := len(nkeys); n != 2 {
|
||||
t.Fatalf("NKeys map should have 2 users, got %v", n)
|
||||
}
|
||||
@@ -2755,7 +2851,7 @@ func TestConfigReloadAccountNKeyUsers(t *testing.T) {
|
||||
if ivan == nil {
|
||||
t.Fatal("NKey for user Ivan not found")
|
||||
}
|
||||
if ivan.Account != nil {
|
||||
if ivan.Account != globalAcc {
|
||||
t.Fatalf("Invalid account for user Ivan: %#v", ivan.Account)
|
||||
}
|
||||
if s.LookupAccount("synadia") != nil {
|
||||
|
||||
@@ -35,8 +35,8 @@ NEW_STYLE = {
|
||||
deny = ["SYS.*", "bar.baz", "foo.*"]
|
||||
}
|
||||
subscribe = {
|
||||
allow = "foo.*"
|
||||
deny = "foo.baz"
|
||||
allow = ["foo.*", "SYS.TEST.>"]
|
||||
deny = ["foo.baz", "SYS.*"]
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -193,4 +193,31 @@ func TestUserAuthorizationProto(t *testing.T) {
|
||||
expectResult(t, nc, msgRe)
|
||||
}
|
||||
}
|
||||
|
||||
// Clear old stuff
|
||||
c.Close()
|
||||
|
||||
c = createClientConn(t, opts.Host, opts.Port)
|
||||
defer c.Close()
|
||||
expectAuthRequired(t, c)
|
||||
doAuthConnect(t, c, "", "ns", DefaultPass)
|
||||
expectResult(t, c, okRe)
|
||||
|
||||
sendProto(t, c, "SUB foo.bar 1\r\n")
|
||||
expectResult(t, c, okRe)
|
||||
|
||||
sendProto(t, c, "SUB foo.bar.baz 2\r\n")
|
||||
expectResult(t, c, errRe)
|
||||
|
||||
sendProto(t, c, "SUB > 3\r\n")
|
||||
expectResult(t, c, errRe)
|
||||
|
||||
sendProto(t, c, "SUB SYS.> 4\r\n")
|
||||
expectResult(t, c, errRe)
|
||||
|
||||
sendProto(t, c, "SUB SYS.TEST.foo 5\r\n")
|
||||
expectResult(t, c, okRe)
|
||||
|
||||
sendProto(t, c, "SUB SYS.bar 5\r\n")
|
||||
expectResult(t, c, errRe)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user