Add route permissions

The `client.perms` struct is left unchanged. We simply map Import
and Export semantics to existing Publish and Subscribe ones.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
Ivan Kozlovic
2018-06-21 17:01:08 -06:00
parent a05b9e1e34
commit bf3bc81722
6 changed files with 316 additions and 44 deletions

View File

@@ -63,6 +63,14 @@ type Permissions struct {
Subscribe []string `json:"subscribe"`
}
// RoutePermissions are similar to user permissions
// but describe what a server can import/export from and to
// another server.
type RoutePermissions struct {
Import []string `json:"import"`
Export []string `json:"export"`
}
// clone performs a deep copy of the Permissions struct, returning a new clone
// with all values copied.
func (p *Permissions) clone() *Permissions {
@@ -184,7 +192,11 @@ func (s *Server) isRouterAuthorized(c *client) bool {
if opts.Cluster.Username != c.opts.Username {
return false
}
return comparePasswords(opts.Cluster.Password, c.opts.Password)
if !comparePasswords(opts.Cluster.Password, c.opts.Password) {
return false
}
c.setRoutePermissions(opts.Cluster.Permissions)
return true
}
// removeUnauthorizedSubs removes any subscriptions the client has that are no

View File

@@ -281,6 +281,12 @@ func (c *client) RegisterUser(user *User) {
c.mu.Lock()
defer c.mu.Unlock()
c.setPermissions(user.Permissions)
}
// Initializes client.perms structure.
// Lock is held on entry.
func (c *client) setPermissions(perms *Permissions) {
// Pre-allocate all to simplify checks later.
c.perms = &permissions{}
c.perms.sub = NewSublist()
@@ -288,13 +294,13 @@ func (c *client) RegisterUser(user *User) {
c.perms.pcache = make(map[string]bool)
// Loop over publish permissions
for _, pubSubject := range user.Permissions.Publish {
for _, pubSubject := range perms.Publish {
sub := &subscription{subject: []byte(pubSubject)}
c.perms.pub.Insert(sub)
}
// Loop over subscribe permissions
for _, subSubject := range user.Permissions.Subscribe {
for _, subSubject := range perms.Subscribe {
sub := &subscription{subject: []byte(subSubject)}
c.perms.sub.Insert(sub)
}
@@ -1071,12 +1077,20 @@ func (c *client) processSub(argo []byte) (err error) {
}
// Check permissions if applicable.
if !c.canSubscribe(sub.subject) {
c.mu.Unlock()
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
c.Errorf("Subscription Violation - User %q, Subject %q, SID %s",
c.opts.Username, sub.subject, sub.sid)
return nil
if c.typ == ROUTER {
if !c.canExport(sub.subject) {
c.mu.Unlock()
c.Debugf("Ignoring subscription from route on %q due to export permissions", sub.subject)
return nil
}
} else {
if !c.canSubscribe(sub.subject) {
c.mu.Unlock()
c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
c.Errorf("Subscription Violation - User %q, Subject %q, SID %s",
c.opts.Username, sub.subject, sub.sid)
return nil
}
}
// We can have two SUB protocols coming from a route due to some
@@ -1304,36 +1318,25 @@ func (c *client) prunePubPermsCache() {
}
// pubAllowed checks on publish permissioning.
func (c *client) pubAllowed() bool {
func (c *client) pubAllowed(subject []byte) bool {
// Disallow publish to _SYS.>, these are reserved for internals.
if len(c.pa.subject) > 4 && string(c.pa.subject[:5]) == "_SYS." {
c.pubPermissionViolation(c.pa.subject)
if len(subject) > 4 && string(subject[:5]) == "_SYS." {
return false
}
if c.perms == nil {
return true
}
// Check if published subject is allowed if we have permissions in place.
allowed, ok := c.perms.pcache[string(c.pa.subject)]
allowed, ok := c.perms.pcache[string(subject)]
if ok {
if !allowed {
c.pubPermissionViolation(c.pa.subject)
}
return allowed
}
// Cache miss
r := c.perms.pub.Match(string(c.pa.subject))
r := c.perms.pub.Match(string(subject))
allowed = len(r.psubs) != 0
if !allowed {
c.pubPermissionViolation(c.pa.subject)
c.perms.pcache[string(c.pa.subject)] = false
} else {
c.perms.pcache[string(c.pa.subject)] = true
}
c.perms.pcache[string(subject)] = allowed
// Prune if needed.
if len(c.perms.pcache) > maxPermCacheSize {
c.prunePubPermsCache()
}
@@ -1364,8 +1367,9 @@ func (c *client) processMsg(msg []byte) {
c.traceMsg(msg)
}
// Check pub permissions
if !c.pubAllowed() {
// Check pub permissions (don't do this for routes)
if c.typ == CLIENT && !c.pubAllowed(c.pa.subject) {
c.pubPermissionViolation(c.pa.subject)
return
}

View File

@@ -33,17 +33,18 @@ import (
// ClusterOpts are options for clusters.
type ClusterOpts struct {
Host string `json:"addr,omitempty"`
Port int `json:"cluster_port,omitempty"`
Username string `json:"-"`
Password string `json:"-"`
AuthTimeout float64 `json:"auth_timeout,omitempty"`
TLSTimeout float64 `json:"-"`
TLSConfig *tls.Config `json:"-"`
ListenStr string `json:"-"`
Advertise string `json:"-"`
NoAdvertise bool `json:"-"`
ConnectRetries int `json:"-"`
Host string `json:"addr,omitempty"`
Port int `json:"cluster_port,omitempty"`
Username string `json:"-"`
Password string `json:"-"`
AuthTimeout float64 `json:"auth_timeout,omitempty"`
Permissions *RoutePermissions `json:"permissions"`
TLSTimeout float64 `json:"-"`
TLSConfig *tls.Config `json:"-"`
ListenStr string `json:"-"`
Advertise string `json:"-"`
NoAdvertise bool `json:"-"`
ConnectRetries int `json:"-"`
}
// Options block for gnatsd server.
@@ -380,6 +381,15 @@ func parseCluster(cm map[string]interface{}, opts *Options) error {
opts.Cluster.Username = auth.user
opts.Cluster.Password = auth.pass
opts.Cluster.AuthTimeout = auth.timeout
if auth.defaultPermissions != nil {
// For routes:
// Import is Publish
// Export is Subscribe
opts.Cluster.Permissions = &RoutePermissions{
Import: auth.defaultPermissions.Publish,
Export: auth.defaultPermissions.Subscribe,
}
}
case "routes":
ra := mv.([]interface{})
opts.Routes = make([]*url.URL, 0, len(ra))
@@ -443,7 +453,7 @@ func parseAuthorization(am map[string]interface{}) (*authorization, error) {
return nil, err
}
auth.users = users
case "default_permission", "default_permissions":
case "default_permission", "default_permissions", "permissions":
pm, ok := mv.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("Expected default permissions to be a map/struct, got %+v", mv)
@@ -515,13 +525,16 @@ func parseUserPermissions(pm map[string]interface{}) (*Permissions, error) {
p := &Permissions{}
for k, v := range pm {
switch strings.ToLower(k) {
case "pub", "publish":
// For routes:
// Import is Publish
// Export is Subscribe
case "pub", "publish", "import":
subjects, err := parseSubjects(v)
if err != nil {
return nil, err
}
p.Publish = subjects
case "sub", "subscribe":
case "sub", "subscribe", "export":
subjects, err := parseSubjects(v)
if err != nil {
return nil, err

View File

@@ -465,6 +465,50 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) {
}
}
// If permissions are set for this cluster, this returns true if the
// given subject has a match in the Import permissions.
// This is for ROUTER connections only.
// Lock is held on entry.
func (c *client) canImport(subject []byte) bool {
// For routes:
// Import is Publish
// Export is Subscribe
// So use pubAllowed() here since we want to check Import
return c.pubAllowed(subject)
}
// If permissions are set for this cluster, this returns true if the
// given subject has a match in the Export permissions.
// This is for ROUTER connections only.
// Lock is held on entry
func (c *client) canExport(subject []byte) bool {
// For routes:
// Import is Publish
// Export is Subscribe
// So use canSubscribe() here since we want to check Export
return c.canSubscribe(subject)
}
// Initialize or reset cluster's permissions.
// This is for ROUTER connections only.
// Client lock is held on entry
func (c *client) setRoutePermissions(perms *RoutePermissions) {
// Reset if some were set
if perms == nil {
c.perms = nil
return
}
// Convert route permissions to user permissions.
// For routes:
// Import is Publish
// Export is Subscribe
p := &Permissions{
Publish: perms.Import,
Subscribe: perms.Export,
}
c.setPermissions(p)
}
// This will send local subscription state to a new route connection.
// FIXME(dlc) - This could be a DOS or perf issue with many clients
// and large subscription space. Plus buffering in place not a good idea.
@@ -476,6 +520,11 @@ func (s *Server) sendLocalSubsToRoute(route *client) {
route.mu.Lock()
for _, sub := range subs {
// Send SUB interest only if subject has a match in import permissions
if !route.canImport(sub.subject) {
s.Debugf("Not sending subscription interest on %q due to import permission", sub.subject)
continue
}
proto := fmt.Sprintf(subProto, sub.subject, sub.queue, routeSid(sub))
route.queueOutbound([]byte(proto))
if route.out.pb > int64(route.out.sz*2) {
@@ -519,6 +568,10 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client {
// Do this before the TLS code, otherwise, in case of failure
// and if route is explicit, it would try to reconnect to 'nil'...
r.url = rURL
// Set permissions associated with the route user (if applicable).
// No lock needed since we are already under client lock.
c.setRoutePermissions(opts.Cluster.Permissions)
}
// Check for TLS
@@ -777,7 +830,7 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
return !exists, sendInfo
}
func (s *Server) broadcastInterestToRoutes(proto string) {
func (s *Server) broadcastInterestToRoutes(sub *subscription, proto string) {
var arg []byte
if atomic.LoadInt32(&s.logging.trace) == 1 {
arg = []byte(proto[:len(proto)-LEN_CR_LF])
@@ -787,6 +840,15 @@ func (s *Server) broadcastInterestToRoutes(proto string) {
for _, route := range s.routes {
// FIXME(dlc) - Make same logic as deliverMsg
route.mu.Lock()
// The permission of this cluster applies to all routes, and each
// route will have the same `perms`, so check with the first route
// and send SUB interest only if subject has a match in import permissions.
// If there is no match, we stop here.
if !route.canImport(sub.subject) {
route.mu.Unlock()
s.Debugf("Not sending sub/unsub interest on %q to route due to import permission", sub.subject)
break
}
route.sendProto(protoAsBytes, true)
route.mu.Unlock()
route.traceOutOp("", arg)
@@ -802,7 +864,7 @@ func (s *Server) broadcastSubscribe(sub *subscription) {
}
rsid := routeSid(sub)
proto := fmt.Sprintf(subProto, sub.subject, sub.queue, rsid)
s.broadcastInterestToRoutes(proto)
s.broadcastInterestToRoutes(sub, proto)
}
// broadcastUnSubscribe will forward a client unsubscribe
@@ -820,7 +882,7 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) {
}
rsid := routeSid(sub)
proto := fmt.Sprintf(unsubProto, rsid)
s.broadcastInterestToRoutes(proto)
s.broadcastInterestToRoutes(sub, proto)
}
func (s *Server) routeAcceptLoop(ch chan struct{}) {

View File

@@ -0,0 +1,25 @@
# Cluster Server A
listen: 127.0.0.1:5222
cluster {
listen: 127.0.0.1:5244
authorization {
user: ruser
password: top_secret
timeout: 0.5
permissions {
import: "foo"
export: ["bar", "baz"]
}
}
# Routes are actively solicited and connected to from this server.
# Other servers can connect to us if they supply the correct credentials
# in their routes definitions from above.
routes = [
nats-route://ruser:top_secret@127.0.0.1:5246
]
}

View File

@@ -26,6 +26,7 @@ import (
"time"
"github.com/nats-io/gnatsd/server"
"github.com/nats-io/go-nats"
)
const clientProtoInfo = 1
@@ -857,3 +858,158 @@ func TestRouteSendAsyncINFOToClients(t *testing.T) {
opts.Cluster.NoAdvertise = true
f(opts)
}
func TestRouteBasicPermissions(t *testing.T) {
srvA, optsA := RunServerWithConfig("./configs/srv_a_perms.conf")
defer srvA.Shutdown()
srvB, optsB := RunServerWithConfig("./configs/srv_b.conf")
defer srvB.Shutdown()
checkClusterFormed(t, srvA, srvB)
// Create a connection to server B
ncb, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", optsB.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer ncb.Close()
ch := make(chan bool, 1)
cb := func(_ *nats.Msg) {
ch <- true
}
// Subscribe on on "bar" and "baz", which should be accepted by server A
subBbar, err := ncb.Subscribe("bar", cb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
defer subBbar.Unsubscribe()
subBbaz, err := ncb.Subscribe("baz", cb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
defer subBbaz.Unsubscribe()
ncb.Flush()
// Create a connection to server A
nca, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", optsA.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nca.Close()
// Publish on bar and baz, messages should be received.
if err := nca.Publish("bar", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
if err := nca.Publish("baz", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
for i := 0; i < 2; i++ {
select {
case <-ch:
case <-time.After(time.Second):
t.Fatal("Did not get the messages")
}
}
// From B, start a subscription on "foo", which server A should drop since
// it only exports on "bar" and "baz"
subBfoo, err := ncb.Subscribe("foo", cb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
defer subBfoo.Unsubscribe()
ncb.Flush()
// So producing on "foo" from A should not be forwarded to B.
if err := nca.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
select {
case <-ch:
t.Fatal("Message should not have been received")
case <-time.After(100 * time.Millisecond):
}
// Now on A, create a subscription on something that A does not import,
// like "bat".
subAbat, err := nca.Subscribe("bat", cb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
defer subAbat.Unsubscribe()
nca.Flush()
// And from B, send a message on that subject and make sure it is not received.
if err := ncb.Publish("bat", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
select {
case <-ch:
t.Fatal("Message should not have been received")
case <-time.After(100 * time.Millisecond):
}
// Stop subscription on foo from B
subBfoo.Unsubscribe()
ncb.Flush()
// Create subscription on foo from A, this should be forwared to B.
subAfoo, err := nca.Subscribe("foo", cb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
defer subAfoo.Unsubscribe()
// Create another one so that test the import permissions cache
subAfoo2, err := nca.Subscribe("foo", cb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
defer subAfoo2.Unsubscribe()
nca.Flush()
// Send a message from B and check that it is received.
if err := ncb.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
for i := 0; i < 2; i++ {
select {
case <-ch:
case <-time.After(time.Second):
t.Fatal("Did not get the message")
}
}
// Close connection from B, and restart server B too.
// We want to make sure that
ncb.Close()
srvB.Shutdown()
// Restart server B
srvB, optsB = RunServerWithConfig("./configs/srv_b.conf")
defer srvB.Shutdown()
// Connect to B and send on "foo" and make sure we receive
ncb, err = nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", optsB.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer ncb.Close()
if err := ncb.Publish("foo", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
for i := 0; i < 2; i++ {
select {
case <-ch:
case <-time.After(time.Second):
t.Fatal("Did not get the message")
}
}
// Send on "bat" and make sure that this is not received.
if err := ncb.Publish("bat", []byte("hello")); err != nil {
t.Fatalf("Error on publish: %v", err)
}
select {
case <-ch:
t.Fatal("Message should not have been received")
case <-time.After(100 * time.Millisecond):
}
}