mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 11:04:42 -07:00
Fix bad sys request for different account (#3382)
When a request for a system service like $SYS.REQ.ACCOUNT.*.CONNZ is imported/exported we ensured that the requesting account is identical to the account referenced in the subject. In #3250 this check was extended from CONNZ to all $SYS.REQ.ACCOUNT.*.* requests. In general this check interferes with monitoring accounts that need to query all other accounts, not just itself. There the use case is that account A sends a request with account B in the subject. The check for equal accounts prevents this. This change removes the check to support these use cases. Instead of the check, the default export now uses exportAuth tokenPos to ensure that the 4th token is the importer account id. This guarantees that an explicit export (done by user) can only import for the own account. This change also ensures that an explicit export is not overwritten by the system. This is not a problem when the export is public. Automatic imports set the account id correctly and do not use wildcards. To cover cases where the export is private, automatically added imports are not subject a token check. Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
@@ -1440,6 +1440,12 @@ func (a *Account) lowestServiceExportResponseTime() time.Duration {
|
||||
|
||||
// AddServiceImportWithClaim will add in the service import via the jwt claim.
|
||||
func (a *Account) AddServiceImportWithClaim(destination *Account, from, to string, imClaim *jwt.Import) error {
|
||||
return a.addServiceImportWithClaim(destination, from, to, imClaim, false)
|
||||
}
|
||||
|
||||
// addServiceImportWithClaim will add in the service import via the jwt claim.
|
||||
// It will also skip the authorization check in cases where internal is true
|
||||
func (a *Account) addServiceImportWithClaim(destination *Account, from, to string, imClaim *jwt.Import, internal bool) error {
|
||||
if destination == nil {
|
||||
return ErrMissingAccount
|
||||
}
|
||||
@@ -1452,7 +1458,7 @@ func (a *Account) AddServiceImportWithClaim(destination *Account, from, to strin
|
||||
}
|
||||
|
||||
// First check to see if the account has authorized us to route to the "to" subject.
|
||||
if !destination.checkServiceImportAuthorized(a, to, imClaim) {
|
||||
if !internal && !destination.checkServiceImportAuthorized(a, to, imClaim) {
|
||||
return ErrServiceImportAuthorization
|
||||
}
|
||||
|
||||
@@ -1502,6 +1508,16 @@ func (a *Account) streamImportFormsCycle(dest *Account, to string) error {
|
||||
return dest.checkStreamImportsForCycles(to, map[string]bool{a.Name: true})
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
func (a *Account) hasServiceExportMatching(to string) bool {
|
||||
for subj := range a.exports.services {
|
||||
if subjectIsSubsetMatch(to, subj) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
func (a *Account) hasStreamExportMatching(to string) bool {
|
||||
for subj := range a.exports.streams {
|
||||
|
||||
@@ -893,15 +893,7 @@ func (s *Server) initEventTracking() {
|
||||
if tk := strings.Split(subject, tsep); len(tk) != accReqTokens {
|
||||
return _EMPTY_, fmt.Errorf("subject %q is malformed", subject)
|
||||
} else {
|
||||
acc := tk[accReqAccIndex]
|
||||
if ci, _, _, _, err := c.srv.getRequestInfo(c, msg); err == nil && ci.Account != _EMPTY_ {
|
||||
// Make sure the accounts match.
|
||||
if ci.Account != acc {
|
||||
// Do not leak too much here.
|
||||
return _EMPTY_, fmt.Errorf("bad request")
|
||||
}
|
||||
}
|
||||
return acc, nil
|
||||
return tk[accReqAccIndex], nil
|
||||
}
|
||||
}
|
||||
monAccSrvc := map[string]msgHandler{
|
||||
@@ -1054,20 +1046,32 @@ func (s *Server) addSystemAccountExports(sacc *Account) {
|
||||
return
|
||||
}
|
||||
accConnzSubj := fmt.Sprintf(accDirectReqSubj, "*", "CONNZ")
|
||||
if err := sacc.AddServiceExportWithResponse(accConnzSubj, Streamed, nil); err != nil {
|
||||
s.Errorf("Error adding system service export for %q: %v", accConnzSubj, err)
|
||||
// prioritize not automatically added exports
|
||||
if !sacc.hasServiceExportMatching(accConnzSubj) {
|
||||
// pick export type that clamps importing account id into subject
|
||||
if err := sacc.addServiceExportWithResponseAndAccountPos(accConnzSubj, Streamed, nil, 4); err != nil {
|
||||
//if err := sacc.AddServiceExportWithResponse(accConnzSubj, Streamed, nil); err != nil {
|
||||
s.Errorf("Error adding system service export for %q: %v", accConnzSubj, err)
|
||||
}
|
||||
}
|
||||
// prioritize not automatically added exports
|
||||
accStatzSubj := fmt.Sprintf(accDirectReqSubj, "*", "STATZ")
|
||||
if err := sacc.AddServiceExportWithResponse(accStatzSubj, Streamed, nil); err != nil {
|
||||
s.Errorf("Error adding system service export for %q: %v", accStatzSubj, err)
|
||||
if !sacc.hasServiceExportMatching(accStatzSubj) {
|
||||
// pick export type that clamps importing account id into subject
|
||||
if err := sacc.addServiceExportWithResponseAndAccountPos(accStatzSubj, Streamed, nil, 4); err != nil {
|
||||
s.Errorf("Error adding system service export for %q: %v", accStatzSubj, err)
|
||||
}
|
||||
}
|
||||
// FIXME(dlc) - Old experiment, Remove?
|
||||
if !sacc.hasServiceExportMatching(accSubsSubj) {
|
||||
if err := sacc.AddServiceExport(accSubsSubj, nil); err != nil {
|
||||
s.Errorf("Error adding system service export for %q: %v", accSubsSubj, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Register any accounts that existed prior.
|
||||
s.registerSystemImportsForExisting()
|
||||
|
||||
// FIXME(dlc) - Old experiment, Remove?
|
||||
if err := sacc.AddServiceExport(accSubsSubj, nil); err != nil {
|
||||
s.Errorf("Error adding system service export for %q: %v", accSubsSubj, err)
|
||||
}
|
||||
// in case of a mixed mode setup, enable js exports anyway
|
||||
if s.JetStreamEnabled() || !s.standAloneMode() {
|
||||
s.checkJetStreamExports()
|
||||
@@ -1639,7 +1643,7 @@ func (s *Server) registerSystemImports(a *Account) {
|
||||
|
||||
importSrvc := func(subj, mappedSubj string) {
|
||||
if !a.serviceImportExists(subj) {
|
||||
if err := a.AddServiceImport(sacc, subj, mappedSubj); err != nil {
|
||||
if err := a.addServiceImportWithClaim(sacc, subj, mappedSubj, nil, true); err != nil {
|
||||
s.Errorf("Error setting up system service import %s -> %s for account: %v",
|
||||
subj, mappedSubj, err)
|
||||
}
|
||||
|
||||
@@ -4106,6 +4106,219 @@ func TestJWTTimeExpiration(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func NewJwtAccountClaim(name string) (nkeys.KeyPair, string, *jwt.AccountClaims) {
|
||||
sysKp, _ := nkeys.CreateAccount()
|
||||
sysPub, _ := sysKp.PublicKey()
|
||||
claim := jwt.NewAccountClaims(sysPub)
|
||||
claim.Name = name
|
||||
return sysKp, sysPub, claim
|
||||
}
|
||||
|
||||
func TestJWTSysImportForDifferentAccount(t *testing.T) {
|
||||
_, sysPub, sysClaim := NewJwtAccountClaim("SYS")
|
||||
sysClaim.Exports.Add(&jwt.Export{
|
||||
Type: jwt.Service,
|
||||
Subject: "$SYS.REQ.ACCOUNT.*.INFO",
|
||||
})
|
||||
sysJwt, err := sysClaim.Encode(oKp)
|
||||
require_NoError(t, err)
|
||||
|
||||
// create account
|
||||
aKp, aPub, claim := NewJwtAccountClaim("A")
|
||||
claim.Imports.Add(&jwt.Import{
|
||||
Type: jwt.Service,
|
||||
Subject: "$SYS.REQ.ACCOUNT.*.INFO",
|
||||
LocalSubject: "COMMON.ADVISORY.SYS.REQ.ACCOUNT.*.INFO",
|
||||
Account: sysPub,
|
||||
})
|
||||
aJwt, err := claim.Encode(oKp)
|
||||
require_NoError(t, err)
|
||||
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: 127.0.0.1:-1
|
||||
operator: %s
|
||||
system_account: %s
|
||||
resolver: MEM
|
||||
resolver_preload: {
|
||||
%s: %s
|
||||
%s: %s
|
||||
}
|
||||
`, ojwt, sysPub, sysPub, sysJwt, aPub, aJwt)))
|
||||
defer removeFile(t, conf)
|
||||
sA, _ := RunServerWithConfig(conf)
|
||||
defer sA.Shutdown()
|
||||
|
||||
nc := natsConnect(t, sA.ClientURL(), createUserCreds(t, nil, aKp))
|
||||
defer nc.Close()
|
||||
// user for account a requests for a different account, the system account
|
||||
m, err := nc.Request(fmt.Sprintf("COMMON.ADVISORY.SYS.REQ.ACCOUNT.%s.INFO", sysPub), nil, time.Second)
|
||||
require_NoError(t, err)
|
||||
resp := &ServerAPIResponse{}
|
||||
require_NoError(t, json.Unmarshal(m.Data, resp))
|
||||
require_True(t, resp.Error == nil)
|
||||
}
|
||||
|
||||
func TestJWTSysImportFromNothing(t *testing.T) {
|
||||
_, sysPub, sysClaim := NewJwtAccountClaim("SYS")
|
||||
sysJwt, err := sysClaim.Encode(oKp)
|
||||
require_NoError(t, err)
|
||||
|
||||
// create account
|
||||
aKp, aPub, claim := NewJwtAccountClaim("A")
|
||||
claim.Imports.Add(&jwt.Import{
|
||||
Type: jwt.Service,
|
||||
// fails as it's not for own account, but system account
|
||||
Subject: jwt.Subject(fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.CONNZ", sysPub)),
|
||||
LocalSubject: "fail1",
|
||||
Account: sysPub,
|
||||
})
|
||||
claim.Imports.Add(&jwt.Import{
|
||||
Type: jwt.Service,
|
||||
// fails as it's not for own account but all accounts
|
||||
Subject: "$SYS.REQ.ACCOUNT.*.CONNZ",
|
||||
LocalSubject: "fail2.*",
|
||||
Account: sysPub,
|
||||
})
|
||||
claim.Imports.Add(&jwt.Import{
|
||||
Type: jwt.Service,
|
||||
Subject: jwt.Subject(fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.CONNZ", aPub)),
|
||||
LocalSubject: "pass",
|
||||
Account: sysPub,
|
||||
})
|
||||
aJwt, err := claim.Encode(oKp)
|
||||
require_NoError(t, err)
|
||||
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: 127.0.0.1:-1
|
||||
operator: %s
|
||||
system_account: %s
|
||||
resolver: MEM
|
||||
resolver_preload: {
|
||||
%s: %s
|
||||
%s: %s
|
||||
}
|
||||
`, ojwt, sysPub, sysPub, sysJwt, aPub, aJwt)))
|
||||
defer removeFile(t, conf)
|
||||
sA, _ := RunServerWithConfig(conf)
|
||||
defer sA.Shutdown()
|
||||
|
||||
nc := natsConnect(t, sA.ClientURL(), createUserCreds(t, nil, aKp))
|
||||
defer nc.Close()
|
||||
// user for account a requests for a different account, the system account
|
||||
_, err = nc.Request("pass", nil, time.Second)
|
||||
require_NoError(t, err)
|
||||
// default import
|
||||
_, err = nc.Request("$SYS.REQ.ACCOUNT.PING.CONNZ", nil, time.Second)
|
||||
require_NoError(t, err)
|
||||
_, err = nc.Request("fail1", nil, time.Second)
|
||||
require_Error(t, err)
|
||||
require_Contains(t, err.Error(), "no responders")
|
||||
// fails even for own account, as the import itself is bad
|
||||
_, err = nc.Request("fail2."+aPub, nil, time.Second)
|
||||
require_Error(t, err)
|
||||
require_Contains(t, err.Error(), "no responders")
|
||||
}
|
||||
|
||||
func TestJWTSysImportOverwritePublic(t *testing.T) {
|
||||
_, sysPub, sysClaim := NewJwtAccountClaim("SYS")
|
||||
// this changes the export permissions to allow for requests for every account
|
||||
sysClaim.Exports.Add(&jwt.Export{
|
||||
Type: jwt.Service,
|
||||
Subject: "$SYS.REQ.ACCOUNT.*.>",
|
||||
})
|
||||
sysJwt, err := sysClaim.Encode(oKp)
|
||||
require_NoError(t, err)
|
||||
|
||||
// create account
|
||||
aKp, aPub, claim := NewJwtAccountClaim("A")
|
||||
claim.Imports.Add(&jwt.Import{
|
||||
Type: jwt.Service,
|
||||
Subject: jwt.Subject(fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.CONNZ", sysPub)),
|
||||
LocalSubject: "pass1",
|
||||
Account: sysPub,
|
||||
})
|
||||
claim.Imports.Add(&jwt.Import{
|
||||
Type: jwt.Service,
|
||||
Subject: jwt.Subject(fmt.Sprintf("$SYS.REQ.ACCOUNT.%s.CONNZ", aPub)),
|
||||
LocalSubject: "pass2",
|
||||
Account: sysPub,
|
||||
})
|
||||
claim.Imports.Add(&jwt.Import{
|
||||
Type: jwt.Service,
|
||||
Subject: "$SYS.REQ.ACCOUNT.*.CONNZ",
|
||||
LocalSubject: "pass3.*",
|
||||
Account: sysPub,
|
||||
})
|
||||
aJwt, err := claim.Encode(oKp)
|
||||
require_NoError(t, err)
|
||||
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: 127.0.0.1:-1
|
||||
operator: %s
|
||||
system_account: %s
|
||||
resolver: MEM
|
||||
resolver_preload: {
|
||||
%s: %s
|
||||
%s: %s
|
||||
}
|
||||
`, ojwt, sysPub, sysPub, sysJwt, aPub, aJwt)))
|
||||
defer removeFile(t, conf)
|
||||
sA, _ := RunServerWithConfig(conf)
|
||||
defer sA.Shutdown()
|
||||
|
||||
nc := natsConnect(t, sA.ClientURL(), createUserCreds(t, nil, aKp))
|
||||
defer nc.Close()
|
||||
// user for account a requests for a different account, the system account
|
||||
_, err = nc.Request("pass1", nil, time.Second)
|
||||
require_NoError(t, err)
|
||||
_, err = nc.Request("pass2", nil, time.Second)
|
||||
require_NoError(t, err)
|
||||
_, err = nc.Request("pass3."+sysPub, nil, time.Second)
|
||||
require_NoError(t, err)
|
||||
_, err = nc.Request("pass3."+aPub, nil, time.Second)
|
||||
require_NoError(t, err)
|
||||
_, err = nc.Request("pass3.PING", nil, time.Second)
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
func TestJWTSysImportOverwriteToken(t *testing.T) {
|
||||
_, sysPub, sysClaim := NewJwtAccountClaim("SYS")
|
||||
// this changes the export permissions in a way that the internal imports can't satisfy
|
||||
sysClaim.Exports.Add(&jwt.Export{
|
||||
Type: jwt.Service,
|
||||
Subject: "$SYS.REQ.>",
|
||||
TokenReq: true,
|
||||
})
|
||||
|
||||
sysJwt, err := sysClaim.Encode(oKp)
|
||||
require_NoError(t, err)
|
||||
|
||||
// create account
|
||||
aKp, aPub, claim := NewJwtAccountClaim("A")
|
||||
aJwt, err := claim.Encode(oKp)
|
||||
require_NoError(t, err)
|
||||
|
||||
conf := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: 127.0.0.1:-1
|
||||
operator: %s
|
||||
system_account: %s
|
||||
resolver: MEM
|
||||
resolver_preload: {
|
||||
%s: %s
|
||||
%s: %s
|
||||
}
|
||||
`, ojwt, sysPub, sysPub, sysJwt, aPub, aJwt)))
|
||||
defer removeFile(t, conf)
|
||||
sA, _ := RunServerWithConfig(conf)
|
||||
defer sA.Shutdown()
|
||||
|
||||
nc := natsConnect(t, sA.ClientURL(), createUserCreds(t, nil, aKp))
|
||||
defer nc.Close()
|
||||
// make sure the internal import still got added
|
||||
_, err = nc.Request("$SYS.REQ.ACCOUNT.PING.CONNZ", nil, time.Second)
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
func TestJWTLimits(t *testing.T) {
|
||||
doNotExpire := time.Now().AddDate(1, 0, 0)
|
||||
// create account
|
||||
|
||||
Reference in New Issue
Block a user