mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-02 11:48:43 -07:00
[CHANGED] Cluster permissions moved out of cluster's authorization
It will be possible to set subjects permissions regardless of the presence of an authorization block. Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
23
README.md
23
README.md
@@ -679,10 +679,11 @@ cluster {
|
||||
# bcrypted hash of "top_secret"
|
||||
password: $2a$11$UaoHwUEqHaMwqo6L4kM2buOBnGFnSCWxNXY87hl.kCERqKK8WAXM.
|
||||
timeout: 3
|
||||
permissions {
|
||||
import:["_INBOX.>", "global.>"]
|
||||
export:["_INBOX.>", "global.>", "sensors.>"]
|
||||
}
|
||||
}
|
||||
|
||||
permissions {
|
||||
import:["_INBOX.>", "global.>"]
|
||||
export:["_INBOX.>", "global.>", "sensors.>"]
|
||||
}
|
||||
|
||||
routes = [
|
||||
@@ -702,10 +703,11 @@ cluster {
|
||||
# bcrypted hash of "top_secret"
|
||||
password: $2a$11$UaoHwUEqHaMwqo6L4kM2buOBnGFnSCWxNXY87hl.kCERqKK8WAXM.
|
||||
timeout: 3
|
||||
permissions {
|
||||
import:["_INBOX.>", "global.>", "sensors.>"]
|
||||
export:["_INBOX.>", "global.>"]
|
||||
}
|
||||
}
|
||||
|
||||
permissions {
|
||||
import:["_INBOX.>", "global.>", "sensors.>"]
|
||||
export:["_INBOX.>", "global.>"]
|
||||
}
|
||||
|
||||
routes = [
|
||||
@@ -716,6 +718,11 @@ cluster {
|
||||
|
||||
The example above allows request/reply and messages published to any subject matching `global.>` to be freely propagated throughout the cluster. The cloud server imports and locally delivers messages published to subjects matching `sensors.>`, but won't export messages published to subjects matching `sensors.>`. This enforces a directional flow of sensor data from edge servers to the cloud servers. Also, as new edge servers are added they will not receive sensor data from other edge servers. Importing and exporting subjects in server clustering can provide additional security and optimize use of network resources.
|
||||
|
||||
> Note: When first introduced, the `permissions` block had to be defined in the `authorization` block forcing a cluster user to be defined in order for permissions to work.
|
||||
This has been changed and the `permissions` block is now moved to the top-level `cluster` block, allowing use of subject permissions even without the presence of an `authorization` block.
|
||||
If `permissions` are defined in both `authorization` and top-level `cluster` blocks, the content of `permissions` in the `authorization` block is ignored. It is recommended that the configuration
|
||||
files be updated to move the permissions to the top-level block.
|
||||
|
||||
### TLS
|
||||
|
||||
The server can use modern TLS semantics for client connections, route connections, and the HTTPS monitoring port.
|
||||
|
||||
@@ -217,7 +217,6 @@ func (s *Server) isRouterAuthorized(c *client) bool {
|
||||
if !comparePasswords(opts.Cluster.Password, c.opts.Password) {
|
||||
return false
|
||||
}
|
||||
c.setRoutePermissions(opts.Cluster.Permissions)
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
@@ -773,8 +773,13 @@ func (c *client) processConnect(arg []byte) error {
|
||||
|
||||
// Grab connection name of remote route.
|
||||
if typ == ROUTER && r != nil {
|
||||
var routePerms *RoutePermissions
|
||||
if srv != nil {
|
||||
routePerms = srv.getOpts().Cluster.Permissions
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.route.remoteID = c.opts.Name
|
||||
c.setRoutePermissions(routePerms)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
|
||||
@@ -387,16 +387,9 @@ func parseCluster(cm map[string]interface{}, opts *Options) error {
|
||||
opts.Cluster.Username = auth.user
|
||||
opts.Cluster.Password = auth.pass
|
||||
opts.Cluster.AuthTimeout = auth.timeout
|
||||
if auth.defaultPermissions != nil {
|
||||
// Import is whether or not we will send a SUB for interest to the other side.
|
||||
// Export is whether or not we will accept a SUB from the remote for a given subject.
|
||||
// Both only effect interest registration.
|
||||
// The parsing sets Import into Publish and Export into Subscribe, convert
|
||||
// accordingly.
|
||||
opts.Cluster.Permissions = &RoutePermissions{
|
||||
Import: auth.defaultPermissions.Publish,
|
||||
Export: auth.defaultPermissions.Subscribe,
|
||||
}
|
||||
// Do not set permissions if they were specified in top-level cluster block.
|
||||
if auth.defaultPermissions != nil && opts.Cluster.Permissions == nil {
|
||||
setClusterPermissions(&opts.Cluster, auth.defaultPermissions)
|
||||
}
|
||||
case "routes":
|
||||
ra := mv.([]interface{})
|
||||
@@ -430,11 +423,36 @@ func parseCluster(cm map[string]interface{}, opts *Options) error {
|
||||
opts.Cluster.NoAdvertise = mv.(bool)
|
||||
case "connect_retries":
|
||||
opts.Cluster.ConnectRetries = int(mv.(int64))
|
||||
case "permissions":
|
||||
pm, ok := mv.(map[string]interface{})
|
||||
if !ok {
|
||||
return fmt.Errorf("Expected permissions to be a map/struct, got %+v", mv)
|
||||
}
|
||||
perms, err := parseUserPermissions(pm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// This will possibly override permissions that were define in auth block
|
||||
setClusterPermissions(&opts.Cluster, perms)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sets cluster's permissions based on given pub/sub permissions,
|
||||
// doing the appropriate translation.
|
||||
func setClusterPermissions(opts *ClusterOpts, perms *Permissions) {
|
||||
// Import is whether or not we will send a SUB for interest to the other side.
|
||||
// Export is whether or not we will accept a SUB from the remote for a given subject.
|
||||
// Both only effect interest registration.
|
||||
// The parsing sets Import into Publish and Export into Subscribe, convert
|
||||
// accordingly.
|
||||
opts.Permissions = &RoutePermissions{
|
||||
Import: perms.Publish,
|
||||
Export: perms.Subscribe,
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to parse Authorization configs.
|
||||
func parseAuthorization(am map[string]interface{}) (*authorization, error) {
|
||||
auth := &authorization{}
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/url"
|
||||
"os"
|
||||
@@ -1152,3 +1153,161 @@ func TestConfigureOptions(t *testing.T) {
|
||||
t.Fatal("Expected TLSConfig to be set")
|
||||
}
|
||||
}
|
||||
|
||||
func TestClusterPermissionsConfig(t *testing.T) {
|
||||
template := `
|
||||
cluster {
|
||||
port: 1234
|
||||
%s
|
||||
authorization {
|
||||
user: ivan
|
||||
password: pwd
|
||||
permissions {
|
||||
import {
|
||||
allow: "foo"
|
||||
}
|
||||
export {
|
||||
allow: "bar"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
`
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(template, "")))
|
||||
defer os.Remove(conf)
|
||||
opts, err := ProcessConfigFile(conf)
|
||||
if err != nil {
|
||||
t.Fatalf("Error processing config file: %v", err)
|
||||
}
|
||||
if opts.Cluster.Permissions == nil {
|
||||
t.Fatal("Expected cluster permissions to be set")
|
||||
}
|
||||
if opts.Cluster.Permissions.Import == nil {
|
||||
t.Fatal("Expected cluster import permissions to be set")
|
||||
}
|
||||
if len(opts.Cluster.Permissions.Import.Allow) != 1 || opts.Cluster.Permissions.Import.Allow[0] != "foo" {
|
||||
t.Fatalf("Expected cluster import permissions to have %q, got %v", "foo", opts.Cluster.Permissions.Import.Allow)
|
||||
}
|
||||
if opts.Cluster.Permissions.Export == nil {
|
||||
t.Fatal("Expected cluster export permissions to be set")
|
||||
}
|
||||
if len(opts.Cluster.Permissions.Export.Allow) != 1 || opts.Cluster.Permissions.Export.Allow[0] != "bar" {
|
||||
t.Fatalf("Expected cluster export permissions to have %q, got %v", "bar", opts.Cluster.Permissions.Export.Allow)
|
||||
}
|
||||
|
||||
// Now add permissions in top level cluster and check
|
||||
// that this is the one that is being used.
|
||||
conf = createConfFile(t, []byte(fmt.Sprintf(template, `
|
||||
permissions {
|
||||
import {
|
||||
allow: "baz"
|
||||
}
|
||||
export {
|
||||
allow: "bat"
|
||||
}
|
||||
}
|
||||
`)))
|
||||
defer os.Remove(conf)
|
||||
opts, err = ProcessConfigFile(conf)
|
||||
if err != nil {
|
||||
t.Fatalf("Error processing config file: %v", err)
|
||||
}
|
||||
if opts.Cluster.Permissions == nil {
|
||||
t.Fatal("Expected cluster permissions to be set")
|
||||
}
|
||||
if opts.Cluster.Permissions.Import == nil {
|
||||
t.Fatal("Expected cluster import permissions to be set")
|
||||
}
|
||||
if len(opts.Cluster.Permissions.Import.Allow) != 1 || opts.Cluster.Permissions.Import.Allow[0] != "baz" {
|
||||
t.Fatalf("Expected cluster import permissions to have %q, got %v", "baz", opts.Cluster.Permissions.Import.Allow)
|
||||
}
|
||||
if opts.Cluster.Permissions.Export == nil {
|
||||
t.Fatal("Expected cluster export permissions to be set")
|
||||
}
|
||||
if len(opts.Cluster.Permissions.Export.Allow) != 1 || opts.Cluster.Permissions.Export.Allow[0] != "bat" {
|
||||
t.Fatalf("Expected cluster export permissions to have %q, got %v", "bat", opts.Cluster.Permissions.Export.Allow)
|
||||
}
|
||||
|
||||
// Tests with invalid permissions
|
||||
invalidPerms := []string{
|
||||
`permissions: foo`,
|
||||
`permissions {
|
||||
unknown_field: "foo"
|
||||
}`,
|
||||
`permissions {
|
||||
import: [1, 2, 3]
|
||||
}`,
|
||||
`permissions {
|
||||
import {
|
||||
unknown_field: "foo"
|
||||
}
|
||||
}`,
|
||||
`permissions {
|
||||
import {
|
||||
allow {
|
||||
x: y
|
||||
}
|
||||
}
|
||||
}`,
|
||||
`permissions {
|
||||
import {
|
||||
deny {
|
||||
x: y
|
||||
}
|
||||
}
|
||||
}`,
|
||||
`permissions {
|
||||
export: [1, 2, 3]
|
||||
}`,
|
||||
`permissions {
|
||||
export {
|
||||
unknown_field: "foo"
|
||||
}
|
||||
}`,
|
||||
`permissions {
|
||||
export {
|
||||
allow {
|
||||
x: y
|
||||
}
|
||||
}
|
||||
}`,
|
||||
`permissions {
|
||||
export {
|
||||
deny {
|
||||
x: y
|
||||
}
|
||||
}
|
||||
}`,
|
||||
}
|
||||
for _, perms := range invalidPerms {
|
||||
conf = createConfFile(t, []byte(fmt.Sprintf(`
|
||||
cluster {
|
||||
port: 1234
|
||||
%s
|
||||
}
|
||||
`, perms)))
|
||||
_, err := ProcessConfigFile(conf)
|
||||
os.Remove(conf)
|
||||
if err == nil {
|
||||
t.Fatalf("Expected failure for permissions %s", perms)
|
||||
}
|
||||
}
|
||||
|
||||
for _, perms := range invalidPerms {
|
||||
conf = createConfFile(t, []byte(fmt.Sprintf(`
|
||||
cluster {
|
||||
port: 1234
|
||||
authorization {
|
||||
user: ivan
|
||||
password: pwd
|
||||
%s
|
||||
}
|
||||
}
|
||||
`, perms)))
|
||||
_, err := ProcessConfigFile(conf)
|
||||
os.Remove(conf)
|
||||
if err == nil {
|
||||
t.Fatalf("Expected failure for permissions %s", perms)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -998,3 +998,93 @@ func TestRouteFailedConnRemovedFromTmpMap(t *testing.T) {
|
||||
srvA.Shutdown()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestRoutePermsAppliedOnInboundAndOutboundRoute(t *testing.T) {
|
||||
|
||||
perms := &RoutePermissions{
|
||||
Import: &SubjectPermission{
|
||||
Allow: []string{"imp.foo"},
|
||||
Deny: []string{"imp.bar"},
|
||||
},
|
||||
Export: &SubjectPermission{
|
||||
Allow: []string{"exp.foo"},
|
||||
Deny: []string{"exp.bar"},
|
||||
},
|
||||
}
|
||||
|
||||
optsA, _ := ProcessConfigFile("./configs/seed.conf")
|
||||
optsA.NoLog = true
|
||||
optsA.NoSigs = true
|
||||
optsA.Cluster.Permissions = perms
|
||||
srva := RunServer(optsA)
|
||||
defer srva.Shutdown()
|
||||
|
||||
optsB := DefaultOptions()
|
||||
optsB.Routes = RoutesFromStr(fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, optsA.Cluster.Port))
|
||||
srvb := RunServer(optsB)
|
||||
defer srvb.Shutdown()
|
||||
|
||||
checkClusterFormed(t, srva, srvb)
|
||||
|
||||
// Ensure permission is properly set
|
||||
check := func(t *testing.T, s *Server) {
|
||||
t.Helper()
|
||||
var route *client
|
||||
s.mu.Lock()
|
||||
for _, r := range s.routes {
|
||||
route = r
|
||||
break
|
||||
}
|
||||
s.mu.Unlock()
|
||||
route.mu.Lock()
|
||||
perms := route.perms
|
||||
route.mu.Unlock()
|
||||
if perms == nil {
|
||||
t.Fatal("Expected perms to be set")
|
||||
}
|
||||
if perms.pub.allow == nil || perms.pub.allow.Count() != 1 {
|
||||
t.Fatal("unexpected pub allow perms")
|
||||
}
|
||||
if r := perms.pub.allow.Match("imp.foo"); len(r.psubs) != 1 {
|
||||
t.Fatal("unexpected pub allow match")
|
||||
}
|
||||
if perms.pub.deny == nil || perms.pub.deny.Count() != 1 {
|
||||
t.Fatal("unexpected pub deny perms")
|
||||
}
|
||||
if r := perms.pub.deny.Match("imp.bar"); len(r.psubs) != 1 {
|
||||
t.Fatal("unexpected pub deny match")
|
||||
}
|
||||
if perms.sub.allow == nil || perms.sub.allow.Count() != 1 {
|
||||
t.Fatal("unexpected sub allow perms")
|
||||
}
|
||||
if r := perms.sub.allow.Match("exp.foo"); len(r.psubs) != 1 {
|
||||
t.Fatal("unexpected sub allow match")
|
||||
}
|
||||
if perms.sub.deny == nil || perms.sub.deny.Count() != 1 {
|
||||
t.Fatal("unexpected sub deny perms")
|
||||
}
|
||||
if r := perms.sub.deny.Match("exp.bar"); len(r.psubs) != 1 {
|
||||
t.Fatal("unexpected sub deny match")
|
||||
}
|
||||
}
|
||||
|
||||
// First check when permissions are set on the server accepting the route connection
|
||||
check(t, srva)
|
||||
|
||||
srvb.Shutdown()
|
||||
srva.Shutdown()
|
||||
|
||||
optsA.Cluster.Permissions = nil
|
||||
optsB.Cluster.Permissions = perms
|
||||
|
||||
srva = RunServer(optsA)
|
||||
defer srva.Shutdown()
|
||||
|
||||
srvb = RunServer(optsB)
|
||||
defer srvb.Shutdown()
|
||||
|
||||
checkClusterFormed(t, srva, srvb)
|
||||
|
||||
// Now check for permissions set on server initiating the route connection
|
||||
check(t, srvb)
|
||||
}
|
||||
|
||||
@@ -10,12 +10,13 @@ cluster {
|
||||
user: ruser
|
||||
password: top_secret
|
||||
timeout: 0.5
|
||||
permissions {
|
||||
import: "foo"
|
||||
export: {
|
||||
allow: "*"
|
||||
deny: ["foo", "nats"]
|
||||
}
|
||||
}
|
||||
|
||||
permissions {
|
||||
import: "foo"
|
||||
export: {
|
||||
allow: "*"
|
||||
deny: ["foo", "nats"]
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user