diff --git a/server/auth.go b/server/auth.go index b481e2b8..8f5060dd 100644 --- a/server/auth.go +++ b/server/auth.go @@ -1,4 +1,4 @@ -// Copyright 2012-2018 The NATS Authors +// Copyright 2012-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -18,6 +18,7 @@ import ( "encoding/base64" "net" "strings" + "time" "github.com/nats-io/jwt" "github.com/nats-io/nkeys" @@ -89,11 +90,19 @@ type SubjectPermission struct { Deny []string `json:"deny,omitempty"` } +// ResponsePermission can be used to allow responses to any reply subject +// that is received on a valid subscription. +type ResponsePermission struct { + MaxMsgs int `json:"max"` + Expires time.Duration `json:"ttl"` +} + // Permissions are the allowed subjects on a per // publish or subscribe basis. type Permissions struct { - Publish *SubjectPermission `json:"publish"` - Subscribe *SubjectPermission `json:"subscribe"` + Publish *SubjectPermission `json:"publish"` + Subscribe *SubjectPermission `json:"subscribe"` + Response *ResponsePermission `json:"responses,omitempty"` } // RoutePermissions are similar to user permissions @@ -134,6 +143,12 @@ func (p *Permissions) clone() *Permissions { if p.Subscribe != nil { clone.Subscribe = p.Subscribe.clone() } + if p.Response != nil { + clone.Response = &ResponsePermission{ + MaxMsgs: p.Response.MaxMsgs, + Expires: p.Response.Expires, + } + } return clone } diff --git a/server/client.go b/server/client.go index a575a275..6baae0ab 100644 --- a/server/client.go +++ b/server/client.go @@ -157,33 +157,35 @@ const ( type client struct { // Here first because of use of atomics, and memory alignment. stats - mpay int32 - msubs int32 - mcl int32 - mu sync.Mutex - kind int - cid uint64 - opts clientOpts - start time.Time - nonce []byte - nc net.Conn - ncs string - out outbound - srv *Server - acc *Account - user *NkeyUser - host string - port uint16 - subs map[string]*subscription - perms *permissions - mperms *msgDeny - darray []string - in readCache - pcd map[*client]struct{} - atmr *time.Timer - ping pinfo - msgb [msgScratchSize]byte - last time.Time + mpay int32 + msubs int32 + mcl int32 + mu sync.Mutex + kind int + cid uint64 + opts clientOpts + start time.Time + nonce []byte + nc net.Conn + ncs string + out outbound + srv *Server + acc *Account + user *NkeyUser + host string + port uint16 + subs map[string]*subscription + perms *permissions + replies map[string]*resp + rcheck time.Time + mperms *msgDeny + darray []string + in readCache + pcd map[*client]struct{} + atmr *time.Timer + ping pinfo + msgb [msgScratchSize]byte + last time.Time parseState rtt time.Duration @@ -230,12 +232,21 @@ type perm struct { allow *Sublist deny *Sublist } + type permissions struct { sub perm pub perm + resp *ResponsePermission pcache map[string]bool } +// This is used to dynamically track responses and reply subjects +// for dynamic permissioning. +type resp struct { + t time.Time + n int +} + // msgDeny is used when a user permission for subscriptions has a deny // clause but a subscription could be made that is of broader scope. // e.g. deny = "foo", but user subscribes to "*". That subscription should @@ -259,6 +270,8 @@ const ( maxPermCacheSize = 128 pruneSize = 32 routeTargetInit = 8 + replyPermLimit = 4096 + replyCheckMin = 30 * time.Second ) // Used in readloop to cache hot subject lookups and group statistics. @@ -536,10 +549,9 @@ func (c *client) RegisterUser(user *User) { // Reset perms to nil in case client previously had them. c.perms = nil c.mperms = nil - c.mu.Unlock() - return + } else { + c.setPermissions(user.Permissions) } - c.setPermissions(user.Permissions) c.mu.Unlock() } @@ -580,7 +592,7 @@ func (c *client) setPermissions(perms *Permissions) { // Loop over publish permissions if perms.Publish != nil { - if len(perms.Publish.Allow) > 0 { + if perms.Publish.Allow != nil { c.perms.pub.allow = NewSublistWithCache() } for _, pubSubject := range perms.Publish.Allow { @@ -596,6 +608,14 @@ func (c *client) setPermissions(perms *Permissions) { } } + // Check if we are allowed to send responses. + if perms.Response != nil { + rp := *perms.Response + c.perms.resp = &rp + c.replies = make(map[string]*resp) + c.rcheck = time.Now() + } + // Loop over subscribe permissions if perms.Subscribe != nil { if len(perms.Subscribe.Allow) > 0 { @@ -2131,6 +2151,15 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool { client.out.pm++ + // If we are tracking dynamic publish permissions that track reply subjects, + // do that accounting here. We only look at client.replies which will be non-nil. + if client.replies != nil && len(c.pa.reply) > 0 { + client.replies[string(c.pa.reply)] = &resp{time.Now(), 0} + if len(client.replies) > replyPermLimit { + client.pruneReplyPerms() + } + } + // Check outbound threshold and queue IO flush if needed. // This is specifically looking at situations where we are getting behind and may want // to intervene before this producer goes back to top of readloop. We are in the producer's @@ -2156,6 +2185,27 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool { return true } +// pruneReplyPerms will remove any stale or expired entries +// in our reply cache. We make sure to not check too often. +func (c *client) pruneReplyPerms() { + // Make sure we do not check too often. + if c.perms.resp == nil || time.Since(c.rcheck) < replyCheckMin { + return + } + + mm := c.perms.resp.MaxMsgs + ttl := c.perms.resp.Expires + c.rcheck = time.Now() + + for k, resp := range c.replies { + if mm > 0 && resp.n >= mm { + delete(c.replies, k) + } else if ttl > 0 && c.rcheck.Sub(resp.t) > ttl { + delete(c.replies, k) + } + } +} + // pruneDenyCache will prune the deny cache via randomly // deleting items. Doing so pruneSize items at a time. // Lock must be held for this one since it is shared under @@ -2183,7 +2233,14 @@ func (c *client) prunePubPermsCache() { } // pubAllowed checks on publish permissioning. +// Lock should not be held. func (c *client) pubAllowed(subject string) bool { + return c.pubAllowedFullCheck(subject, true) +} + +// pubAllowedFullCheck checks on all publish permissioning depending +// on the flag for dynamic reply permissions. +func (c *client) pubAllowedFullCheck(subject string, fullCheck bool) bool { if c.perms == nil || (c.perms.pub.allow == nil && c.perms.pub.deny == nil) { return true } @@ -2205,11 +2262,31 @@ func (c *client) pubAllowed(subject string) bool { r := c.perms.pub.deny.Match(subject) allowed = len(r.psubs) == 0 } - // Update our cache here. - c.perms.pcache[string(subject)] = allowed - // Prune if needed. - if len(c.perms.pcache) > maxPermCacheSize { - c.prunePubPermsCache() + + // If we are currently not allowed but we are tracking reply subjects + // dynamically, check to see if we are allowed here Avoid pcache. + // We need to acquire the lock though. + if !allowed && fullCheck && c.perms.resp != nil { + c.mu.Lock() + if resp := c.replies[subject]; resp != nil { + resp.n++ + // Check if we have sent too many responses. + if c.perms.resp.MaxMsgs > 0 && resp.n > c.perms.resp.MaxMsgs { + delete(c.replies, subject) + } else if c.perms.resp.Expires > 0 && time.Since(resp.t) > c.perms.resp.Expires { + delete(c.replies, subject) + } else { + allowed = true + } + } + c.mu.Unlock() + } else { + // Update our cache here. + c.perms.pcache[string(subject)] = allowed + // Prune if needed. + if len(c.perms.pcache) > maxPermCacheSize { + c.prunePubPermsCache() + } } return allowed } diff --git a/server/configs/authorization.conf b/server/configs/authorization.conf index 3a0f43c4..5923c2de 100644 --- a/server/configs/authorization.conf +++ b/server/configs/authorization.conf @@ -23,6 +23,19 @@ authorization { subscribe = "PUBLIC.>" } + # Service can listen on the request subject and respond to any + # received reply subject. + my_service = { + subscribe = "my.service.req" + publish_allow_responses: true + } + + # Can support a map with max messages and expiration of the permission. + my_stream_service = { + subscribe = "my.service.req" + allow_responses: {max: 10, expires: "1m"} + } + # Default permissions if none presented. e.g. susan below. default_permissions: $default_user @@ -31,5 +44,7 @@ authorization { {user: alice, password: foo, permissions: $super_user} {user: bob, password: bar, permissions: $req_pub_user} {user: susan, password: baz} + {user: svca, password: pc, permissions: $my_service} + {user: svcb, password: sam, permissions: $my_stream_service} ] } diff --git a/server/const.go b/server/const.go index 6e174787..2da215dc 100644 --- a/server/const.go +++ b/server/const.go @@ -163,4 +163,12 @@ const ( // DEFAULT_RTT_MEASUREMENT_INTERVAL is how often we want to measure RTT from // this server to clients, routes, gateways or leafnode connections. DEFAULT_RTT_MEASUREMENT_INTERVAL = time.Hour + + // DEFAULT_ALLOW_RESPONSE_MAX_MSGS is the default number of responses allowed + // for a reply subject. + DEFAULT_ALLOW_RESPONSE_MAX_MSGS = 1 + + // DEFAULT_ALLOW_RESPONSE_EXPIRATION is the default time allowed for a given + // dynamic response permission. + DEFAULT_ALLOW_RESPONSE_EXPIRATION = 2 * time.Minute ) diff --git a/server/opts.go b/server/opts.go index 94bf1ebd..9aad0806 100644 --- a/server/opts.go +++ b/server/opts.go @@ -859,6 +859,12 @@ func parseCluster(v interface{}, opts *Options, errors *[]error, warnings *[]err *errors = append(*errors, err) continue } + // Dynamic response permissions do not make sense here. + if perms.Response != nil { + err := &configErr{tk, fmt.Sprintf("Cluster permissions do not support dynamic responses")} + *errors = append(*errors, err) + continue + } // This will possibly override permissions that were define in auth block setClusterPermissions(&opts.Cluster, perms) default: @@ -1909,26 +1915,49 @@ func parseUserPermissions(mv interface{}, errors, warnings *[]error) (*Permissio return nil, &configErr{tk, fmt.Sprintf("Expected permissions to be a map/struct, got %+v", mv)} } for k, v := range pm { - tk, v = unwrapValue(v) + tk, mv = unwrapValue(v) switch strings.ToLower(k) { // For routes: // Import is Publish // Export is Subscribe case "pub", "publish", "import": - perms, err := parseVariablePermissions(v, errors, warnings) + perms, err := parseVariablePermissions(mv, errors, warnings) if err != nil { *errors = append(*errors, err) continue } p.Publish = perms case "sub", "subscribe", "export": - perms, err := parseVariablePermissions(v, errors, warnings) + perms, err := parseVariablePermissions(mv, errors, warnings) if err != nil { *errors = append(*errors, err) continue } p.Subscribe = perms + case "publish_allow_responses", "allow_responses": + rp := &ResponsePermission{ + MaxMsgs: DEFAULT_ALLOW_RESPONSE_MAX_MSGS, + Expires: DEFAULT_ALLOW_RESPONSE_EXPIRATION, + } + // Try boolean first + responses, ok := mv.(bool) + if ok { + if responses { + p.Response = rp + } + } else { + p.Response = parseAllowResponses(v, errors, warnings) + } + if p.Response != nil { + if p.Publish == nil { + p.Publish = &SubjectPermission{} + } + if p.Publish.Allow == nil { + // We turn off the blanket allow statement. + p.Publish.Allow = []string{} + } + } default: if !tk.IsUsedVariable() { err := &configErr{tk, fmt.Sprintf("Unknown field %q parsing permissions", k)} @@ -1980,6 +2009,52 @@ func parseSubjects(v interface{}, errors, warnings *[]error) ([]string, error) { return subjects, nil } +// Helper function to parse a ResponsePermission. +func parseAllowResponses(v interface{}, errors, warnings *[]error) *ResponsePermission { + tk, v := unwrapValue(v) + // Check if this is a map. + pm, ok := v.(map[string]interface{}) + if !ok { + err := &configErr{tk, "error parsing response permissions, expected a boolean or a map"} + *errors = append(*errors, err) + return nil + } + + rp := &ResponsePermission{ + MaxMsgs: DEFAULT_ALLOW_RESPONSE_MAX_MSGS, + Expires: DEFAULT_ALLOW_RESPONSE_EXPIRATION, + } + + for k, v := range pm { + tk, v = unwrapValue(v) + switch strings.ToLower(k) { + case "max", "max_msgs", "max_messages", "max_responses": + rp.MaxMsgs = int(v.(int64)) + case "expires", "expiration", "ttl": + wd, ok := v.(string) + if ok { + ttl, err := time.ParseDuration(wd) + if err != nil { + err := &configErr{tk, fmt.Sprintf("error parsing expires: %v", err)} + *errors = append(*errors, err) + return nil + } + rp.Expires = ttl + } else { + err := &configErr{tk, "error parsing expires, not a duration string"} + *errors = append(*errors, err) + return nil + } + default: + if !tk.IsUsedVariable() { + err := &configErr{tk, fmt.Sprintf("Unknown field %q parsing permissions", k)} + *errors = append(*errors, err) + } + } + } + return rp +} + // Helper function to parse old style authorization configs. func parseOldPermissionStyle(v interface{}, errors, warnings *[]error) (*SubjectPermission, error) { subjects, err := parseSubjects(v, errors, warnings) diff --git a/server/opts_test.go b/server/opts_test.go index 2e78c585..fc7e6653 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -76,7 +76,7 @@ func TestOptions_RandomPort(t *testing.T) { if opts.Port != 0 { t.Fatalf("Process of options should have resolved random port to "+ - "zero.\nexpected: %d\ngot: %d\n", 0, opts.Port) + "zero.\nexpected: %d\ngot: %d", 0, opts.Port) } } @@ -111,7 +111,7 @@ func TestConfigFile(t *testing.T) { opts, err := ProcessConfigFile("./configs/test.conf") if err != nil { - t.Fatalf("Received an error reading config file: %v\n", err) + t.Fatalf("Received an error reading config file: %v", err) } checkOptionsEqual(t, golden, opts) @@ -129,7 +129,7 @@ func TestTLSConfigFile(t *testing.T) { } opts, err := ProcessConfigFile("./configs/tls.conf") if err != nil { - t.Fatalf("Received an error reading config file: %v\n", err) + t.Fatalf("Received an error reading config file: %v", err) } tlsConfig := opts.TLSConfig if tlsConfig == nil { @@ -156,13 +156,13 @@ func TestTLSConfigFile(t *testing.T) { } cert := tlsConfig.Certificates[0].Leaf if err := cert.VerifyHostname("127.0.0.1"); err != nil { - t.Fatalf("Could not verify hostname in certificate: %v\n", err) + t.Fatalf("Could not verify hostname in certificate: %v", err) } // Now test adding cipher suites. opts, err = ProcessConfigFile("./configs/tls_ciphers.conf") if err != nil { - t.Fatalf("Received an error reading config file: %v\n", err) + t.Fatalf("Received an error reading config file: %v", err) } tlsConfig = opts.TLSConfig if tlsConfig == nil { @@ -208,7 +208,7 @@ func TestTLSConfigFile(t *testing.T) { // test on a file that will load the curve preference defaults opts, err = ProcessConfigFile("./configs/tls_ciphers.conf") if err != nil { - t.Fatalf("Received an error reading config file: %v\n", err) + t.Fatalf("Received an error reading config file: %v", err) } if !reflect.DeepEqual(opts.TLSConfig.CurvePreferences, defaultCurvePreferences()) { @@ -269,7 +269,7 @@ func TestMergeOverrides(t *testing.T) { } fopts, err := ProcessConfigFile("./configs/test.conf") if err != nil { - t.Fatalf("Received an error reading config file: %v\n", err) + t.Fatalf("Received an error reading config file: %v", err) } // Overrides via flags @@ -345,7 +345,7 @@ func TestRouteFlagOverride(t *testing.T) { fopts, err := ProcessConfigFile("./configs/srv_a.conf") if err != nil { - t.Fatalf("Received an error reading config file: %v\n", err) + t.Fatalf("Received an error reading config file: %v", err) } // Overrides via flags @@ -384,7 +384,7 @@ func TestClusterFlagsOverride(t *testing.T) { fopts, err := ProcessConfigFile("./configs/srv_a.conf") if err != nil { - t.Fatalf("Received an error reading config file: %v\n", err) + t.Fatalf("Received an error reading config file: %v", err) } // Overrides via flags @@ -419,7 +419,7 @@ func TestRouteFlagOverrideWithMultiple(t *testing.T) { fopts, err := ProcessConfigFile("./configs/srv_a.conf") if err != nil { - t.Fatalf("Received an error reading config file: %v\n", err) + t.Fatalf("Received an error reading config file: %v", err) } // Overrides via flags @@ -434,23 +434,23 @@ func TestRouteFlagOverrideWithMultiple(t *testing.T) { func TestDynamicPortOnListen(t *testing.T) { opts, err := ProcessConfigFile("./configs/listen-1.conf") if err != nil { - t.Fatalf("Received an error reading config file: %v\n", err) + t.Fatalf("Received an error reading config file: %v", err) } if opts.Port != -1 { - t.Fatalf("Received incorrect port %v, expected -1\n", opts.Port) + t.Fatalf("Received incorrect port %v, expected -1", opts.Port) } if opts.HTTPPort != -1 { - t.Fatalf("Received incorrect monitoring port %v, expected -1\n", opts.HTTPPort) + t.Fatalf("Received incorrect monitoring port %v, expected -1", opts.HTTPPort) } if opts.HTTPSPort != -1 { - t.Fatalf("Received incorrect secure monitoring port %v, expected -1\n", opts.HTTPSPort) + t.Fatalf("Received incorrect secure monitoring port %v, expected -1", opts.HTTPSPort) } } func TestListenConfig(t *testing.T) { opts, err := ProcessConfigFile("./configs/listen.conf") if err != nil { - t.Fatalf("Received an error reading config file: %v\n", err) + t.Fatalf("Received an error reading config file: %v", err) } setBaselineOptions(opts) @@ -459,13 +459,13 @@ func TestListenConfig(t *testing.T) { port := 4422 monHost := "127.0.0.1" if opts.Host != host { - t.Fatalf("Received incorrect host %q, expected %q\n", opts.Host, host) + t.Fatalf("Received incorrect host %q, expected %q", opts.Host, host) } if opts.HTTPHost != monHost { - t.Fatalf("Received incorrect host %q, expected %q\n", opts.HTTPHost, monHost) + t.Fatalf("Received incorrect host %q, expected %q", opts.HTTPHost, monHost) } if opts.Port != port { - t.Fatalf("Received incorrect port %v, expected %v\n", opts.Port, port) + t.Fatalf("Received incorrect port %v, expected %v", opts.Port, port) } // Clustering @@ -473,10 +473,10 @@ func TestListenConfig(t *testing.T) { clusterPort := 4244 if opts.Cluster.Host != clusterHost { - t.Fatalf("Received incorrect cluster host %q, expected %q\n", opts.Cluster.Host, clusterHost) + t.Fatalf("Received incorrect cluster host %q, expected %q", opts.Cluster.Host, clusterHost) } if opts.Cluster.Port != clusterPort { - t.Fatalf("Received incorrect cluster port %v, expected %v\n", opts.Cluster.Port, clusterPort) + t.Fatalf("Received incorrect cluster port %v, expected %v", opts.Cluster.Port, clusterPort) } // HTTP @@ -484,56 +484,56 @@ func TestListenConfig(t *testing.T) { httpPort := 8422 if opts.HTTPHost != httpHost { - t.Fatalf("Received incorrect http host %q, expected %q\n", opts.HTTPHost, httpHost) + t.Fatalf("Received incorrect http host %q, expected %q", opts.HTTPHost, httpHost) } if opts.HTTPPort != httpPort { - t.Fatalf("Received incorrect http port %v, expected %v\n", opts.HTTPPort, httpPort) + t.Fatalf("Received incorrect http port %v, expected %v", opts.HTTPPort, httpPort) } // HTTPS httpsPort := 9443 if opts.HTTPSPort != httpsPort { - t.Fatalf("Received incorrect https port %v, expected %v\n", opts.HTTPSPort, httpsPort) + t.Fatalf("Received incorrect https port %v, expected %v", opts.HTTPSPort, httpsPort) } } func TestListenPortOnlyConfig(t *testing.T) { opts, err := ProcessConfigFile("./configs/listen_port.conf") if err != nil { - t.Fatalf("Received an error reading config file: %v\n", err) + t.Fatalf("Received an error reading config file: %v", err) } setBaselineOptions(opts) port := 8922 if opts.Host != DEFAULT_HOST { - t.Fatalf("Received incorrect host %q, expected %q\n", opts.Host, DEFAULT_HOST) + t.Fatalf("Received incorrect host %q, expected %q", opts.Host, DEFAULT_HOST) } if opts.HTTPHost != DEFAULT_HOST { - t.Fatalf("Received incorrect host %q, expected %q\n", opts.Host, DEFAULT_HOST) + t.Fatalf("Received incorrect host %q, expected %q", opts.Host, DEFAULT_HOST) } if opts.Port != port { - t.Fatalf("Received incorrect port %v, expected %v\n", opts.Port, port) + t.Fatalf("Received incorrect port %v, expected %v", opts.Port, port) } } func TestListenPortWithColonConfig(t *testing.T) { opts, err := ProcessConfigFile("./configs/listen_port_with_colon.conf") if err != nil { - t.Fatalf("Received an error reading config file: %v\n", err) + t.Fatalf("Received an error reading config file: %v", err) } setBaselineOptions(opts) port := 8922 if opts.Host != DEFAULT_HOST { - t.Fatalf("Received incorrect host %q, expected %q\n", opts.Host, DEFAULT_HOST) + t.Fatalf("Received incorrect host %q, expected %q", opts.Host, DEFAULT_HOST) } if opts.HTTPHost != DEFAULT_HOST { - t.Fatalf("Received incorrect host %q, expected %q\n", opts.Host, DEFAULT_HOST) + t.Fatalf("Received incorrect host %q, expected %q", opts.Host, DEFAULT_HOST) } if opts.Port != port { - t.Fatalf("Received incorrect port %v, expected %v\n", opts.Port, port) + t.Fatalf("Received incorrect port %v, expected %v", opts.Port, port) } } @@ -545,20 +545,20 @@ func TestListenMonitoringDefault(t *testing.T) { host := "10.0.1.22" if opts.Host != host { - t.Fatalf("Received incorrect host %q, expected %q\n", opts.Host, host) + t.Fatalf("Received incorrect host %q, expected %q", opts.Host, host) } if opts.HTTPHost != host { - t.Fatalf("Received incorrect host %q, expected %q\n", opts.Host, host) + t.Fatalf("Received incorrect host %q, expected %q", opts.Host, host) } if opts.Port != DEFAULT_PORT { - t.Fatalf("Received incorrect port %v, expected %v\n", opts.Port, DEFAULT_PORT) + t.Fatalf("Received incorrect port %v, expected %v", opts.Port, DEFAULT_PORT) } } func TestMultipleUsersConfig(t *testing.T) { opts, err := ProcessConfigFile("./configs/multiple_users.conf") if err != nil { - t.Fatalf("Received an error reading config file: %v\n", err) + t.Fatalf("Received an error reading config file: %v", err) } setBaselineOptions(opts) } @@ -568,12 +568,12 @@ func TestMultipleUsersConfig(t *testing.T) { func TestAuthorizationConfig(t *testing.T) { opts, err := ProcessConfigFile("./configs/authorization.conf") if err != nil { - t.Fatalf("Received an error reading config file: %v\n", err) + t.Fatalf("Received an error reading config file: %v", err) } setBaselineOptions(opts) lu := len(opts.Users) - if lu != 3 { - t.Fatalf("Expected 3 users, got %d\n", lu) + if lu != 5 { + t.Fatalf("Expected 5 users, got %d", lu) } // Build a map mu := make(map[string]*User) @@ -584,69 +584,153 @@ func TestAuthorizationConfig(t *testing.T) { // Alice alice, ok := mu["alice"] if !ok { - t.Fatalf("Expected to see user Alice\n") + t.Fatalf("Expected to see user Alice") } // Check for permissions details if alice.Permissions == nil { - t.Fatalf("Expected Alice's permissions to be non-nil\n") + t.Fatalf("Expected Alice's permissions to be non-nil") } if alice.Permissions.Publish == nil { - t.Fatalf("Expected Alice's publish permissions to be non-nil\n") + t.Fatalf("Expected Alice's publish permissions to be non-nil") } if len(alice.Permissions.Publish.Allow) != 1 { - t.Fatalf("Expected Alice's publish permissions to have 1 element, got %d\n", + t.Fatalf("Expected Alice's publish permissions to have 1 element, got %d", len(alice.Permissions.Publish.Allow)) } pubPerm := alice.Permissions.Publish.Allow[0] if pubPerm != "*" { - t.Fatalf("Expected Alice's publish permissions to be '*', got %q\n", pubPerm) + t.Fatalf("Expected Alice's publish permissions to be '*', got %q", pubPerm) } if alice.Permissions.Subscribe == nil { - t.Fatalf("Expected Alice's subscribe permissions to be non-nil\n") + t.Fatalf("Expected Alice's subscribe permissions to be non-nil") } if len(alice.Permissions.Subscribe.Allow) != 1 { - t.Fatalf("Expected Alice's subscribe permissions to have 1 element, got %d\n", + t.Fatalf("Expected Alice's subscribe permissions to have 1 element, got %d", len(alice.Permissions.Subscribe.Allow)) } subPerm := alice.Permissions.Subscribe.Allow[0] if subPerm != ">" { - t.Fatalf("Expected Alice's subscribe permissions to be '>', got %q\n", subPerm) + t.Fatalf("Expected Alice's subscribe permissions to be '>', got %q", subPerm) } // Bob bob, ok := mu["bob"] if !ok { - t.Fatalf("Expected to see user Bob\n") + t.Fatalf("Expected to see user Bob") } if bob.Permissions == nil { - t.Fatalf("Expected Bob's permissions to be non-nil\n") + t.Fatalf("Expected Bob's permissions to be non-nil") } // Susan susan, ok := mu["susan"] if !ok { - t.Fatalf("Expected to see user Susan\n") + t.Fatalf("Expected to see user Susan") } if susan.Permissions == nil { - t.Fatalf("Expected Susan's permissions to be non-nil\n") + t.Fatalf("Expected Susan's permissions to be non-nil") } // Check susan closely since she inherited the default permissions. if susan.Permissions == nil { - t.Fatalf("Expected Susan's permissions to be non-nil\n") + t.Fatalf("Expected Susan's permissions to be non-nil") } if susan.Permissions.Publish != nil { - t.Fatalf("Expected Susan's publish permissions to be nil\n") + t.Fatalf("Expected Susan's publish permissions to be nil") } if susan.Permissions.Subscribe == nil { - t.Fatalf("Expected Susan's subscribe permissions to be non-nil\n") + t.Fatalf("Expected Susan's subscribe permissions to be non-nil") } if len(susan.Permissions.Subscribe.Allow) != 1 { - t.Fatalf("Expected Susan's subscribe permissions to have 1 element, got %d\n", + t.Fatalf("Expected Susan's subscribe permissions to have 1 element, got %d", len(susan.Permissions.Subscribe.Allow)) } subPerm = susan.Permissions.Subscribe.Allow[0] if subPerm != "PUBLIC.>" { - t.Fatalf("Expected Susan's subscribe permissions to be 'PUBLIC.>', got %q\n", subPerm) + t.Fatalf("Expected Susan's subscribe permissions to be 'PUBLIC.>', got %q", subPerm) + } + + // Service A + svca, ok := mu["svca"] + if !ok { + t.Fatalf("Expected to see user Service A") + } + if svca.Permissions == nil { + t.Fatalf("Expected Service A's permissions to be non-nil") + } + if svca.Permissions.Subscribe == nil { + t.Fatalf("Expected Service A's subscribe permissions to be non-nil") + } + if len(svca.Permissions.Subscribe.Allow) != 1 { + t.Fatalf("Expected Service A's subscribe permissions to have 1 element, got %d", + len(svca.Permissions.Subscribe.Allow)) + } + subPerm = svca.Permissions.Subscribe.Allow[0] + if subPerm != "my.service.req" { + t.Fatalf("Expected Service A's subscribe permissions to be 'my.service.req', got %q", subPerm) + } + // We want allow_responses to essentially set deny all, or allow none in this case. + if svca.Permissions.Publish == nil { + t.Fatalf("Expected Service A's publish permissions to be non-nil") + } + if len(svca.Permissions.Publish.Allow) != 0 { + t.Fatalf("Expected Service A's publish permissions to have no elements, got %d", + len(svca.Permissions.Publish.Allow)) + } + // We should have a ResponsePermission present with default values. + if svca.Permissions.Response == nil { + t.Fatalf("Expected Service A's response permissions to be non-nil") + } + if svca.Permissions.Response.MaxMsgs != DEFAULT_ALLOW_RESPONSE_MAX_MSGS { + t.Fatalf("Expected Service A's response permissions of max msgs to be %d, got %d", + DEFAULT_ALLOW_RESPONSE_MAX_MSGS, svca.Permissions.Response.MaxMsgs, + ) + } + if svca.Permissions.Response.Expires != DEFAULT_ALLOW_RESPONSE_EXPIRATION { + t.Fatalf("Expected Service A's response permissions of expiration to be %v, got %v", + DEFAULT_ALLOW_RESPONSE_EXPIRATION, svca.Permissions.Response.Expires, + ) + } + + // Service B + svcb, ok := mu["svcb"] + if !ok { + t.Fatalf("Expected to see user Service B") + } + if svcb.Permissions == nil { + t.Fatalf("Expected Service B's permissions to be non-nil") + } + if svcb.Permissions.Subscribe == nil { + t.Fatalf("Expected Service B's subscribe permissions to be non-nil") + } + if len(svcb.Permissions.Subscribe.Allow) != 1 { + t.Fatalf("Expected Service B's subscribe permissions to have 1 element, got %d", + len(svcb.Permissions.Subscribe.Allow)) + } + subPerm = svcb.Permissions.Subscribe.Allow[0] + if subPerm != "my.service.req" { + t.Fatalf("Expected Service B's subscribe permissions to be 'my.service.req', got %q", subPerm) + } + // We want allow_responses to essentially set deny all, or allow none in this case. + if svcb.Permissions.Publish == nil { + t.Fatalf("Expected Service B's publish permissions to be non-nil") + } + if len(svcb.Permissions.Publish.Allow) != 0 { + t.Fatalf("Expected Service B's publish permissions to have no elements, got %d", + len(svcb.Permissions.Publish.Allow)) + } + // We should have a ResponsePermission present with default values. + if svcb.Permissions.Response == nil { + t.Fatalf("Expected Service B's response permissions to be non-nil") + } + if svcb.Permissions.Response.MaxMsgs != 10 { + t.Fatalf("Expected Service B's response permissions of max msgs to be %d, got %d", + 10, svcb.Permissions.Response.MaxMsgs, + ) + } + if svcb.Permissions.Response.Expires != time.Minute { + t.Fatalf("Expected Service B's response permissions of expiration to be %v, got %v", + time.Minute, svcb.Permissions.Response.Expires, + ) } } @@ -655,13 +739,13 @@ func TestAuthorizationConfig(t *testing.T) { func TestNewStyleAuthorizationConfig(t *testing.T) { opts, err := ProcessConfigFile("./configs/new_style_authorization.conf") if err != nil { - t.Fatalf("Received an error reading config file: %v\n", err) + t.Fatalf("Received an error reading config file: %v", err) } setBaselineOptions(opts) lu := len(opts.Users) if lu != 2 { - t.Fatalf("Expected 2 users, got %d\n", lu) + t.Fatalf("Expected 2 users, got %d", lu) } // Build a map mu := make(map[string]*User) @@ -671,99 +755,99 @@ func TestNewStyleAuthorizationConfig(t *testing.T) { // Alice alice, ok := mu["alice"] if !ok { - t.Fatalf("Expected to see user Alice\n") + t.Fatalf("Expected to see user Alice") } if alice.Permissions == nil { - t.Fatalf("Expected Alice's permissions to be non-nil\n") + t.Fatalf("Expected Alice's permissions to be non-nil") } if alice.Permissions.Publish == nil { - t.Fatalf("Expected Alice's publish permissions to be non-nil\n") + t.Fatalf("Expected Alice's publish permissions to be non-nil") } if len(alice.Permissions.Publish.Allow) != 3 { - t.Fatalf("Expected Alice's allowed publish permissions to have 3 elements, got %d\n", + t.Fatalf("Expected Alice's allowed publish permissions to have 3 elements, got %d", len(alice.Permissions.Publish.Allow)) } pubPerm := alice.Permissions.Publish.Allow[0] if pubPerm != "foo" { - t.Fatalf("Expected Alice's first allowed publish permission to be 'foo', got %q\n", pubPerm) + t.Fatalf("Expected Alice's first allowed publish permission to be 'foo', got %q", pubPerm) } pubPerm = alice.Permissions.Publish.Allow[1] if pubPerm != "bar" { - t.Fatalf("Expected Alice's second allowed publish permission to be 'bar', got %q\n", pubPerm) + t.Fatalf("Expected Alice's second allowed publish permission to be 'bar', got %q", pubPerm) } pubPerm = alice.Permissions.Publish.Allow[2] if pubPerm != "baz" { - t.Fatalf("Expected Alice's third allowed publish permission to be 'baz', got %q\n", pubPerm) + t.Fatalf("Expected Alice's third allowed publish permission to be 'baz', got %q", pubPerm) } if len(alice.Permissions.Publish.Deny) != 0 { - t.Fatalf("Expected Alice's denied publish permissions to have 0 elements, got %d\n", + t.Fatalf("Expected Alice's denied publish permissions to have 0 elements, got %d", len(alice.Permissions.Publish.Deny)) } if alice.Permissions.Subscribe == nil { - t.Fatalf("Expected Alice's subscribe permissions to be non-nil\n") + t.Fatalf("Expected Alice's subscribe permissions to be non-nil") } if len(alice.Permissions.Subscribe.Allow) != 0 { - t.Fatalf("Expected Alice's allowed subscribe permissions to have 0 elements, got %d\n", + t.Fatalf("Expected Alice's allowed subscribe permissions to have 0 elements, got %d", len(alice.Permissions.Subscribe.Allow)) } if len(alice.Permissions.Subscribe.Deny) != 1 { - t.Fatalf("Expected Alice's denied subscribe permissions to have 1 element, got %d\n", + t.Fatalf("Expected Alice's denied subscribe permissions to have 1 element, got %d", len(alice.Permissions.Subscribe.Deny)) } subPerm := alice.Permissions.Subscribe.Deny[0] if subPerm != "$SYSTEM.>" { - t.Fatalf("Expected Alice's only denied subscribe permission to be '$SYSTEM.>', got %q\n", subPerm) + t.Fatalf("Expected Alice's only denied subscribe permission to be '$SYSTEM.>', got %q", subPerm) } // Bob bob, ok := mu["bob"] if !ok { - t.Fatalf("Expected to see user Bob\n") + t.Fatalf("Expected to see user Bob") } if bob.Permissions == nil { - t.Fatalf("Expected Bob's permissions to be non-nil\n") + t.Fatalf("Expected Bob's permissions to be non-nil") } if bob.Permissions.Publish == nil { - t.Fatalf("Expected Bobs's publish permissions to be non-nil\n") + t.Fatalf("Expected Bobs's publish permissions to be non-nil") } if len(bob.Permissions.Publish.Allow) != 1 { - t.Fatalf("Expected Bob's allowed publish permissions to have 1 element, got %d\n", + t.Fatalf("Expected Bob's allowed publish permissions to have 1 element, got %d", len(bob.Permissions.Publish.Allow)) } pubPerm = bob.Permissions.Publish.Allow[0] if pubPerm != "$SYSTEM.>" { - t.Fatalf("Expected Bob's first allowed publish permission to be '$SYSTEM.>', got %q\n", pubPerm) + t.Fatalf("Expected Bob's first allowed publish permission to be '$SYSTEM.>', got %q", pubPerm) } if len(bob.Permissions.Publish.Deny) != 0 { - t.Fatalf("Expected Bob's denied publish permissions to have 0 elements, got %d\n", + t.Fatalf("Expected Bob's denied publish permissions to have 0 elements, got %d", len(bob.Permissions.Publish.Deny)) } if bob.Permissions.Subscribe == nil { - t.Fatalf("Expected Bob's subscribe permissions to be non-nil\n") + t.Fatalf("Expected Bob's subscribe permissions to be non-nil") } if len(bob.Permissions.Subscribe.Allow) != 0 { - t.Fatalf("Expected Bob's allowed subscribe permissions to have 0 elements, got %d\n", + t.Fatalf("Expected Bob's allowed subscribe permissions to have 0 elements, got %d", len(bob.Permissions.Subscribe.Allow)) } if len(bob.Permissions.Subscribe.Deny) != 3 { - t.Fatalf("Expected Bobs's denied subscribe permissions to have 3 elements, got %d\n", + t.Fatalf("Expected Bobs's denied subscribe permissions to have 3 elements, got %d", len(bob.Permissions.Subscribe.Deny)) } subPerm = bob.Permissions.Subscribe.Deny[0] if subPerm != "foo" { - t.Fatalf("Expected Bobs's first denied subscribe permission to be 'foo', got %q\n", subPerm) + t.Fatalf("Expected Bobs's first denied subscribe permission to be 'foo', got %q", subPerm) } subPerm = bob.Permissions.Subscribe.Deny[1] if subPerm != "bar" { - t.Fatalf("Expected Bobs's second denied subscribe permission to be 'bar', got %q\n", subPerm) + t.Fatalf("Expected Bobs's second denied subscribe permission to be 'bar', got %q", subPerm) } subPerm = bob.Permissions.Subscribe.Deny[2] if subPerm != "baz" { - t.Fatalf("Expected Bobs's third denied subscribe permission to be 'baz', got %q\n", subPerm) + t.Fatalf("Expected Bobs's third denied subscribe permission to be 'baz', got %q", subPerm) } } @@ -816,7 +900,7 @@ func TestNkeyUsersWithPermsConfig(t *testing.T) { t.Fatal("Expected to have publish permissions") } if nk.Permissions.Publish.Allow[0] != "$SYSTEM.>" { - t.Fatalf("Expected publish to allow \"$SYSTEM.>\", but got %v\n", nk.Permissions.Publish.Allow[0]) + t.Fatalf("Expected publish to allow \"$SYSTEM.>\", but got %v", nk.Permissions.Publish.Allow[0]) } if nk.Permissions.Subscribe == nil { t.Fatal("Expected to have subscribe permissions") @@ -909,7 +993,7 @@ func TestTokenWithUsers(t *testing.T) { func TestParseWriteDeadline(t *testing.T) { confFile := "test.conf" defer os.Remove(confFile) - if err := ioutil.WriteFile(confFile, []byte("write_deadline: \"1x\"\n"), 0666); err != nil { + if err := ioutil.WriteFile(confFile, []byte("write_deadline: \"1x\""), 0666); err != nil { t.Fatalf("Error writing config file: %v", err) } _, err := ProcessConfigFile(confFile) @@ -920,7 +1004,7 @@ func TestParseWriteDeadline(t *testing.T) { t.Fatalf("Expected error related to parsing, got %v", err) } os.Remove(confFile) - if err := ioutil.WriteFile(confFile, []byte("write_deadline: \"1s\"\n"), 0666); err != nil { + if err := ioutil.WriteFile(confFile, []byte("write_deadline: \"1s\""), 0666); err != nil { t.Fatalf("Error writing config file: %v", err) } opts, err := ProcessConfigFile(confFile) @@ -938,7 +1022,7 @@ func TestParseWriteDeadline(t *testing.T) { os.Stdout = oldStdout }() os.Stdout = w - if err := ioutil.WriteFile(confFile, []byte("write_deadline: 2\n"), 0666); err != nil { + if err := ioutil.WriteFile(confFile, []byte("write_deadline: 2"), 0666); err != nil { t.Fatalf("Error writing config file: %v", err) } opts, err = ProcessConfigFile(confFile) @@ -1044,14 +1128,14 @@ func TestEmptyConfig(t *testing.T) { func TestMalformedListenAddress(t *testing.T) { opts, err := ProcessConfigFile("./configs/malformed_listen_address.conf") if err == nil { - t.Fatalf("Expected an error reading config file: got %+v\n", opts) + t.Fatalf("Expected an error reading config file: got %+v", opts) } } func TestMalformedClusterAddress(t *testing.T) { opts, err := ProcessConfigFile("./configs/malformed_cluster_address.conf") if err == nil { - t.Fatalf("Expected an error reading config file: got %+v\n", opts) + t.Fatalf("Expected an error reading config file: got %+v", opts) } } @@ -1757,9 +1841,9 @@ func TestParsingGatewaysErrors(t *testing.T) { } _, err := ProcessConfigFile(file) if err == nil { - t.Fatalf("Expected to fail, did not. Content:\n%s\n", test.content) + t.Fatalf("Expected to fail, did not. Content:\n%s", test.content) } else if !strings.Contains(err.Error(), test.expectedErr) { - t.Fatalf("Expected error containing %q, got %q, for content:\n%s\n", test.expectedErr, err, test.content) + t.Fatalf("Expected error containing %q, got %q, for content:\n%s", test.expectedErr, err, test.content) } }) } diff --git a/server/route.go b/server/route.go index fdff9bc4..2a8c7bc0 100644 --- a/server/route.go +++ b/server/route.go @@ -627,7 +627,7 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) { func (c *client) canImport(subject string) bool { // Use pubAllowed() since this checks Publish permissions which // is what Import maps to. - return c.pubAllowed(subject) + return c.pubAllowedFullCheck(subject, false) } // canExport is whether or not we will accept a SUB from the remote for a given subject. diff --git a/test/configs/authorization.conf b/test/configs/authorization.conf index 356abf26..69d62299 100644 --- a/test/configs/authorization.conf +++ b/test/configs/authorization.conf @@ -16,5 +16,7 @@ authorization { {user: ns, password: $PASS, permissions: $NEW_STYLE} {user: ns-pub, password: $PASS, permissions: $NS_PUB} {user: bench-deny, password: $PASS, permissions: $BENCH_DENY} + {user: svca, password: $PASS, permissions: $MY_SERVICE} + {user: svcb, password: $PASS, permissions: $MY_STREAM_SERVICE} ] } diff --git a/test/configs/auths.conf b/test/configs/auths.conf index 545317bf..3b0dd87c 100644 --- a/test/configs/auths.conf +++ b/test/configs/auths.conf @@ -50,4 +50,19 @@ BENCH_DENY = { allow = ["foo", "*"] deny = "foo.bar" } -} \ No newline at end of file +} + +# This is for services where you only want +# responses to reply subjects to be allowed. +MY_SERVICE = { + subscribe = "my.service.req" + publish_allow_responses: true + } + +# This is a more detailed example where responses +# could be streams and you want to set the TTL +# and maximum allowed. +MY_STREAM_SERVICE = { + subscribe = "my.service.req" + allow_responses = {max: 10, ttl: "10ms"} +} diff --git a/test/user_authorization_test.go b/test/user_authorization_test.go index 03488ec1..84275fbc 100644 --- a/test/user_authorization_test.go +++ b/test/user_authorization_test.go @@ -1,4 +1,4 @@ -// Copyright 2016-2018 The NATS Authors +// Copyright 2016-2019 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -16,6 +16,7 @@ package test import ( "regexp" "testing" + "time" ) const DefaultPass = "foo" @@ -221,3 +222,95 @@ func TestUserAuthorizationProto(t *testing.T) { sendProto(t, c, "SUB SYS.bar 5\r\n") expectResult(t, c, errRe) } + +func TestUserAuthorizationAllowResponses(t *testing.T) { + srv, opts := RunServerWithConfig("./configs/authorization.conf") + defer srv.Shutdown() + + // Alice can do anything, so she will be our requestor + rc := createClientConn(t, opts.Host, opts.Port) + defer rc.Close() + expectAuthRequired(t, rc) + doAuthConnect(t, rc, "", "alice", DefaultPass) + expectResult(t, rc, okRe) + + // MY_SERVICE can subscribe to a single request subject but can + // respond to any reply subject that it receives, but only + // for one response. + c := createClientConn(t, opts.Host, opts.Port) + defer c.Close() + expectAuthRequired(t, c) + doAuthConnect(t, c, "", "svca", DefaultPass) + expectResult(t, c, okRe) + + sendProto(t, c, "SUB my.service.req 1\r\n") + expectResult(t, c, okRe) + + sendProto(t, rc, "PUB my.service.req resp.bar.22 2\r\nok\r\n") + expectResult(t, rc, okRe) + + matches := msgRe.FindAllSubmatch(expectResult(t, c, msgRe), -1) + checkMsg(t, matches[0], "my.service.req", "1", "resp.bar.22", "2", "ok") + + // This should be allowed + sendProto(t, c, "PUB resp.bar.22 2\r\nok\r\n") + expectResult(t, c, okRe) + + // This should not be allowed + sendProto(t, c, "PUB resp.bar.33 2\r\nok\r\n") + expectResult(t, c, errRe) + + // This should also not be allowed now since we already sent a response and max is 1. + sendProto(t, c, "PUB resp.bar.22 2\r\nok\r\n") + expectResult(t, c, errRe) + + c.Close() // from MY_SERVICE + + // MY_STREAM_SERVICE can subscribe to a single request subject but can + // respond to any reply subject that it receives, and send up to 10 responses. + // Each permission for a response can last up to 10ms. + c = createClientConn(t, opts.Host, opts.Port) + defer c.Close() + expectAuthRequired(t, c) + doAuthConnect(t, c, "", "svcb", DefaultPass) + expectResult(t, c, okRe) + + sendProto(t, c, "SUB my.service.req 1\r\n") + expectResult(t, c, okRe) + + // Same rules as above. + sendProto(t, rc, "PUB my.service.req resp.bar.22 2\r\nok\r\n") + expectResult(t, rc, okRe) + + matches = msgRe.FindAllSubmatch(expectResult(t, c, msgRe), -1) + checkMsg(t, matches[0], "my.service.req", "1", "resp.bar.22", "2", "ok") + + // This should be allowed + sendProto(t, c, "PUB resp.bar.22 2\r\nok\r\n") + expectResult(t, c, okRe) + + // This should not be allowed + sendProto(t, c, "PUB resp.bar.33 2\r\nok\r\n") + expectResult(t, c, errRe) + + // We should be able to send 9 more here since we are allowed 10 total + for i := 0; i < 9; i++ { + sendProto(t, c, "PUB resp.bar.22 2\r\nok\r\n") + expectResult(t, c, okRe) + } + // Now this should fail since we already sent 10 responses. + sendProto(t, c, "PUB resp.bar.22 2\r\nok\r\n") + expectResult(t, c, errRe) + + // Now test timeout. + sendProto(t, rc, "PUB my.service.req resp.bar.11 2\r\nok\r\n") + expectResult(t, rc, okRe) + + matches = msgRe.FindAllSubmatch(expectResult(t, c, msgRe), -1) + checkMsg(t, matches[0], "my.service.req", "1", "resp.bar.11", "2", "ok") + + time.Sleep(20 * time.Millisecond) + + sendProto(t, c, "PUB resp.bar.11 2\r\nok\r\n") + expectResult(t, c, errRe) +}