From 1ce1a434b09942942d901866d5c58d033a08d222 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 5 Nov 2018 19:03:51 -0800 Subject: [PATCH 1/6] Fix for #792 Allow deny clauses for subscriptions to still allow wildcard subscriptions but do not deliver the messages themselves. Signed-off-by: Derek Collison --- .travis.yml | 4 + server/client.go | 157 ++++++++++++++++++++++++-------- server/reload.go | 8 +- server/route.go | 15 +-- server/sublist.go | 5 + test/bench_test.go | 74 +++++++++++++++ test/configs/authorization.conf | 2 + test/configs/auths.conf | 13 ++- test/user_authorization_test.go | 54 +++++++++++ 9 files changed, 284 insertions(+), 48 deletions(-) diff --git a/.travis.yml b/.travis.yml index 78e2368c..e566c5b4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,7 @@ +# Tests are starting to consume enough memory during --race tests to exceed container env on Travis of 4GB. +sudo: required +dist: trusty + language: go go: - 1.9.x diff --git a/server/client.go b/server/client.go index cad86fbf..c3cb9ef8 100644 --- a/server/client.go +++ b/server/client.go @@ -133,27 +133,29 @@ const ( type client struct { // Here first because of use of atomics, and memory alignment. stats - mpay int64 - msubs int - mu sync.Mutex - typ int - cid uint64 - opts clientOpts - start time.Time - nonce []byte - nc net.Conn - ncs string - out outbound - srv *Server - acc *Account - subs map[string]*subscription - perms *permissions - in readCache - pcd map[*client]struct{} - atmr *time.Timer - ping pinfo - msgb [msgScratchSize]byte - last time.Time + mpay int64 + msubs int + mu sync.Mutex + typ int + cid uint64 + opts clientOpts + start time.Time + nonce []byte + nc net.Conn + ncs string + out outbound + srv *Server + acc *Account + subs map[string]*subscription + perms *permissions + mperms *msgDeny + darray []string + in readCache + pcd map[*client]struct{} + atmr *time.Timer + ping pinfo + msgb [msgScratchSize]byte + last time.Time parseState rtt time.Duration @@ -200,10 +202,20 @@ type permissions struct { pcache map[string]bool } +// msgDeny is used when a user permission for subscriptions has a deny +// clause but a subscription could be made that is of broader scope. +// e.g. deny = "foo", but user subscribes to "*". That subscription should +// succeed but no message sent on foo should be delivered. +type msgDeny struct { + deny *Sublist + dcache map[string]bool +} + const ( - maxResultCacheSize = 512 - maxPermCacheSize = 32 - pruneSize = 16 + maxResultCacheSize = 512 + maxDenyPermCacheSize = 256 + maxPermCacheSize = 128 + pruneSize = 32 ) // Used in readloop to cache hot subject lookups and group statistics. @@ -376,6 +388,7 @@ func (c *client) RegisterUser(user *User) { if user.Permissions == nil { // Reset perms to nil in case client previously had them. c.perms = nil + c.mperms = nil return } @@ -398,6 +411,7 @@ func (c *client) RegisterNkeyUser(user *NkeyUser) { if user.Permissions == nil { // Reset perms to nil in case client previously had them. c.perms = nil + c.mperms = nil return } @@ -442,6 +456,8 @@ func (c *client) setPermissions(perms *Permissions) { } if len(perms.Subscribe.Deny) > 0 { c.perms.sub.deny = NewSublist() + // Also hold onto this array for later. + c.darray = perms.Subscribe.Deny } for _, subSubject := range perms.Subscribe.Deny { sub := &subscription{subject: []byte(subSubject)} @@ -450,6 +466,16 @@ func (c *client) setPermissions(perms *Permissions) { } } +// This will load up the deny structure used for filtering delivered +// messages based on a deny clause for subscriptions. +// Lock should be held. +func (c *client) loadMsgDenyFilter() { + c.mperms = &msgDeny{NewSublist(), make(map[string]bool)} + for _, sub := range c.darray { + c.mperms.deny.Insert(&subscription{subject: []byte(sub)}) + } +} + // writeLoop is the main socket write functionality. // Runs in its own Go routine. func (c *client) writeLoop() { @@ -1298,11 +1324,11 @@ func (c *client) processSub(argo []byte) (err error) { // Check permissions if applicable. if ctype == ROUTER { - if !c.canExport(sub.subject) { + if !c.canExport(string(sub.subject)) { c.mu.Unlock() return nil } - } else if !c.canSubscribe(sub.subject) { + } else if !c.canSubscribe(string(sub.subject)) { c.mu.Unlock() c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject)) c.Errorf("Subscription Violation - User %q, Subject %q, SID %s", @@ -1419,7 +1445,7 @@ func (c *client) addShadowSubscriptions(acc *Account, sub *subscription) error { // canSubscribe determines if the client is authorized to subscribe to the // given subject. Assumes caller is holding lock. -func (c *client) canSubscribe(subject []byte) bool { +func (c *client) canSubscribe(subject string) bool { if c.perms == nil { return true } @@ -1428,13 +1454,28 @@ func (c *client) canSubscribe(subject []byte) bool { // Check allow list. If no allow list that means all are allowed. Deny can overrule. if c.perms.sub.allow != nil { - r := c.perms.sub.allow.Match(string(subject)) + r := c.perms.sub.allow.Match(subject) allowed = len(r.psubs) != 0 } // If we have a deny list and we think we are allowed, check that as well. if allowed && c.perms.sub.deny != nil { - r := c.perms.sub.deny.Match(string(subject)) + r := c.perms.sub.deny.Match(subject) allowed = len(r.psubs) == 0 + + // We use the actual subscription to signal us to spin up the deny mperms + // and cache. We check if the subject is a wildcard that contains any of + // the deny clauses. + // FIXME(dlc) - We could be smarter and track when these go away and remove. + if c.mperms == nil && subjectHasWildcard(subject) { + // Whip through the deny array and check if this wildcard subject is within scope. + for _, sub := range c.darray { + tokens := strings.Split(sub, tsep) + if isSubsetMatch(tokens, sub) { + c.loadMsgDenyFilter() + break + } + } + } } return allowed } @@ -1526,6 +1567,25 @@ func (c *client) processUnsub(arg []byte) error { return nil } +// checkDenySub will check if we are allowed to deliver this message in the +// presence of deny clauses for subscriptions. Deny clauses will not prevent +// larger scoped wildcard subscriptions, so we need to check at delivery time. +// Lock should be held. +func (c *client) checkDenySub(subject string) bool { + if denied, ok := c.mperms.dcache[subject]; ok { + return denied + } else if r := c.mperms.deny.Match(subject); len(r.psubs) != 0 { + c.mperms.dcache[subject] = true + return true + } else { + c.mperms.dcache[subject] = false + } + if len(c.mperms.dcache) > maxDenyPermCacheSize { + c.pruneDenyCache() + } + return false +} + func (c *client) msgHeader(mh []byte, sub *subscription, reply []byte) []byte { if len(sub.sid) > 0 { mh = append(mh, sub.sid...) @@ -1556,6 +1616,15 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool { return false } + // Check if we have a subscribe deny clause. This will trigger us to check the subject + // for a match against the denied subjects. + if client.mperms != nil { + if client.checkDenySub(string(c.pa.subject)) { + client.mu.Unlock() + return false + } + } + srv := client.srv sub.nm++ @@ -1637,7 +1706,21 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool { return true } -// pruneCache will prune the cache via randomly +// pruneDenyCache will prune the deny cache via randomly +// deleting items. Doing so pruneSize items at a time. +// Lock must be held for this one since it is shared under +// deliverMsg. +func (c *client) pruneDenyCache() { + r := 0 + for subject := range c.mperms.dcache { + delete(c.mperms.dcache, subject) + if r++; r > pruneSize { + break + } + } +} + +// prunePubPermsCache will prune the cache via randomly // deleting items. Doing so pruneSize items at a time. func (c *client) prunePubPermsCache() { r := 0 @@ -1650,18 +1733,18 @@ func (c *client) prunePubPermsCache() { } // pubAllowed checks on publish permissioning. -func (c *client) pubAllowed(subject []byte) bool { - if c.perms == nil { +func (c *client) pubAllowed(subject string) bool { + if c.perms == nil || (c.perms.pub.allow == nil && c.perms.pub.deny == nil) { return true } // Check if published subject is allowed if we have permissions in place. - allowed, ok := c.perms.pcache[string(subject)] + allowed, ok := c.perms.pcache[subject] if ok { return allowed } // Cache miss, check allow then deny as needed. if c.perms.pub.allow != nil { - r := c.perms.pub.allow.Match(string(subject)) + r := c.perms.pub.allow.Match(subject) allowed = len(r.psubs) != 0 } else { // No entries means all are allowed. Deny will overrule as needed. @@ -1669,7 +1752,7 @@ func (c *client) pubAllowed(subject []byte) bool { } // If we have a deny list and are currently allowed, check that as well. if allowed && c.perms.pub.deny != nil { - r := c.perms.pub.deny.Match(string(subject)) + r := c.perms.pub.deny.Match(subject) allowed = len(r.psubs) == 0 } // Update our cache here. @@ -1733,7 +1816,7 @@ func (c *client) processInboundClientMsg(msg []byte) { } // Check pub permissions - if c.perms != nil && !c.pubAllowed(c.pa.subject) { + if c.perms != nil && (c.perms.pub.allow != nil || c.perms.pub.deny != nil) && !c.pubAllowed(string(c.pa.subject)) { c.pubPermissionViolation(c.pa.subject) return } @@ -2172,7 +2255,7 @@ func (c *client) processSubsOnConfigReload(awcsti map[string]struct{}) { // from config reload code prior to calling this function. So there is no risk // of concurrent access to c.perms. for _, sub := range subs { - if checkPerms && !c.canSubscribe(sub.subject) { + if checkPerms && !c.canSubscribe(string(sub.subject)) { removed = append(removed, sub) c.unsubscribe(acc, sub, true) } else if checkAcc { diff --git a/server/reload.go b/server/reload.go index 5ea5332b..fc2be1df 100644 --- a/server/reload.go +++ b/server/reload.go @@ -870,8 +870,9 @@ func (s *Server) reloadClusterPermissions() { // Go through all local subscriptions for _, sub := range localSubs { // Get all subs that can now be imported - couldImportThen := oldPermsTester.canImport(sub.subject) - canImportNow := newPermsTester.canImport(sub.subject) + subj := string(sub.subject) + couldImportThen := oldPermsTester.canImport(subj) + canImportNow := newPermsTester.canImport(subj) if canImportNow { // If we could not before, then will need to send a SUB protocol. if !couldImportThen { @@ -896,7 +897,8 @@ func (s *Server) reloadClusterPermissions() { for _, sub := range route.subs { // If we can't export, we need to drop the subscriptions that // we have on behalf of this route. - if !route.canExport(sub.subject) { + subj := string(sub.subject) + if !route.canExport(subj) { delete(route.subs, string(sub.sid)) deleteRoutedSubs = append(deleteRoutedSubs, sub) } diff --git a/server/route.go b/server/route.go index c37a46e4..426599f0 100644 --- a/server/route.go +++ b/server/route.go @@ -505,7 +505,7 @@ func (s *Server) updateRemoteRoutePerms(route *client, info *Info) { s.gacc.sl.localSubs(&localSubs) route.sendRouteSubProtos(localSubs, false, func(sub *subscription) bool { - subj := sub.subject + subj := string(sub.subject) // If the remote can now export but could not before, and this server can import this // subject, then send SUB protocol. if newPermsTester.canExport(subj) && !oldPermsTester.canExport(subj) && route.canImport(subj) { @@ -613,7 +613,7 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) { // canImport is whether or not we will send a SUB for interest to the other side. // This is for ROUTER connections only. // Lock is held on entry. -func (c *client) canImport(subject []byte) bool { +func (c *client) canImport(subject string) bool { // Use pubAllowed() since this checks Publish permissions which // is what Import maps to. return c.pubAllowed(subject) @@ -622,7 +622,7 @@ func (c *client) canImport(subject []byte) bool { // canExport is whether or not we will accept a SUB from the remote for a given subject. // This is for ROUTER connections only. // Lock is held on entry -func (c *client) canExport(subject []byte) bool { +func (c *client) canExport(subject string) bool { // Use canSubscribe() since this checks Subscribe permissions which // is what Export maps to. return c.canSubscribe(subject) @@ -635,6 +635,7 @@ func (c *client) setRoutePermissions(perms *RoutePermissions) { // Reset if some were set if perms == nil { c.perms = nil + c.mperms = nil return } // Convert route permissions to user permissions. @@ -795,7 +796,7 @@ func (c *client) processRemoteSub(argo []byte) (err error) { } // Check permissions if applicable. - if !c.canExport(sub.subject) { + if !c.canExport(string(sub.subject)) { c.mu.Unlock() c.Debugf("Can not export %q, ignoring remote subscription request", sub.subject) return nil @@ -878,7 +879,7 @@ func (s *Server) sendSubsToRoute(route *client) { a.mu.RUnlock() closed = route.sendRouteSubProtos(subs, false, func(sub *subscription) bool { - return route.canImport(sub.subject) + return route.canImport(string(sub.subject)) }) if closed { @@ -1308,7 +1309,7 @@ func (s *Server) broadcastSubscribe(sub *subscription) { for _, route := range s.routes { route.mu.Lock() route.sendRouteSubProtos(subs, trace, func(sub *subscription) bool { - return route.canImport(sub.subject) + return route.canImport(string(sub.subject)) }) route.mu.Unlock() } @@ -1324,7 +1325,7 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) { for _, route := range s.routes { route.mu.Lock() route.sendRouteUnSubProtos(subs, trace, func(sub *subscription) bool { - return route.canImport(sub.subject) + return route.canImport(string(sub.subject)) }) route.mu.Unlock() } diff --git a/server/sublist.go b/server/sublist.go index c75b9242..86141eaf 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -731,6 +731,11 @@ func visitLevel(l *level, depth int) int { return maxDepth } +// Determine if a subject has any wildcard tokens. +func subjectHasWildcard(subject string) bool { + return !subjectIsLiteral(subject) +} + // Determine if the subject has any wildcards. Fast version, does not check for // valid subject. Used in caching layer. func subjectIsLiteral(subject string) bool { diff --git a/test/bench_test.go b/test/bench_test.go index 4f90b411..f7ce2d28 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -400,6 +400,80 @@ func Benchmark___PubEightQueueSub(b *testing.B) { s.Shutdown() } +func Benchmark__DenyMsgNoWCPubSub(b *testing.B) { + s, opts := RunServerWithConfig("./configs/authorization.conf") + defer s.Shutdown() + + c := createClientConn(b, opts.Host, opts.Port) + defer c.Close() + + expectAuthRequired(b, c) + cs := fmt.Sprintf("CONNECT {\"verbose\":false,\"pedantic\":false,\"user\":\"%s\",\"pass\":\"%s\"}\r\n", "bench-deny", DefaultPass) + sendProto(b, c, cs) + + sendProto(b, c, "SUB foo 1\r\n") + bw := bufio.NewWriterSize(c, defaultSendBufSize) + sendOp := []byte(fmt.Sprintf("PUB foo 2\r\nok\r\n")) + ch := make(chan bool) + expected := len("MSG foo 1 2\r\nok\r\n") * b.N + go drainConnection(b, c, ch, expected) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, err := bw.Write(sendOp) + if err != nil { + b.Errorf("Received error on PUB write: %v\n", err) + } + } + err := bw.Flush() + if err != nil { + b.Errorf("Received error on FLUSH write: %v\n", err) + } + + // Wait for connection to be drained + <-ch + + // To not count defer cleanup of client and server. + b.StopTimer() +} + +func Benchmark_DenyMsgYesWCPubSub(b *testing.B) { + s, opts := RunServerWithConfig("./configs/authorization.conf") + defer s.Shutdown() + + c := createClientConn(b, opts.Host, opts.Port) + defer c.Close() + + expectAuthRequired(b, c) + cs := fmt.Sprintf("CONNECT {\"verbose\":false,\"pedantic\":false,\"user\":\"%s\",\"pass\":\"%s\"}\r\n", "bench-deny", DefaultPass) + sendProto(b, c, cs) + + sendProto(b, c, "SUB * 1\r\n") + bw := bufio.NewWriterSize(c, defaultSendBufSize) + sendOp := []byte(fmt.Sprintf("PUB foo 2\r\nok\r\n")) + ch := make(chan bool) + expected := len("MSG foo 1 2\r\nok\r\n") * b.N + go drainConnection(b, c, ch, expected) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, err := bw.Write(sendOp) + if err != nil { + b.Errorf("Received error on PUB write: %v\n", err) + } + } + err := bw.Flush() + if err != nil { + b.Errorf("Received error on FLUSH write: %v\n", err) + } + + // Wait for connection to be drained + <-ch + + // To not count defer cleanup of client and server. + b.StopTimer() +} + func routePubSub(b *testing.B, size int) { b.StopTimer() diff --git a/test/configs/authorization.conf b/test/configs/authorization.conf index 7e3ff463..356abf26 100644 --- a/test/configs/authorization.conf +++ b/test/configs/authorization.conf @@ -14,5 +14,7 @@ authorization { {user: bench, password: $PASS, permissions: $BENCH} {user: joe, password: $PASS} {user: ns, password: $PASS, permissions: $NEW_STYLE} + {user: ns-pub, password: $PASS, permissions: $NS_PUB} + {user: bench-deny, password: $PASS, permissions: $BENCH_DENY} ] } diff --git a/test/configs/auths.conf b/test/configs/auths.conf index cecbaa0c..b1a651c8 100644 --- a/test/configs/auths.conf +++ b/test/configs/auths.conf @@ -27,7 +27,6 @@ BENCH = { publish = "a" } - # New Style Permissions NEW_STYLE = { @@ -40,3 +39,15 @@ NEW_STYLE = { deny = "foo.baz" } } + +NS_PUB = { + publish = "foo.baz" + subscribe = "foo.baz" +} + +BENCH_DENY = { + subscribe = { + allow = ["foo", "*"] + deny = "foo.bar" + } +} \ No newline at end of file diff --git a/test/user_authorization_test.go b/test/user_authorization_test.go index b334e94a..41dde80f 100644 --- a/test/user_authorization_test.go +++ b/test/user_authorization_test.go @@ -139,4 +139,58 @@ func TestUserAuthorizationProto(t *testing.T) { expectResult(t, c, permErrRe) sendProto(t, c, "SUB foo.baz 1\r\n") expectResult(t, c, permErrRe) + + // Deny clauses for subscriptions need to be able to allow subscriptions + // on larger scoped wildcards, but prevent delivery of a message whose + // subject matches a deny clause. + + // Clear old stuff + c.Close() + + c = createClientConn(t, opts.Host, opts.Port) + defer c.Close() + expectAuthRequired(t, c) + doAuthConnect(t, c, "", "ns", DefaultPass) + expectResult(t, c, okRe) + + sendProto(t, c, "SUB foo.* 1\r\n") + expectResult(t, c, okRe) + + sendProto(t, c, "SUB foo.* bar 2\r\n") + expectResult(t, c, okRe) + + // Now send on foo.baz which should not be received on first client. + // Joe is a default user + nc := createClientConn(t, opts.Host, opts.Port) + defer nc.Close() + expectAuthRequired(t, nc) + doAuthConnect(t, nc, "", "ns-pub", DefaultPass) + expectResult(t, nc, okRe) + + sendProto(t, nc, "PUB foo.baz 2\r\nok\r\n") + expectResult(t, nc, okRe) + + // Expect nothing from the wildcard subscription. + expectNothing(t, c) + + sendProto(t, c, "PING\r\n") + expectResult(t, c, pongRe) + + // Now create a queue sub on our ns-pub user. We want to test that + // queue subscribers can be denied and delivery will route around. + sendProto(t, nc, "SUB foo.baz bar 2\r\n") + expectResult(t, nc, okRe) + + // Make sure we always get the message on our queue subscriber. + // Do this several times since we should select the other subscriber + // but get permission denied.. + for i := 0; i < 20; i++ { + sendProto(t, nc, "PUB foo.baz 2\r\nok\r\n") + buf := expectResult(t, nc, okRe) + if msgRe.Match(buf) { + continue + } else { + expectResult(t, nc, msgRe) + } + } } From b2ec5b3a9870d2be00ed9abb7dcab5addc3df4d5 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 6 Nov 2018 19:58:42 -0800 Subject: [PATCH 2/6] Added more tests, e.g. reload Signed-off-by: Derek Collison --- server/client.go | 15 +-- server/configs/reload/authorization_1.conf | 5 +- server/configs/reload/authorization_2.conf | 5 +- server/reload.go | 19 ++++ server/reload_test.go | 102 ++++++++++++++++++++- test/configs/auths.conf | 4 +- test/user_authorization_test.go | 27 ++++++ 7 files changed, 164 insertions(+), 13 deletions(-) diff --git a/server/client.go b/server/client.go index c3cb9ef8..dc3b0f9e 100644 --- a/server/client.go +++ b/server/client.go @@ -1466,7 +1466,7 @@ func (c *client) canSubscribe(subject string) bool { // and cache. We check if the subject is a wildcard that contains any of // the deny clauses. // FIXME(dlc) - We could be smarter and track when these go away and remove. - if c.mperms == nil && subjectHasWildcard(subject) { + if allowed && c.mperms == nil && subjectHasWildcard(subject) { // Whip through the deny array and check if this wildcard subject is within scope. for _, sub := range c.darray { tokens := strings.Split(sub, tsep) @@ -1618,11 +1618,9 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool { // Check if we have a subscribe deny clause. This will trigger us to check the subject // for a match against the denied subjects. - if client.mperms != nil { - if client.checkDenySub(string(c.pa.subject)) { - client.mu.Unlock() - return false - } + if client.mperms != nil && client.checkDenySub(string(c.pa.subject)) { + client.mu.Unlock() + return false } srv := client.srv @@ -2245,9 +2243,14 @@ func (c *client) processSubsOnConfigReload(awcsti map[string]struct{}) { checkAcc = false } } + // We will clear any mperms we have here. It will rebuild on the fly with canSubscribe, + // so we do that here as we collect them. We will check result down below. + c.mperms = nil // Collect client's subs under the lock for _, sub := range c.subs { subs = append(subs, sub) + // Just checking to rebuild mperms under the lock. + c.canSubscribe(string(sub.subject)) } c.mu.Unlock() diff --git a/server/configs/reload/authorization_1.conf b/server/configs/reload/authorization_1.conf index b1f6a1f0..f54f011c 100644 --- a/server/configs/reload/authorization_1.conf +++ b/server/configs/reload/authorization_1.conf @@ -20,7 +20,10 @@ authorization { # Setup a default user that can subscribe to anything, but has # no publish capabilities. default_user = { - subscribe = "PUBLIC.>" + subscribe = { + allow: ["PUBLIC.>", "foo.*"] + deny: "foo.bar" + } } # Default permissions if none presented. e.g. susan below. diff --git a/server/configs/reload/authorization_2.conf b/server/configs/reload/authorization_2.conf index 1b33ad8a..018c7d3c 100644 --- a/server/configs/reload/authorization_2.conf +++ b/server/configs/reload/authorization_2.conf @@ -20,7 +20,10 @@ authorization { # Setup a default user that can subscribe to anything, but has # no publish capabilities. default_user = { - subscribe = "PUBLIC.>" + subscribe = { + allow: ["PUBLIC.>", "foo.*"] + deny: ["PUBLIC.foo"] + } } # Default permissions if none presented. e.g. susan below. diff --git a/server/reload.go b/server/reload.go index fc2be1df..7eea4f59 100644 --- a/server/reload.go +++ b/server/reload.go @@ -681,12 +681,31 @@ func (s *Server) applyOptions(opts []option) { s.Noticef("Reloaded server configuration") } +// On reload we have clients that may exists that are connected +// but are not assigned to an account. Check here and assign to +// the global account. +// Lock should be held. +func (s *Server) assignGlobalAccountToOrphanUsers() { + for _, u := range s.users { + if u.Account == nil { + u.Account = s.gacc + } + } + for _, u := range s.nkeys { + if u.Account == nil { + u.Account = s.gacc + } + } +} + // reloadAuthorization reconfigures the server authorization settings, // disconnects any clients who are no longer authorized, and removes any // unauthorized subscriptions. func (s *Server) reloadAuthorization() { s.mu.Lock() + s.configureAuthorization() + s.assignGlobalAccountToOrphanUsers() // This map will contain the names of accounts that have their streams // import configuration changed. diff --git a/server/reload_test.go b/server/reload_test.go index 6e16eaeb..e865a0a7 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -1071,14 +1071,72 @@ func TestConfigReloadChangePermissions(t *testing.T) { t.Fatalf("Msg is incorrect.\nexpected: %+v\ngot: %+v", []byte("world"), msg.Data) } + // Susan will subscribe to two subjects, both will succeed but a send to foo.bar should not succeed + // however PUBLIC.foo should. + sconn, err := nats.Connect(addr, nats.UserInfo("susan", "baz")) + if err != nil { + t.Fatalf("Error creating client: %v", err) + } + defer sconn.Close() + + asyncErr2 := make(chan error, 1) + sconn.SetErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) { + asyncErr2 <- err + }) + + fooSub, err := sconn.SubscribeSync("foo.*") + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + sconn.Flush() + + // Publishing from bob on foo.bar should not come through. + if err := conn.Publish("foo.bar", []byte("hello")); err != nil { + t.Fatalf("Error publishing message: %v", err) + } + conn.Flush() + + msg, err = fooSub.NextMsg(100 * time.Millisecond) + if err != nats.ErrTimeout { + t.Fatalf("Received a message we shouldn't have") + } + + pubSub, err := sconn.SubscribeSync("PUBLIC.*") + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + sconn.Flush() + + select { + case err := <-asyncErr2: + t.Fatalf("Received unexpected error for susan: %v", err) + default: + } + + // This should work ok with original config. + if err := conn.Publish("PUBLIC.foo", []byte("hello monkey")); err != nil { + t.Fatalf("Error publishing message: %v", err) + } + conn.Flush() + + msg, err = pubSub.NextMsg(2 * time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + if string(msg.Data) != "hello monkey" { + t.Fatalf("Msg is incorrect.\nexpected: %q\ngot: %q", "hello monkey", msg.Data) + } + + /////////////////////////////////////////// // Change permissions. + /////////////////////////////////////////// + changeCurrentConfigContent(t, config, "./configs/reload/authorization_2.conf") if err := server.Reload(); err != nil { t.Fatalf("Error reloading config: %v", err) } - // Ensure we receive an error for the subscription that is no longer - // authorized. + // Ensure we receive an error for the subscription that is no longer authorized. // In this test, since connection is not closed by the server, // the client must receive an -ERR select { @@ -1149,6 +1207,42 @@ func TestConfigReloadChangePermissions(t *testing.T) { t.Fatalf("Received unexpected error: %v", err) default: } + + // Now check susan again. + // + // This worked ok with original config but should not deliver a message now. + if err := conn.Publish("PUBLIC.foo", []byte("hello monkey")); err != nil { + t.Fatalf("Error publishing message: %v", err) + } + conn.Flush() + + msg, err = pubSub.NextMsg(100 * time.Millisecond) + if err != nats.ErrTimeout { + t.Fatalf("Received a message we shouldn't have") + } + + // Now check foo.bar, which did not work before but should work now.. + if err := conn.Publish("foo.bar", []byte("hello?")); err != nil { + t.Fatalf("Error publishing message: %v", err) + } + conn.Flush() + + msg, err = fooSub.NextMsg(2 * time.Second) + if err != nil { + t.Fatalf("Error receiving msg: %v", err) + } + if string(msg.Data) != "hello?" { + t.Fatalf("Msg is incorrect.\nexpected: %q\ngot: %q", "hello?", msg.Data) + } + + // Once last check for no errors. + sconn.Flush() + + select { + case err := <-asyncErr2: + t.Fatalf("Received unexpected error for susan: %v", err) + default: + } } // Ensure Reload returns an error when attempting to change cluster address @@ -2740,7 +2834,9 @@ func TestConfigReloadAccountNKeyUsers(t *testing.T) { s.mu.Lock() nkeys := s.nkeys + globalAcc := s.gacc s.mu.Unlock() + if n := len(nkeys); n != 2 { t.Fatalf("NKeys map should have 2 users, got %v", n) } @@ -2755,7 +2851,7 @@ func TestConfigReloadAccountNKeyUsers(t *testing.T) { if ivan == nil { t.Fatal("NKey for user Ivan not found") } - if ivan.Account != nil { + if ivan.Account != globalAcc { t.Fatalf("Invalid account for user Ivan: %#v", ivan.Account) } if s.LookupAccount("synadia") != nil { diff --git a/test/configs/auths.conf b/test/configs/auths.conf index b1a651c8..545317bf 100644 --- a/test/configs/auths.conf +++ b/test/configs/auths.conf @@ -35,8 +35,8 @@ NEW_STYLE = { deny = ["SYS.*", "bar.baz", "foo.*"] } subscribe = { - allow = "foo.*" - deny = "foo.baz" + allow = ["foo.*", "SYS.TEST.>"] + deny = ["foo.baz", "SYS.*"] } } diff --git a/test/user_authorization_test.go b/test/user_authorization_test.go index 41dde80f..03488ec1 100644 --- a/test/user_authorization_test.go +++ b/test/user_authorization_test.go @@ -193,4 +193,31 @@ func TestUserAuthorizationProto(t *testing.T) { expectResult(t, nc, msgRe) } } + + // Clear old stuff + c.Close() + + c = createClientConn(t, opts.Host, opts.Port) + defer c.Close() + expectAuthRequired(t, c) + doAuthConnect(t, c, "", "ns", DefaultPass) + expectResult(t, c, okRe) + + sendProto(t, c, "SUB foo.bar 1\r\n") + expectResult(t, c, okRe) + + sendProto(t, c, "SUB foo.bar.baz 2\r\n") + expectResult(t, c, errRe) + + sendProto(t, c, "SUB > 3\r\n") + expectResult(t, c, errRe) + + sendProto(t, c, "SUB SYS.> 4\r\n") + expectResult(t, c, errRe) + + sendProto(t, c, "SUB SYS.TEST.foo 5\r\n") + expectResult(t, c, okRe) + + sendProto(t, c, "SUB SYS.bar 5\r\n") + expectResult(t, c, errRe) } From 3dde5b5a930c3939a68ffb269d6e7cdd515b51b0 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Tue, 6 Nov 2018 20:06:18 -0800 Subject: [PATCH 3/6] megacheck fix Signed-off-by: Derek Collison --- server/reload_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/reload_test.go b/server/reload_test.go index e865a0a7..ddb4c5ee 100644 --- a/server/reload_test.go +++ b/server/reload_test.go @@ -1096,7 +1096,7 @@ func TestConfigReloadChangePermissions(t *testing.T) { } conn.Flush() - msg, err = fooSub.NextMsg(100 * time.Millisecond) + _, err = fooSub.NextMsg(100 * time.Millisecond) if err != nats.ErrTimeout { t.Fatalf("Received a message we shouldn't have") } @@ -1216,7 +1216,7 @@ func TestConfigReloadChangePermissions(t *testing.T) { } conn.Flush() - msg, err = pubSub.NextMsg(100 * time.Millisecond) + _, err = pubSub.NextMsg(100 * time.Millisecond) if err != nats.ErrTimeout { t.Fatalf("Received a message we shouldn't have") } From a442f6cde4d6d5c2bb32b59727196bc93f2ab80f Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 7 Nov 2018 09:25:32 -0800 Subject: [PATCH 4/6] Collect remove subs on first check Signed-off-by: Derek Collison --- server/client.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/client.go b/server/client.go index dc3b0f9e..3dcdae00 100644 --- a/server/client.go +++ b/server/client.go @@ -2249,8 +2249,10 @@ func (c *client) processSubsOnConfigReload(awcsti map[string]struct{}) { // Collect client's subs under the lock for _, sub := range c.subs { subs = append(subs, sub) - // Just checking to rebuild mperms under the lock. - c.canSubscribe(string(sub.subject)) + // Just checking to rebuild mperms under the lock, will collect removed though here. + if !c.canSubscribe(string(sub.subject)) { + removed = append(removed, sub) + } } c.mu.Unlock() @@ -2258,10 +2260,7 @@ func (c *client) processSubsOnConfigReload(awcsti map[string]struct{}) { // from config reload code prior to calling this function. So there is no risk // of concurrent access to c.perms. for _, sub := range subs { - if checkPerms && !c.canSubscribe(string(sub.subject)) { - removed = append(removed, sub) - c.unsubscribe(acc, sub, true) - } else if checkAcc { + if checkAcc { c.mu.Lock() oldShadows := sub.shadow sub.shadow = nil @@ -2275,6 +2274,7 @@ func (c *client) processSubsOnConfigReload(awcsti map[string]struct{}) { // Report back to client and logs. for _, sub := range removed { + c.unsubscribe(acc, sub, true) c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %q)", sub.subject, sub.sid)) srv.Noticef("Removed sub %q (sid %q) for user %q - not authorized", From 5077025801cbaa1e7ca504e98387b2b8fd76e69c Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 7 Nov 2018 09:52:29 -0800 Subject: [PATCH 5/6] Make assiging global account consistent Signed-off-by: Derek Collison --- server/auth.go | 17 +++++++++++++++++ server/reload.go | 18 ------------------ 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/server/auth.go b/server/auth.go index fbbe2184..44464d24 100644 --- a/server/auth.go +++ b/server/auth.go @@ -151,6 +151,22 @@ func (s *Server) checkAuthforWarnings() { } } +// If opts.Users or opts.Nkeys have definitions without an account +// defined assign them to the default global account. +// Lock should be held. +func (s *Server) assignGlobalAccountToOrphanUsers() { + for _, u := range s.users { + if u.Account == nil { + u.Account = s.gacc + } + } + for _, u := range s.nkeys { + if u.Account == nil { + u.Account = s.gacc + } + } +} + // configureAuthorization will do any setup needed for authorization. // Lock is assumed held. func (s *Server) configureAuthorization() { @@ -179,6 +195,7 @@ func (s *Server) configureAuthorization() { s.users[u.Username] = u } } + s.assignGlobalAccountToOrphanUsers() s.info.AuthRequired = true } else if opts.Username != "" || opts.Authorization != "" { s.info.AuthRequired = true diff --git a/server/reload.go b/server/reload.go index 7eea4f59..8b57ca66 100644 --- a/server/reload.go +++ b/server/reload.go @@ -681,23 +681,6 @@ func (s *Server) applyOptions(opts []option) { s.Noticef("Reloaded server configuration") } -// On reload we have clients that may exists that are connected -// but are not assigned to an account. Check here and assign to -// the global account. -// Lock should be held. -func (s *Server) assignGlobalAccountToOrphanUsers() { - for _, u := range s.users { - if u.Account == nil { - u.Account = s.gacc - } - } - for _, u := range s.nkeys { - if u.Account == nil { - u.Account = s.gacc - } - } -} - // reloadAuthorization reconfigures the server authorization settings, // disconnects any clients who are no longer authorized, and removes any // unauthorized subscriptions. @@ -705,7 +688,6 @@ func (s *Server) reloadAuthorization() { s.mu.Lock() s.configureAuthorization() - s.assignGlobalAccountToOrphanUsers() // This map will contain the names of accounts that have their streams // import configuration changed. From 42b5e664c621754b9739d60ea2dad8cc7e653b59 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 7 Nov 2018 09:57:26 -0800 Subject: [PATCH 6/6] Cleanup reload logic on subs Signed-off-by: Derek Collison --- server/client.go | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/server/client.go b/server/client.go index 3dcdae00..f35ffa4b 100644 --- a/server/client.go +++ b/server/client.go @@ -2248,31 +2248,29 @@ func (c *client) processSubsOnConfigReload(awcsti map[string]struct{}) { c.mperms = nil // Collect client's subs under the lock for _, sub := range c.subs { - subs = append(subs, sub) // Just checking to rebuild mperms under the lock, will collect removed though here. + // Only collect under subs array of canSubscribe and checkAcc true. if !c.canSubscribe(string(sub.subject)) { removed = append(removed, sub) + } else if checkAcc { + subs = append(subs, sub) } } c.mu.Unlock() - // We can call canSubscribe() without locking since the permissions are updated - // from config reload code prior to calling this function. So there is no risk - // of concurrent access to c.perms. + // This list is all subs who are allowed and we need to check accounts. for _, sub := range subs { - if checkAcc { - c.mu.Lock() - oldShadows := sub.shadow - sub.shadow = nil - c.mu.Unlock() - c.addShadowSubscriptions(acc, sub) - for _, nsub := range oldShadows { - nsub.im.acc.sl.Remove(nsub) - } + c.mu.Lock() + oldShadows := sub.shadow + sub.shadow = nil + c.mu.Unlock() + c.addShadowSubscriptions(acc, sub) + for _, nsub := range oldShadows { + nsub.im.acc.sl.Remove(nsub) } } - // Report back to client and logs. + // Unsubscribe all that need to be removed and report back to client and logs. for _, sub := range removed { c.unsubscribe(acc, sub, true) c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q (sid %q)",