mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-17 03:24:40 -07:00
Fixed ResponsePermissions
- Ensure that defaults are set when values are 0 - Fixed some tests - Added some helpers in jwt tests to reduce copy/paste Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
This commit is contained in:
@@ -1739,16 +1739,13 @@ func buildInternalNkeyUser(uc *jwt.UserClaims, acc *Account) *NkeyUser {
|
||||
}
|
||||
if uc.Resp != nil {
|
||||
if p == nil {
|
||||
p = &Permissions{Publish: &SubjectPermission{}}
|
||||
}
|
||||
if p.Publish.Allow == nil {
|
||||
// We turn off the blanket allow statement.
|
||||
p.Publish.Allow = []string{}
|
||||
p = &Permissions{}
|
||||
}
|
||||
p.Response = &ResponsePermission{
|
||||
MaxMsgs: uc.Resp.MaxMsgs,
|
||||
Expires: uc.Resp.Expires,
|
||||
}
|
||||
validateResponsePermissions(p)
|
||||
}
|
||||
nu.Permissions = p
|
||||
return nu
|
||||
|
||||
@@ -193,6 +193,31 @@ func (s *Server) assignGlobalAccountToOrphanUsers() {
|
||||
}
|
||||
}
|
||||
|
||||
// If the given permissions has a ResponsePermission
|
||||
// set, ensure that defaults are set (if values are 0)
|
||||
// and that a Publish permission is set, and Allow
|
||||
// is disabled if not explicitly set.
|
||||
func validateResponsePermissions(p *Permissions) {
|
||||
if p == nil || p.Response == nil {
|
||||
return
|
||||
}
|
||||
if p.Publish == nil {
|
||||
p.Publish = &SubjectPermission{}
|
||||
}
|
||||
if p.Publish.Allow == nil {
|
||||
// We turn off the blanket allow statement.
|
||||
p.Publish.Allow = []string{}
|
||||
}
|
||||
// If there is a response permission, ensure
|
||||
// that if value is 0, we set the default value.
|
||||
if p.Response.MaxMsgs == 0 {
|
||||
p.Response.MaxMsgs = DEFAULT_ALLOW_RESPONSE_MAX_MSGS
|
||||
}
|
||||
if p.Response.Expires == 0 {
|
||||
p.Response.Expires = DEFAULT_ALLOW_RESPONSE_EXPIRATION
|
||||
}
|
||||
}
|
||||
|
||||
// configureAuthorization will do any setup needed for authorization.
|
||||
// Lock is assumed held.
|
||||
func (s *Server) configureAuthorization() {
|
||||
@@ -220,6 +245,9 @@ func (s *Server) configureAuthorization() {
|
||||
copy.Account = v.(*Account)
|
||||
}
|
||||
}
|
||||
if copy.Permissions != nil {
|
||||
validateResponsePermissions(copy.Permissions)
|
||||
}
|
||||
s.nkeys[u.Nkey] = copy
|
||||
}
|
||||
}
|
||||
@@ -232,6 +260,9 @@ func (s *Server) configureAuthorization() {
|
||||
copy.Account = v.(*Account)
|
||||
}
|
||||
}
|
||||
if copy.Permissions != nil {
|
||||
validateResponsePermissions(copy.Permissions)
|
||||
}
|
||||
s.users[u.Username] = copy
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1363,3 +1363,88 @@ func TestClientMaxPending(t *testing.T) {
|
||||
natsPub(t, nc, "foo", []byte("msg"))
|
||||
natsNexMsg(t, sub, 100*time.Millisecond)
|
||||
}
|
||||
|
||||
func TestResponsePermissions(t *testing.T) {
|
||||
for i, test := range []struct {
|
||||
name string
|
||||
perms *ResponsePermission
|
||||
}{
|
||||
{"max_msgs", &ResponsePermission{MaxMsgs: 2, Expires: time.Hour}},
|
||||
{"no_expire_limit", &ResponsePermission{MaxMsgs: 3, Expires: -1 * time.Millisecond}},
|
||||
{"expire", &ResponsePermission{MaxMsgs: 1000, Expires: 100 * time.Millisecond}},
|
||||
{"no_msgs_limit", &ResponsePermission{MaxMsgs: -1, Expires: 100 * time.Millisecond}},
|
||||
} {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
opts := DefaultOptions()
|
||||
u1 := &User{
|
||||
Username: "service",
|
||||
Password: "pwd",
|
||||
Permissions: &Permissions{Response: test.perms},
|
||||
}
|
||||
u2 := &User{Username: "ivan", Password: "pwd"}
|
||||
opts.Users = []*User{u1, u2}
|
||||
s := RunServer(opts)
|
||||
defer s.Shutdown()
|
||||
|
||||
svcNC := natsConnect(t, fmt.Sprintf("nats://service:pwd@%s:%d", opts.Host, opts.Port))
|
||||
defer svcNC.Close()
|
||||
reqSub := natsSubSync(t, svcNC, "request")
|
||||
|
||||
nc := natsConnect(t, fmt.Sprintf("nats://ivan:pwd@%s:%d", opts.Host, opts.Port))
|
||||
defer nc.Close()
|
||||
|
||||
replySub := natsSubSync(t, nc, "reply")
|
||||
|
||||
natsPubReq(t, nc, "request", "reply", []byte("req1"))
|
||||
|
||||
req1 := natsNexMsg(t, reqSub, 100*time.Millisecond)
|
||||
|
||||
checkFailed := func(t *testing.T) {
|
||||
t.Helper()
|
||||
if reply, err := replySub.NextMsg(100 * time.Millisecond); err != nats.ErrTimeout {
|
||||
if reply != nil {
|
||||
t.Fatalf("Expected to receive timeout, got reply=%q", reply.Data)
|
||||
} else {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
switch i {
|
||||
case 0:
|
||||
// Should allow only 2 replies...
|
||||
for i := 0; i < 10; i++ {
|
||||
natsPub(t, svcNC, req1.Reply, []byte("reply"))
|
||||
}
|
||||
natsNexMsg(t, replySub, 100*time.Millisecond)
|
||||
natsNexMsg(t, replySub, 100*time.Millisecond)
|
||||
// The next should fail...
|
||||
checkFailed(t)
|
||||
case 1:
|
||||
// Expiration is set to -1ms, which should count as infinite...
|
||||
natsPub(t, svcNC, req1.Reply, []byte("reply"))
|
||||
// Sleep a bit before next send
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
natsPub(t, svcNC, req1.Reply, []byte("reply"))
|
||||
// Make sure we receive both
|
||||
natsNexMsg(t, replySub, 100*time.Millisecond)
|
||||
natsNexMsg(t, replySub, 100*time.Millisecond)
|
||||
case 2:
|
||||
fallthrough
|
||||
case 3:
|
||||
// Expire set to 100ms so make sure we wait more between
|
||||
// next publish
|
||||
natsPub(t, svcNC, req1.Reply, []byte("reply"))
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
natsPub(t, svcNC, req1.Reply, []byte("reply"))
|
||||
// Should receive one, and fail on the other
|
||||
natsNexMsg(t, replySub, 100*time.Millisecond)
|
||||
checkFailed(t)
|
||||
}
|
||||
// When testing expiration, sleep before sending next reply
|
||||
if i >= 2 {
|
||||
time.Sleep(400 * time.Millisecond)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1521,7 +1521,7 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er
|
||||
return nil
|
||||
}
|
||||
|
||||
// Parse the account imports
|
||||
// Parse the account exports
|
||||
func parseAccountExports(v interface{}, acc *Account, errors, warnings *[]error) ([]*export, []*export, error) {
|
||||
// This should be an array of objects/maps.
|
||||
tk, v := unwrapValue(v)
|
||||
@@ -1608,7 +1608,7 @@ func parseAccount(v map[string]interface{}, errors, warnings *[]error) (string,
|
||||
return accountName, subject, nil
|
||||
}
|
||||
|
||||
// Parse an import stream or service.
|
||||
// Parse an export stream or service.
|
||||
// e.g.
|
||||
// {stream: "public.>"} # No accounts means public.
|
||||
// {stream: "synadia.private.>", accounts: [cncf, natsio]}
|
||||
@@ -1621,7 +1621,9 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo
|
||||
accounts []string
|
||||
rt ServiceRespType
|
||||
rtSeen bool
|
||||
rtToken token
|
||||
lat *serviceLatency
|
||||
latToken token
|
||||
)
|
||||
tk, v := unwrapValue(v)
|
||||
vv, ok := v.(map[string]interface{})
|
||||
@@ -1637,8 +1639,13 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo
|
||||
*errors = append(*errors, err)
|
||||
continue
|
||||
}
|
||||
if rtSeen {
|
||||
err := &configErr{tk, "Detected response directive on non-service"}
|
||||
if rtToken != nil {
|
||||
err := &configErr{rtToken, "Detected response directive on non-service"}
|
||||
*errors = append(*errors, err)
|
||||
continue
|
||||
}
|
||||
if latToken != nil {
|
||||
err := &configErr{latToken, "Detected latency directive on non-service"}
|
||||
*errors = append(*errors, err)
|
||||
continue
|
||||
}
|
||||
@@ -1654,6 +1661,7 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo
|
||||
}
|
||||
case "response", "response_type":
|
||||
rtSeen = true
|
||||
rtToken = tk
|
||||
mvs, ok := mv.(string)
|
||||
if !ok {
|
||||
err := &configErr{tk, fmt.Sprintf("Expected response type to be string, got %T", mv)}
|
||||
@@ -1712,6 +1720,7 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo
|
||||
curService.accs = accounts
|
||||
}
|
||||
case "latency":
|
||||
latToken = tk
|
||||
var err error
|
||||
lat, err = parseServiceLatency(tk, mv)
|
||||
if err != nil {
|
||||
@@ -1721,6 +1730,7 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo
|
||||
if curStream != nil {
|
||||
err = &configErr{tk, "Detected latency directive on non-service"}
|
||||
*errors = append(*errors, err)
|
||||
continue
|
||||
}
|
||||
if curService != nil {
|
||||
curService.lat = lat
|
||||
@@ -2192,7 +2202,12 @@ func parseAllowResponses(v interface{}, errors, warnings *[]error) *ResponsePerm
|
||||
tk, v = unwrapValue(v)
|
||||
switch strings.ToLower(k) {
|
||||
case "max", "max_msgs", "max_messages", "max_responses":
|
||||
rp.MaxMsgs = int(v.(int64))
|
||||
max := int(v.(int64))
|
||||
// Negative values are accepted (mean infinite), and 0
|
||||
// means default value (set above).
|
||||
if max != 0 {
|
||||
rp.MaxMsgs = max
|
||||
}
|
||||
case "expires", "expiration", "ttl":
|
||||
wd, ok := v.(string)
|
||||
if ok {
|
||||
@@ -2202,7 +2217,11 @@ func parseAllowResponses(v interface{}, errors, warnings *[]error) *ResponsePerm
|
||||
*errors = append(*errors, err)
|
||||
return nil
|
||||
}
|
||||
rp.Expires = ttl
|
||||
// Negative values are accepted (mean infinite), and 0
|
||||
// means default value (set above).
|
||||
if ttl != 0 {
|
||||
rp.Expires = ttl
|
||||
}
|
||||
} else {
|
||||
err := &configErr{tk, "error parsing expires, not a duration string"}
|
||||
*errors = append(*errors, err)
|
||||
|
||||
@@ -2177,3 +2177,101 @@ func TestSublistNoCacheConfigOnAccounts(t *testing.T) {
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func TestParsingResponsePermissions(t *testing.T) {
|
||||
template := `
|
||||
listen: "127.0.0.1:-1"
|
||||
authorization {
|
||||
users [
|
||||
{
|
||||
user: ivan
|
||||
password: pwd
|
||||
permissions {
|
||||
allow_responses {
|
||||
%s
|
||||
%s
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
`
|
||||
|
||||
check := func(t *testing.T, conf string, expectedError string, expectedMaxMsgs int, expectedTTL time.Duration) {
|
||||
t.Helper()
|
||||
opts, err := ProcessConfigFile(conf)
|
||||
if expectedError != "" {
|
||||
if err == nil || !strings.Contains(err.Error(), expectedError) {
|
||||
t.Fatalf("Expected error about %q, got %q", expectedError, err)
|
||||
}
|
||||
// OK!
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("Error on process: %v", err)
|
||||
}
|
||||
u := opts.Users[0]
|
||||
p := u.Permissions.Response
|
||||
if p == nil {
|
||||
t.Fatalf("Expected response permissions to be set, it was not")
|
||||
}
|
||||
if n := p.MaxMsgs; n != expectedMaxMsgs {
|
||||
t.Fatalf("Expected response max msgs to be %v, got %v", expectedMaxMsgs, n)
|
||||
}
|
||||
if ttl := p.Expires; ttl != expectedTTL {
|
||||
t.Fatalf("Expected response ttl to be %v, got %v", expectedTTL, ttl)
|
||||
}
|
||||
}
|
||||
|
||||
// Check defaults
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(template, "", "")))
|
||||
defer os.Remove(conf)
|
||||
check(t, conf, "", DEFAULT_ALLOW_RESPONSE_MAX_MSGS, DEFAULT_ALLOW_RESPONSE_EXPIRATION)
|
||||
|
||||
conf = createConfFile(t, []byte(fmt.Sprintf(template, "max: 10", "")))
|
||||
defer os.Remove(conf)
|
||||
check(t, conf, "", 10, DEFAULT_ALLOW_RESPONSE_EXPIRATION)
|
||||
|
||||
conf = createConfFile(t, []byte(fmt.Sprintf(template, "", "ttl: 5s")))
|
||||
defer os.Remove(conf)
|
||||
check(t, conf, "", DEFAULT_ALLOW_RESPONSE_MAX_MSGS, 5*time.Second)
|
||||
|
||||
conf = createConfFile(t, []byte(fmt.Sprintf(template, "max: 0", "")))
|
||||
defer os.Remove(conf)
|
||||
check(t, conf, "", DEFAULT_ALLOW_RESPONSE_MAX_MSGS, DEFAULT_ALLOW_RESPONSE_EXPIRATION)
|
||||
|
||||
conf = createConfFile(t, []byte(fmt.Sprintf(template, "", `ttl: "0s"`)))
|
||||
defer os.Remove(conf)
|
||||
check(t, conf, "", DEFAULT_ALLOW_RESPONSE_MAX_MSGS, DEFAULT_ALLOW_RESPONSE_EXPIRATION)
|
||||
|
||||
// Check normal values
|
||||
conf = createConfFile(t, []byte(fmt.Sprintf(template, "max: 10", `ttl: "5s"`)))
|
||||
defer os.Remove(conf)
|
||||
check(t, conf, "", 10, 5*time.Second)
|
||||
|
||||
// Check negative values ok
|
||||
conf = createConfFile(t, []byte(fmt.Sprintf(template, "max: -1", `ttl: "5s"`)))
|
||||
defer os.Remove(conf)
|
||||
check(t, conf, "", -1, 5*time.Second)
|
||||
|
||||
conf = createConfFile(t, []byte(fmt.Sprintf(template, "max: 10", `ttl: "-1s"`)))
|
||||
defer os.Remove(conf)
|
||||
check(t, conf, "", 10, -1*time.Second)
|
||||
|
||||
conf = createConfFile(t, []byte(fmt.Sprintf(template, "max: -1", `ttl: "-1s"`)))
|
||||
defer os.Remove(conf)
|
||||
check(t, conf, "", -1, -1*time.Second)
|
||||
|
||||
// Check parsing errors
|
||||
conf = createConfFile(t, []byte(fmt.Sprintf(template, "unknown_field: 123", "")))
|
||||
defer os.Remove(conf)
|
||||
check(t, conf, "Unknown field", 0, 0)
|
||||
|
||||
conf = createConfFile(t, []byte(fmt.Sprintf(template, "max: 10", "ttl: 123")))
|
||||
defer os.Remove(conf)
|
||||
check(t, conf, "not a duration string", 0, 0)
|
||||
|
||||
conf = createConfFile(t, []byte(fmt.Sprintf(template, "max: 10", "ttl: xyz")))
|
||||
defer os.Remove(conf)
|
||||
check(t, conf, "error parsing expires", 0, 0)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user