diff --git a/server/auth.go b/server/auth.go index 74ffb5f2..25b5ab7a 100644 --- a/server/auth.go +++ b/server/auth.go @@ -63,6 +63,14 @@ type Permissions struct { Subscribe []string `json:"subscribe"` } +// RoutePermissions are similar to user permissions +// but describe what a server can import/export from and to +// another server. +type RoutePermissions struct { + Import []string `json:"import"` + Export []string `json:"export"` +} + // clone performs a deep copy of the Permissions struct, returning a new clone // with all values copied. func (p *Permissions) clone() *Permissions { @@ -184,7 +192,11 @@ func (s *Server) isRouterAuthorized(c *client) bool { if opts.Cluster.Username != c.opts.Username { return false } - return comparePasswords(opts.Cluster.Password, c.opts.Password) + if !comparePasswords(opts.Cluster.Password, c.opts.Password) { + return false + } + c.setRoutePermissions(opts.Cluster.Permissions) + return true } // removeUnauthorizedSubs removes any subscriptions the client has that are no diff --git a/server/client.go b/server/client.go index cdc34753..50b72fb8 100644 --- a/server/client.go +++ b/server/client.go @@ -281,6 +281,12 @@ func (c *client) RegisterUser(user *User) { c.mu.Lock() defer c.mu.Unlock() + c.setPermissions(user.Permissions) +} + +// Initializes client.perms structure. +// Lock is held on entry. +func (c *client) setPermissions(perms *Permissions) { // Pre-allocate all to simplify checks later. c.perms = &permissions{} c.perms.sub = NewSublist() @@ -288,13 +294,13 @@ func (c *client) RegisterUser(user *User) { c.perms.pcache = make(map[string]bool) // Loop over publish permissions - for _, pubSubject := range user.Permissions.Publish { + for _, pubSubject := range perms.Publish { sub := &subscription{subject: []byte(pubSubject)} c.perms.pub.Insert(sub) } // Loop over subscribe permissions - for _, subSubject := range user.Permissions.Subscribe { + for _, subSubject := range perms.Subscribe { sub := &subscription{subject: []byte(subSubject)} c.perms.sub.Insert(sub) } @@ -1071,12 +1077,20 @@ func (c *client) processSub(argo []byte) (err error) { } // Check permissions if applicable. - if !c.canSubscribe(sub.subject) { - c.mu.Unlock() - c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject)) - c.Errorf("Subscription Violation - User %q, Subject %q, SID %s", - c.opts.Username, sub.subject, sub.sid) - return nil + if c.typ == ROUTER { + if !c.canExport(sub.subject) { + c.mu.Unlock() + c.Debugf("Ignoring subscription from route on %q due to export permissions", sub.subject) + return nil + } + } else { + if !c.canSubscribe(sub.subject) { + c.mu.Unlock() + c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject)) + c.Errorf("Subscription Violation - User %q, Subject %q, SID %s", + c.opts.Username, sub.subject, sub.sid) + return nil + } } // We can have two SUB protocols coming from a route due to some @@ -1304,36 +1318,25 @@ func (c *client) prunePubPermsCache() { } // pubAllowed checks on publish permissioning. -func (c *client) pubAllowed() bool { +func (c *client) pubAllowed(subject []byte) bool { // Disallow publish to _SYS.>, these are reserved for internals. - if len(c.pa.subject) > 4 && string(c.pa.subject[:5]) == "_SYS." { - c.pubPermissionViolation(c.pa.subject) + if len(subject) > 4 && string(subject[:5]) == "_SYS." { return false } - if c.perms == nil { return true } // Check if published subject is allowed if we have permissions in place. - allowed, ok := c.perms.pcache[string(c.pa.subject)] + allowed, ok := c.perms.pcache[string(subject)] if ok { - if !allowed { - c.pubPermissionViolation(c.pa.subject) - } return allowed } // Cache miss - r := c.perms.pub.Match(string(c.pa.subject)) + r := c.perms.pub.Match(string(subject)) allowed = len(r.psubs) != 0 - if !allowed { - c.pubPermissionViolation(c.pa.subject) - c.perms.pcache[string(c.pa.subject)] = false - } else { - c.perms.pcache[string(c.pa.subject)] = true - } + c.perms.pcache[string(subject)] = allowed // Prune if needed. - if len(c.perms.pcache) > maxPermCacheSize { c.prunePubPermsCache() } @@ -1364,8 +1367,9 @@ func (c *client) processMsg(msg []byte) { c.traceMsg(msg) } - // Check pub permissions - if !c.pubAllowed() { + // Check pub permissions (don't do this for routes) + if c.typ == CLIENT && !c.pubAllowed(c.pa.subject) { + c.pubPermissionViolation(c.pa.subject) return } diff --git a/server/opts.go b/server/opts.go index 095d71ca..f485d712 100644 --- a/server/opts.go +++ b/server/opts.go @@ -33,17 +33,18 @@ import ( // ClusterOpts are options for clusters. type ClusterOpts struct { - Host string `json:"addr,omitempty"` - Port int `json:"cluster_port,omitempty"` - Username string `json:"-"` - Password string `json:"-"` - AuthTimeout float64 `json:"auth_timeout,omitempty"` - TLSTimeout float64 `json:"-"` - TLSConfig *tls.Config `json:"-"` - ListenStr string `json:"-"` - Advertise string `json:"-"` - NoAdvertise bool `json:"-"` - ConnectRetries int `json:"-"` + Host string `json:"addr,omitempty"` + Port int `json:"cluster_port,omitempty"` + Username string `json:"-"` + Password string `json:"-"` + AuthTimeout float64 `json:"auth_timeout,omitempty"` + Permissions *RoutePermissions `json:"permissions"` + TLSTimeout float64 `json:"-"` + TLSConfig *tls.Config `json:"-"` + ListenStr string `json:"-"` + Advertise string `json:"-"` + NoAdvertise bool `json:"-"` + ConnectRetries int `json:"-"` } // Options block for gnatsd server. @@ -380,6 +381,15 @@ func parseCluster(cm map[string]interface{}, opts *Options) error { opts.Cluster.Username = auth.user opts.Cluster.Password = auth.pass opts.Cluster.AuthTimeout = auth.timeout + if auth.defaultPermissions != nil { + // For routes: + // Import is Publish + // Export is Subscribe + opts.Cluster.Permissions = &RoutePermissions{ + Import: auth.defaultPermissions.Publish, + Export: auth.defaultPermissions.Subscribe, + } + } case "routes": ra := mv.([]interface{}) opts.Routes = make([]*url.URL, 0, len(ra)) @@ -443,7 +453,7 @@ func parseAuthorization(am map[string]interface{}) (*authorization, error) { return nil, err } auth.users = users - case "default_permission", "default_permissions": + case "default_permission", "default_permissions", "permissions": pm, ok := mv.(map[string]interface{}) if !ok { return nil, fmt.Errorf("Expected default permissions to be a map/struct, got %+v", mv) @@ -515,13 +525,16 @@ func parseUserPermissions(pm map[string]interface{}) (*Permissions, error) { p := &Permissions{} for k, v := range pm { switch strings.ToLower(k) { - case "pub", "publish": + // For routes: + // Import is Publish + // Export is Subscribe + case "pub", "publish", "import": subjects, err := parseSubjects(v) if err != nil { return nil, err } p.Publish = subjects - case "sub", "subscribe": + case "sub", "subscribe", "export": subjects, err := parseSubjects(v) if err != nil { return nil, err diff --git a/server/route.go b/server/route.go index 710d3047..7ef5c568 100644 --- a/server/route.go +++ b/server/route.go @@ -465,6 +465,50 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) { } } +// If permissions are set for this cluster, this returns true if the +// given subject has a match in the Import permissions. +// This is for ROUTER connections only. +// Lock is held on entry. +func (c *client) canImport(subject []byte) bool { + // For routes: + // Import is Publish + // Export is Subscribe + // So use pubAllowed() here since we want to check Import + return c.pubAllowed(subject) +} + +// If permissions are set for this cluster, this returns true if the +// given subject has a match in the Export permissions. +// This is for ROUTER connections only. +// Lock is held on entry +func (c *client) canExport(subject []byte) bool { + // For routes: + // Import is Publish + // Export is Subscribe + // So use canSubscribe() here since we want to check Export + return c.canSubscribe(subject) +} + +// Initialize or reset cluster's permissions. +// This is for ROUTER connections only. +// Client lock is held on entry +func (c *client) setRoutePermissions(perms *RoutePermissions) { + // Reset if some were set + if perms == nil { + c.perms = nil + return + } + // Convert route permissions to user permissions. + // For routes: + // Import is Publish + // Export is Subscribe + p := &Permissions{ + Publish: perms.Import, + Subscribe: perms.Export, + } + c.setPermissions(p) +} + // This will send local subscription state to a new route connection. // FIXME(dlc) - This could be a DOS or perf issue with many clients // and large subscription space. Plus buffering in place not a good idea. @@ -476,6 +520,11 @@ func (s *Server) sendLocalSubsToRoute(route *client) { route.mu.Lock() for _, sub := range subs { + // Send SUB interest only if subject has a match in import permissions + if !route.canImport(sub.subject) { + s.Debugf("Not sending subscription interest on %q due to import permission", sub.subject) + continue + } proto := fmt.Sprintf(subProto, sub.subject, sub.queue, routeSid(sub)) route.queueOutbound([]byte(proto)) if route.out.pb > int64(route.out.sz*2) { @@ -519,6 +568,10 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL) *client { // Do this before the TLS code, otherwise, in case of failure // and if route is explicit, it would try to reconnect to 'nil'... r.url = rURL + + // Set permissions associated with the route user (if applicable). + // No lock needed since we are already under client lock. + c.setRoutePermissions(opts.Cluster.Permissions) } // Check for TLS @@ -777,7 +830,7 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) { return !exists, sendInfo } -func (s *Server) broadcastInterestToRoutes(proto string) { +func (s *Server) broadcastInterestToRoutes(sub *subscription, proto string) { var arg []byte if atomic.LoadInt32(&s.logging.trace) == 1 { arg = []byte(proto[:len(proto)-LEN_CR_LF]) @@ -787,6 +840,15 @@ func (s *Server) broadcastInterestToRoutes(proto string) { for _, route := range s.routes { // FIXME(dlc) - Make same logic as deliverMsg route.mu.Lock() + // The permission of this cluster applies to all routes, and each + // route will have the same `perms`, so check with the first route + // and send SUB interest only if subject has a match in import permissions. + // If there is no match, we stop here. + if !route.canImport(sub.subject) { + route.mu.Unlock() + s.Debugf("Not sending sub/unsub interest on %q to route due to import permission", sub.subject) + break + } route.sendProto(protoAsBytes, true) route.mu.Unlock() route.traceOutOp("", arg) @@ -802,7 +864,7 @@ func (s *Server) broadcastSubscribe(sub *subscription) { } rsid := routeSid(sub) proto := fmt.Sprintf(subProto, sub.subject, sub.queue, rsid) - s.broadcastInterestToRoutes(proto) + s.broadcastInterestToRoutes(sub, proto) } // broadcastUnSubscribe will forward a client unsubscribe @@ -820,7 +882,7 @@ func (s *Server) broadcastUnSubscribe(sub *subscription) { } rsid := routeSid(sub) proto := fmt.Sprintf(unsubProto, rsid) - s.broadcastInterestToRoutes(proto) + s.broadcastInterestToRoutes(sub, proto) } func (s *Server) routeAcceptLoop(ch chan struct{}) { diff --git a/test/configs/srv_a_perms.conf b/test/configs/srv_a_perms.conf new file mode 100644 index 00000000..736f9bf7 --- /dev/null +++ b/test/configs/srv_a_perms.conf @@ -0,0 +1,25 @@ +# Cluster Server A + +listen: 127.0.0.1:5222 + +cluster { + listen: 127.0.0.1:5244 + + authorization { + user: ruser + password: top_secret + timeout: 0.5 + permissions { + import: "foo" + export: ["bar", "baz"] + } + } + + # Routes are actively solicited and connected to from this server. + # Other servers can connect to us if they supply the correct credentials + # in their routes definitions from above. + + routes = [ + nats-route://ruser:top_secret@127.0.0.1:5246 + ] +} diff --git a/test/routes_test.go b/test/routes_test.go index a3239080..988f4400 100644 --- a/test/routes_test.go +++ b/test/routes_test.go @@ -26,6 +26,7 @@ import ( "time" "github.com/nats-io/gnatsd/server" + "github.com/nats-io/go-nats" ) const clientProtoInfo = 1 @@ -857,3 +858,158 @@ func TestRouteSendAsyncINFOToClients(t *testing.T) { opts.Cluster.NoAdvertise = true f(opts) } + +func TestRouteBasicPermissions(t *testing.T) { + srvA, optsA := RunServerWithConfig("./configs/srv_a_perms.conf") + defer srvA.Shutdown() + + srvB, optsB := RunServerWithConfig("./configs/srv_b.conf") + defer srvB.Shutdown() + + checkClusterFormed(t, srvA, srvB) + + // Create a connection to server B + ncb, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", optsB.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer ncb.Close() + ch := make(chan bool, 1) + cb := func(_ *nats.Msg) { + ch <- true + } + // Subscribe on on "bar" and "baz", which should be accepted by server A + subBbar, err := ncb.Subscribe("bar", cb) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + defer subBbar.Unsubscribe() + subBbaz, err := ncb.Subscribe("baz", cb) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + defer subBbaz.Unsubscribe() + ncb.Flush() + + // Create a connection to server A + nca, err := nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", optsA.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer nca.Close() + // Publish on bar and baz, messages should be received. + if err := nca.Publish("bar", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + if err := nca.Publish("baz", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + for i := 0; i < 2; i++ { + select { + case <-ch: + case <-time.After(time.Second): + t.Fatal("Did not get the messages") + } + } + + // From B, start a subscription on "foo", which server A should drop since + // it only exports on "bar" and "baz" + subBfoo, err := ncb.Subscribe("foo", cb) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + defer subBfoo.Unsubscribe() + ncb.Flush() + // So producing on "foo" from A should not be forwarded to B. + if err := nca.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + select { + case <-ch: + t.Fatal("Message should not have been received") + case <-time.After(100 * time.Millisecond): + } + + // Now on A, create a subscription on something that A does not import, + // like "bat". + subAbat, err := nca.Subscribe("bat", cb) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + defer subAbat.Unsubscribe() + nca.Flush() + // And from B, send a message on that subject and make sure it is not received. + if err := ncb.Publish("bat", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + select { + case <-ch: + t.Fatal("Message should not have been received") + case <-time.After(100 * time.Millisecond): + } + + // Stop subscription on foo from B + subBfoo.Unsubscribe() + ncb.Flush() + + // Create subscription on foo from A, this should be forwared to B. + subAfoo, err := nca.Subscribe("foo", cb) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + defer subAfoo.Unsubscribe() + // Create another one so that test the import permissions cache + subAfoo2, err := nca.Subscribe("foo", cb) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + defer subAfoo2.Unsubscribe() + nca.Flush() + // Send a message from B and check that it is received. + if err := ncb.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + for i := 0; i < 2; i++ { + select { + case <-ch: + case <-time.After(time.Second): + t.Fatal("Did not get the message") + } + } + + // Close connection from B, and restart server B too. + // We want to make sure that + ncb.Close() + srvB.Shutdown() + + // Restart server B + srvB, optsB = RunServerWithConfig("./configs/srv_b.conf") + defer srvB.Shutdown() + + // Connect to B and send on "foo" and make sure we receive + ncb, err = nats.Connect(fmt.Sprintf("nats://127.0.0.1:%d", optsB.Port)) + if err != nil { + t.Fatalf("Error on connect: %v", err) + } + defer ncb.Close() + if err := ncb.Publish("foo", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + for i := 0; i < 2; i++ { + select { + case <-ch: + case <-time.After(time.Second): + t.Fatal("Did not get the message") + } + } + + // Send on "bat" and make sure that this is not received. + if err := ncb.Publish("bat", []byte("hello")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + select { + case <-ch: + t.Fatal("Message should not have been received") + case <-time.After(100 * time.Millisecond): + } +}