[Adding] support for account_token_position (#1874)

This change does 4 things:
Refactor to only have one function to validate imports.
Have this function support the jwt field account_token_position.
For completeness make this value configurable as well.
unit tests.

Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
Matthias Hanel
2021-02-01 19:51:36 -05:00
committed by GitHub
parent ea68afe5e2
commit 3799b90011
6 changed files with 418 additions and 54 deletions

8
go.sum
View File

@@ -3,9 +3,11 @@ github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:x
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.11.7 h1:0hzRabrMN4tSTvMfnL3SCv1ZGeAP23ynzodBgaHeMeg=
github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
@@ -27,10 +29,6 @@ github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5H
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
github.com/nats-io/nats.go v1.10.1-0.20201021145452-94be476ad6e0/go.mod h1:VU2zERjp8xmF+Lw2NH4u2t5qWZxwc7jB3+7HVMWQXPI=
github.com/nats-io/nats.go v1.10.1-0.20210122204956-b8ea7fc17ea6 h1:cpS+9uyfHXvRG/Q+WcDd3KXRgPa9fo9tDbIeDHCxYAg=
github.com/nats-io/nats.go v1.10.1-0.20210122204956-b8ea7fc17ea6/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI=
github.com/nats-io/nats.go v1.10.1-0.20210123004354-58bf69ad2df8 h1:yxExhj0DStfAEN5lGy6pyL4WJE+J8aKn50xoKt9hFdA=
github.com/nats-io/nats.go v1.10.1-0.20210123004354-58bf69ad2df8/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI=
github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a h1:EjwBk6T/arS7o0ZGdMgdzYrQHeUITT1GHf3cFQFtr3I=
github.com/nats-io/nats.go v1.10.1-0.20210127212649-5b4924938a9a/go.mod h1:Sa3kLIonafChP5IF0b55i9uvGR10I3hPETFbi4+9kOI=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
@@ -54,10 +52,12 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=

View File

@@ -171,8 +171,9 @@ func (rt ServiceRespType) String() string {
// exportAuth holds configured approvals or boolean indicating an
// auth token is required for import.
type exportAuth struct {
tokenReq bool
approved map[string]*Account
tokenReq bool
accountPos uint
approved map[string]*Account
}
// streamExport
@@ -877,13 +878,49 @@ func (a *Account) removeClient(c *client) int {
return n
}
func setExportAuth(ea *exportAuth, subject string, accounts []*Account, accountPos uint) error {
if accountPos > 0 {
token := strings.Split(subject, ".")
if len(token) < int(accountPos) || token[accountPos-1] != "*" {
return ErrInvalidSubject
}
}
ea.accountPos = accountPos
// empty means auth required but will be import token.
if accounts == nil {
return nil
}
if len(accounts) == 0 {
ea.tokenReq = true
return nil
}
if ea.approved == nil {
ea.approved = make(map[string]*Account, len(accounts))
}
for _, acc := range accounts {
ea.approved[acc.Name] = acc
}
return nil
}
// AddServiceExport will configure the account with the defined export.
func (a *Account) AddServiceExport(subject string, accounts []*Account) error {
return a.AddServiceExportWithResponse(subject, Singleton, accounts)
return a.addServiceExportWithResponseAndAccountPos(subject, Singleton, accounts, 0)
}
// AddServiceExport will configure the account with the defined export.
func (a *Account) addServiceExportWithAccountPos(subject string, accounts []*Account, accountPos uint) error {
return a.addServiceExportWithResponseAndAccountPos(subject, Singleton, accounts, accountPos)
}
// AddServiceExportWithResponse will configure the account with the defined export and response type.
func (a *Account) AddServiceExportWithResponse(subject string, respType ServiceRespType, accounts []*Account) error {
return a.addServiceExportWithResponseAndAccountPos(subject, respType, accounts, 0)
}
// AddServiceExportWithresponse will configure the account with the defined export and response type.
func (a *Account) addServiceExportWithResponseAndAccountPos(
subject string, respType ServiceRespType, accounts []*Account, accountPos uint) error {
if a == nil {
return ErrMissingAccount
}
@@ -905,17 +942,12 @@ func (a *Account) AddServiceExportWithResponse(subject string, respType ServiceR
se.respType = respType
}
if accounts != nil {
// empty means auth required but will be import token.
if len(accounts) == 0 {
se.tokenReq = true
} else {
if se.approved == nil {
se.approved = make(map[string]*Account, len(accounts))
}
for _, acc := range accounts {
se.approved[acc.Name] = acc
}
if accounts != nil || accountPos > 0 {
if se == nil {
se = &serviceExport{}
}
if err := setExportAuth(&se.exportAuth, subject, accounts, accountPos); err != nil {
return err
}
}
lrt := a.lowestServiceExportResponseTime()
@@ -2280,8 +2312,16 @@ func (a *Account) AddStreamImport(account *Account, from, prefix string) error {
var IsPublicExport = []*Account(nil)
// AddStreamExport will add an export to the account. If accounts is nil
// it will signify a public export, meaning anyone can impoort.
// it will signify a public export, meaning anyone can import.
func (a *Account) AddStreamExport(subject string, accounts []*Account) error {
return a.addStreamExportWithAccountPos(subject, accounts, 0)
}
// AddStreamExport will add an export to the account. If accounts is nil
// it will signify a public export, meaning anyone can import.
// if accountPos is > 0, all imports will be granted where the following holds:
// strings.Split(subject, ".")[accountPos] == account id will be granted.
func (a *Account) addStreamExportWithAccountPos(subject string, accounts []*Account, accountPos uint) error {
if a == nil {
return ErrMissingAccount
}
@@ -2293,20 +2333,12 @@ func (a *Account) AddStreamExport(subject string, accounts []*Account) error {
a.exports.streams = make(map[string]*streamExport)
}
ea := a.exports.streams[subject]
if accounts != nil {
if accounts != nil || accountPos > 0 {
if ea == nil {
ea = &streamExport{}
}
// empty means auth required but will be import token.
if len(accounts) == 0 {
ea.tokenReq = true
} else {
if ea.approved == nil {
ea.approved = make(map[string]*Account, len(accounts))
}
for _, acc := range accounts {
ea.approved[acc.Name] = acc
}
if err := setExportAuth(&ea.exportAuth, subject, accounts, accountPos); err != nil {
return err
}
}
a.exports.streams[subject] = ea
@@ -2329,15 +2361,22 @@ func (a *Account) checkStreamImportAuthorizedNoLock(account *Account, subject st
return a.checkStreamExportApproved(account, subject, imClaim)
}
func (a *Account) checkAuth(ea *exportAuth, account *Account, imClaim *jwt.Import) bool {
func (a *Account) checkAuth(ea *exportAuth, account *Account, imClaim *jwt.Import, tokens []string) bool {
// if ea is nil or ea.approved is nil, that denotes a public export
if ea == nil || (ea.approved == nil && !ea.tokenReq) {
if ea == nil || (ea.approved == nil && !ea.tokenReq && ea.accountPos == 0) {
return true
}
// Check if the export is protected and enforces presence of importing account identity
if ea.accountPos > 0 {
return ea.accountPos <= uint(len(tokens)) && tokens[ea.accountPos-1] == account.Name
}
// Check if token required
if ea.tokenReq {
return a.checkActivation(account, imClaim, true)
}
if ea.approved == nil {
return false
}
// If we have a matching account we are authorized
_, ok := ea.approved[account.Name]
return ok
@@ -2347,10 +2386,11 @@ func (a *Account) checkStreamExportApproved(account *Account, subject string, im
// Check direct match of subject first
ea, ok := a.exports.streams[subject]
if ok {
// if ea is nil or eq.approved is nil, that denotes a public export
if ea == nil {
return true
}
return a.checkAuth(&ea.exportAuth, account, imClaim)
return a.checkAuth(&ea.exportAuth, account, imClaim, nil)
}
// ok if we are here we did not match directly so we need to test each one.
// The import subject arg has to take precedence, meaning the export
@@ -2362,7 +2402,7 @@ func (a *Account) checkStreamExportApproved(account *Account, subject string, im
if ea == nil {
return true
}
return a.checkAuth(&ea.exportAuth, account, imClaim)
return a.checkAuth(&ea.exportAuth, account, imClaim, tokens)
}
}
return false
@@ -2372,17 +2412,11 @@ func (a *Account) checkServiceExportApproved(account *Account, subject string, i
// Check direct match of subject first
se, ok := a.exports.services[subject]
if ok {
// if se is nil or eq.approved is nil, that denotes a public export
if se == nil || (se.approved == nil && !se.tokenReq) {
// if ea is nil or eq.approved is nil, that denotes a public export
if se == nil {
return true
}
// Check if token required
if se.tokenReq {
return a.checkActivation(account, imClaim, true)
}
// If we have a matching account we are authorized
_, ok := se.approved[account.Name]
return ok
return a.checkAuth(&se.exportAuth, account, imClaim, nil)
}
// ok if we are here we did not match directly so we need to test each one.
// The import subject arg has to take precedence, meaning the export
@@ -2391,15 +2425,10 @@ func (a *Account) checkServiceExportApproved(account *Account, subject string, i
tokens := strings.Split(subject, tsep)
for subj, se := range a.exports.services {
if isSubsetMatch(tokens, subj) {
if se == nil || (se.approved == nil && !se.tokenReq) {
if se == nil {
return true
}
// Check if token required
if se.tokenReq {
return a.checkActivation(account, imClaim, true)
}
_, ok := se.approved[account.Name]
return ok
return a.checkAuth(&se.exportAuth, account, imClaim, tokens)
}
}
return false
@@ -2868,7 +2897,8 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
switch e.Type {
case jwt.Stream:
s.Debugf("Adding stream export %q for %s", e.Subject, a.Name)
if err := a.AddStreamExport(string(e.Subject), authAccounts(e.TokenReq)); err != nil {
if err := a.addStreamExportWithAccountPos(
string(e.Subject), authAccounts(e.TokenReq), e.AccountTokenPosition); err != nil {
s.Debugf("Error adding stream export to account [%s]: %v", a.Name, err.Error())
}
case jwt.Service:
@@ -2880,7 +2910,8 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
case jwt.ResponseTypeChunked:
rt = Chunked
}
if err := a.AddServiceExportWithResponse(string(e.Subject), rt, authAccounts(e.TokenReq)); err != nil {
if err := a.addServiceExportWithResponseAndAccountPos(
string(e.Subject), rt, authAccounts(e.TokenReq), e.AccountTokenPosition); err != nil {
s.Debugf("Error adding service export to account [%s]: %v", a.Name, err)
continue
}

View File

@@ -594,6 +594,31 @@ func TestImportAuthorized(t *testing.T) {
checkBool(foo.checkStreamImportAuthorized(bar, "*.*", nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, "*.>", nil), true, t)
_, foo, bar = simpleAccountServer(t)
foo.addStreamExportWithAccountPos("foo.*", []*Account{}, 2)
foo.addStreamExportWithAccountPos("bar.*.foo", []*Account{}, 2)
if err := foo.addStreamExportWithAccountPos("baz.*.>", []*Account{}, 3); err == nil {
t.Fatal("expected error")
}
checkBool(foo.checkStreamImportAuthorized(bar, fmt.Sprintf("foo.%s", bar.Name), nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, fmt.Sprintf("bar.%s.foo", bar.Name), nil), true, t)
checkBool(foo.checkStreamImportAuthorized(bar, fmt.Sprintf("baz.foo.%s", bar.Name), nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "foo.X", nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "bar.X.foo", nil), false, t)
checkBool(foo.checkStreamImportAuthorized(bar, "baz.foo.X", nil), false, t)
foo.addServiceExportWithAccountPos("a.*", []*Account{}, 2)
foo.addServiceExportWithAccountPos("b.*.a", []*Account{}, 2)
if err := foo.addServiceExportWithAccountPos("c.*.>", []*Account{}, 3); err == nil {
t.Fatal("expected error")
}
checkBool(foo.checkServiceImportAuthorized(bar, fmt.Sprintf("a.%s", bar.Name), nil), true, t)
checkBool(foo.checkServiceImportAuthorized(bar, fmt.Sprintf("b.%s.a", bar.Name), nil), true, t)
checkBool(foo.checkServiceImportAuthorized(bar, fmt.Sprintf("c.a.%s", bar.Name), nil), false, t)
checkBool(foo.checkServiceImportAuthorized(bar, "a.X", nil), false, t)
checkBool(foo.checkServiceImportAuthorized(bar, "b.X.a", nil), false, t)
checkBool(foo.checkServiceImportAuthorized(bar, "c.a.X", nil), false, t)
// Reset and test pwc and fwc
s, foo, bar := simpleAccountServer(t)
foo.AddStreamExport("foo.*.baz.>", []*Account{bar})

View File

@@ -5249,3 +5249,183 @@ func TestJWTStrictSigningKeys(t *testing.T) {
connectTest(s.ClientURL())
})
}
func TestJWTAccountProtectedImport(t *testing.T) {
srvFmt := `
port: -1
operator = %s
resolver: MEMORY
resolver_preload = {
%s : "%s"
%s : "%s"
} `
setupAccounts := func(pass bool) (nkeys.KeyPair, string, string, string, nkeys.KeyPair, string, string, string, string) {
// Create accounts and imports/exports.
exportKP, _ := nkeys.CreateAccount()
exportPub, _ := exportKP.PublicKey()
exportAC := jwt.NewAccountClaims(exportPub)
exportAC.Exports.Add(&jwt.Export{Subject: "service.*", Type: jwt.Service, AccountTokenPosition: 2})
exportAC.Exports.Add(&jwt.Export{Subject: "stream.*", Type: jwt.Stream, AccountTokenPosition: 2})
exportJWT, err := exportAC.Encode(oKp)
require_NoError(t, err)
// create alternative exporter jwt without account token pos set
exportAC.Exports = jwt.Exports{}
exportAC.Exports.Add(&jwt.Export{Subject: "service.*", Type: jwt.Service})
exportAC.Exports.Add(&jwt.Export{Subject: "stream.*", Type: jwt.Stream})
exportJWTNoPos, err := exportAC.Encode(oKp)
require_NoError(t, err)
importKP, _ := nkeys.CreateAccount()
importPub, _ := importKP.PublicKey()
importAc := jwt.NewAccountClaims(importPub)
srvcSub, strmSub := "service.foo", "stream.foo"
if pass {
srvcSub = fmt.Sprintf("service.%s", importPub)
strmSub = fmt.Sprintf("stream.%s", importPub)
}
importAc.Imports.Add(&jwt.Import{Account: exportPub, Subject: jwt.Subject(srvcSub), Type: jwt.Service})
importAc.Imports.Add(&jwt.Import{Account: exportPub, Subject: jwt.Subject(strmSub), Type: jwt.Stream})
importJWT, err := importAc.Encode(oKp)
require_NoError(t, err)
return exportKP, exportPub, exportJWT, exportJWTNoPos, importKP, importPub, importJWT, srvcSub, strmSub
}
t.Run("pass", func(t *testing.T) {
exportKp, exportPub, exportJWT, _, importKp, importPub, importJWT, srvcSub, strmSub := setupAccounts(true)
cf := createConfFile(t, []byte(fmt.Sprintf(srvFmt, ojwt, exportPub, exportJWT, importPub, importJWT)))
defer os.Remove(cf)
s, _ := RunServerWithConfig(cf)
defer s.Shutdown()
ncExp := natsConnect(t, s.ClientURL(), createUserCreds(t, s, exportKp))
ncImp := natsConnect(t, s.ClientURL(), createUserCreds(t, s, importKp))
t.Run("service", func(t *testing.T) {
sub, err := ncExp.Subscribe("service.*", func(msg *nats.Msg) {
msg.Respond([]byte("world"))
})
defer sub.Unsubscribe()
require_NoError(t, err)
ncExp.Flush()
msg, err := ncImp.Request(srvcSub, []byte("hello"), time.Second)
require_NoError(t, err)
require_Equal(t, string(msg.Data), "world")
})
t.Run("stream", func(t *testing.T) {
msgChan := make(chan *nats.Msg, 4)
defer close(msgChan)
sub, err := ncImp.ChanSubscribe(strmSub, msgChan)
defer sub.Unsubscribe()
require_NoError(t, err)
ncImp.Flush()
err = ncExp.Publish("stream.foo", []byte("hello"))
require_NoError(t, err)
err = ncExp.Publish(strmSub, []byte("hello"))
require_NoError(t, err)
msg := <-msgChan
require_Equal(t, string(msg.Data), "hello")
require_True(t, len(msgChan) == 0)
})
})
t.Run("fail", func(t *testing.T) {
exportKp, exportPub, exportJWT, _, importKp, importPub, importJWT, srvcSub, strmSub := setupAccounts(false)
cf := createConfFile(t, []byte(fmt.Sprintf(srvFmt, ojwt, exportPub, exportJWT, importPub, importJWT)))
defer os.Remove(cf)
s, _ := RunServerWithConfig(cf)
defer s.Shutdown()
ncExp := natsConnect(t, s.ClientURL(), createUserCreds(t, s, exportKp))
ncImp := natsConnect(t, s.ClientURL(), createUserCreds(t, s, importKp))
t.Run("service", func(t *testing.T) {
sub, err := ncExp.Subscribe("service.*", func(msg *nats.Msg) {
msg.Respond([]byte("world"))
})
defer sub.Unsubscribe()
require_NoError(t, err)
ncExp.Flush()
_, err = ncImp.Request(srvcSub, []byte("hello"), time.Second)
require_Error(t, err)
require_Contains(t, err.Error(), "no responders available for request")
})
t.Run("stream", func(t *testing.T) {
msgChan := make(chan *nats.Msg, 4)
defer close(msgChan)
_, err := ncImp.ChanSubscribe(strmSub, msgChan)
require_NoError(t, err)
ncImp.Flush()
err = ncExp.Publish("stream.foo", []byte("hello"))
require_NoError(t, err)
err = ncExp.Publish(strmSub, []byte("hello"))
require_NoError(t, err)
select {
case <-msgChan:
t.Fatal("did not expect a message")
case <-time.After(250 * time.Millisecond):
}
require_True(t, len(msgChan) == 0)
})
})
t.Run("reload-off-2-on", func(t *testing.T) {
exportKp, exportPub, exportJWTOn, exportJWTOff, importKp, _, importJWT, srvcSub, strmSub := setupAccounts(false)
dirSrv := createDir(t, "srv")
defer os.RemoveAll(dirSrv)
// set up system account. Relying bootstrapping system account to not create JWT
sysAcc, err := nkeys.CreateAccount()
require_NoError(t, err)
sysPub, err := sysAcc.PublicKey()
require_NoError(t, err)
sysUsrCreds := newUserEx(t, sysAcc, false, sysPub)
defer os.Remove(sysUsrCreds)
cf := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
operator = %s
system_account = %s
resolver: {
type: full
dir: %s
}`, ojwt, sysPub, dirSrv)))
defer os.Remove(cf)
s, _ := RunServerWithConfig(cf)
defer s.Shutdown()
updateJwt(t, s.ClientURL(), sysUsrCreds, importJWT, 1)
updateJwt(t, s.ClientURL(), sysUsrCreds, exportJWTOff, 1)
ncExp := natsConnect(t, s.ClientURL(), createUserCreds(t, s, exportKp))
ncImp := natsConnect(t, s.ClientURL(), createUserCreds(t, s, importKp))
msgChan := make(chan *nats.Msg, 4)
defer close(msgChan)
// ensure service passes
subSrvc, err := ncExp.Subscribe("service.*", func(msg *nats.Msg) {
msg.Respond([]byte("world"))
})
defer subSrvc.Unsubscribe()
require_NoError(t, err)
ncExp.Flush()
respMst, err := ncImp.Request(srvcSub, []byte("hello"), time.Second)
require_NoError(t, err)
require_Equal(t, string(respMst.Data), "world")
// ensure stream passes
subStrm, err := ncImp.ChanSubscribe(strmSub, msgChan)
defer subStrm.Unsubscribe()
require_NoError(t, err)
ncImp.Flush()
err = ncExp.Publish(strmSub, []byte("hello"))
require_NoError(t, err)
msg := <-msgChan
require_Equal(t, string(msg.Data), "hello")
require_True(t, len(msgChan) == 0)
updateJwt(t, s.ClientURL(), sysUsrCreds, exportJWTOn, 1)
// ensure service fails
_, err = ncImp.Request(srvcSub, []byte("hello"), time.Second)
require_Error(t, err)
require_Contains(t, err.Error(), "timeout")
s.AccountResolver().Store(exportPub, exportJWTOn)
// ensure stream fails
err = ncExp.Publish(strmSub, []byte("hello"))
require_NoError(t, err)
select {
case <-msgChan:
t.Fatal("did not expect a message")
case <-time.After(250 * time.Millisecond):
}
require_True(t, len(msgChan) == 0)
})
}

View File

@@ -1941,6 +1941,7 @@ type export struct {
rt ServiceRespType
lat *serviceLatency
rthr time.Duration
tPos uint
}
type importStream struct {
@@ -2278,7 +2279,7 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er
}
accounts = append(accounts, ta)
}
if err := stream.acc.AddStreamExport(stream.sub, accounts); err != nil {
if err := stream.acc.addStreamExportWithAccountPos(stream.sub, accounts, stream.tPos); err != nil {
msg := fmt.Sprintf("Error adding stream export %q: %v", stream.sub, err)
*errors = append(*errors, &configErr{tk, msg})
continue
@@ -2296,7 +2297,7 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er
}
accounts = append(accounts, ta)
}
if err := service.acc.AddServiceExportWithResponse(service.sub, service.rt, accounts); err != nil {
if err := service.acc.addServiceExportWithResponseAndAccountPos(service.sub, service.rt, accounts, service.tPos); err != nil {
msg := fmt.Sprintf("Error adding service export %q: %v", service.sub, err)
*errors = append(*errors, &configErr{tk, msg})
continue
@@ -2497,6 +2498,7 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo
thresh time.Duration
latToken token
lt token
accTokPos uint
)
defer convertPanicToErrorList(&lt, errors)
@@ -2645,6 +2647,8 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo
if curService != nil {
curService.lat = lat
}
case "account_token_position":
accTokPos = uint(mv.(int64))
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
@@ -2657,6 +2661,12 @@ func parseExportStreamOrService(v interface{}, errors, warnings *[]error) (*expo
}
}
}
if curStream != nil {
curStream.tPos = accTokPos
}
if curService != nil {
curService.tPos = accTokPos
}
return curStream, curService, nil
}

View File

@@ -24,6 +24,7 @@ import (
"reflect"
"runtime"
"strings"
"sync"
"testing"
"time"
@@ -1815,6 +1816,123 @@ func TestParseServiceLatency(t *testing.T) {
}
}
func TestParseExport(t *testing.T) {
conf := `
system_account: sys
accounts {
sys {
exports [{
stream "$SYS.SERVER.ACCOUNT.*.CONNS"
account_token_position 4
}]
}
accE {
exports [{
service foo.*
account_token_position 2
}]
users [{
user ue
password pwd
}],
}
accI1 {
imports [{
service {
account accE
subject foo.accI1
}
to foo
},{
stream {
account sys
subject "$SYS.SERVER.ACCOUNT.accI1.CONNS"
}
}],
users [{
user u1
password pwd
}],
}
accI2 {
imports [{
service {
account accE
subject foo.accI2
}
to foo
},{
stream {
account sys
subject "$SYS.SERVER.ACCOUNT.accI2.CONNS"
}
}],
users [{
user u2
password pwd
}],
}
}`
f := createConfFile(t, []byte(conf))
s, o := RunServerWithConfig(f)
if s == nil {
t.Fatal("Failed startup")
}
defer s.Shutdown()
defer os.Remove(f)
connect := func(user string) *nats.Conn {
nc, err := nats.Connect(fmt.Sprintf("nats://%s:pwd@%s:%d", user, o.Host, o.Port))
require_NoError(t, err)
return nc
}
nc1 := connect("u1")
defer nc1.Close()
nc2 := connect("u2")
defer nc1.Close()
subscribe := func(nc *nats.Conn, msgs int, subj string) (*sync.WaitGroup, *nats.Subscription) {
wg := sync.WaitGroup{}
wg.Add(msgs)
sub, err := nc.Subscribe(subj, func(msg *nats.Msg) {
if msg.Reply != _EMPTY_ {
msg.Respond(msg.Data)
}
wg.Done()
})
require_NoError(t, err)
nc.Flush()
return &wg, sub
}
//Subscribe to CONNS events
wg1, s1 := subscribe(nc1, 2, "$SYS.SERVER.ACCOUNT.accI1.CONNS")
defer s1.Unsubscribe()
wg2, s2 := subscribe(nc2, 2, "$SYS.SERVER.ACCOUNT.accI2.CONNS")
defer s2.Unsubscribe()
// Trigger 2 CONNS event
nc3 := connect("u1")
nc3.Close()
nc4 := connect("u2")
nc4.Close()
// test service
ncE := connect("ue")
defer ncE.Close()
wge, se := subscribe(ncE, 2, "foo.*")
defer se.Unsubscribe()
request := func(nc *nats.Conn, msg string) {
if m, err := nc.Request("foo", []byte(msg), time.Second); err != nil {
t.Fatal("Failed request ", msg, err)
} else if m == nil {
t.Fatal("No response msg")
} else if string(m.Data) != msg {
t.Fatal("Wrong response msg", string(m.Data))
}
}
request(nc1, "1")
request(nc2, "1")
for _, wg := range []*sync.WaitGroup{wge, wg1, wg2} {
wg.Wait()
}
}
func TestAccountUsersLoadedProperly(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: "127.0.0.1:-1"