Merge pull request #2816 from nats-io/revocation-issue-442

Fix jwt based user/activation token revocation and granularity
This commit is contained in:
Ivan Kozlovic
2022-01-25 13:42:14 -07:00
committed by GitHub
4 changed files with 301 additions and 118 deletions

2
go.sum
View File

@@ -16,8 +16,6 @@ github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc h1:SHr4MUUZJ/fAC0uSm2OzWOJYsHpapmR86mpw7q1qPXU=
github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d h1:GRSmEJutHkdoxKsRypP575IIdoXe7Bm6yHQF6GcDBnA=
github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=

View File

@@ -69,7 +69,6 @@ type Account struct {
rm map[string]int32
lqws map[string]int32
usersRevoked map[string]int64
actsRevoked map[string]int64
mappings []*mapping
lleafs []*client
imports importMap
@@ -178,9 +177,10 @@ func (rt ServiceRespType) String() string {
// exportAuth holds configured approvals or boolean indicating an
// auth token is required for import.
type exportAuth struct {
tokenReq bool
accountPos uint
approved map[string]*Account
tokenReq bool
accountPos uint
approved map[string]*Account
actsRevoked map[string]int64
}
// streamExport
@@ -2455,7 +2455,7 @@ func (a *Account) checkAuth(ea *exportAuth, account *Account, imClaim *jwt.Impor
}
// Check if token required
if ea.tokenReq {
return a.checkActivation(account, imClaim, true)
return a.checkActivation(account, imClaim, ea, true)
}
if ea.approved == nil {
return false
@@ -2562,7 +2562,7 @@ func (a *Account) streamActivationExpired(exportAcc *Account, subject string) {
}
a.mu.RUnlock()
if si.acc.checkActivation(a, si.claim, false) {
if si.acc.checkActivation(a, si.claim, nil, false) {
// The token has been updated most likely and we are good to go.
return
}
@@ -2594,7 +2594,7 @@ func (a *Account) serviceActivationExpired(subject string) {
}
a.mu.RUnlock()
if si.acc.checkActivation(a, si.claim, false) {
if si.acc.checkActivation(a, si.claim, nil, false) {
// The token has been updated most likely and we are good to go.
return
}
@@ -2616,17 +2616,20 @@ func (a *Account) activationExpired(exportAcc *Account, subject string, kind jwt
}
func isRevoked(revocations map[string]int64, subject string, issuedAt int64) bool {
if revocations == nil {
if len(revocations) == 0 {
return false
}
if t, ok := revocations[subject]; !ok || t < issuedAt {
return false
if t, ok := revocations[jwt.All]; !ok || t < issuedAt {
return false
}
}
return true
}
// checkActivation will check the activation token for validity.
func (a *Account) checkActivation(importAcc *Account, claim *jwt.Import, expTimer bool) bool {
// ea may only be nil in cases where revocation may not be checked, say triggered by expiration timer.
func (a *Account) checkActivation(importAcc *Account, claim *jwt.Import, ea *exportAuth, expTimer bool) bool {
if claim == nil || claim.Token == _EMPTY_ {
return false
}
@@ -2662,8 +2665,11 @@ func (a *Account) checkActivation(importAcc *Account, claim *jwt.Import, expTime
})
}
}
if ea == nil {
return true
}
// Check for token revocation..
return !isRevoked(a.actsRevoked, act.Subject, act.IssuedAt)
return !isRevoked(ea.actsRevoked, act.Subject, act.IssuedAt)
}
// Returns true if the activation claim is trusted. That is the issuer matches
@@ -2936,9 +2942,6 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
delete(a.imports.services, k)
}
// Reset any notion of export revocations.
a.actsRevoked = nil
alteredScope := map[string]struct{}{}
// update account signing keys
@@ -3013,6 +3016,9 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
s.checkJetStreamExports()
}
streamTokenExpirationChanged := false
serviceTokenExpirationChanged := false
for _, e := range ac.Exports {
switch e.Type {
case jwt.Stream:
@@ -3052,17 +3058,44 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
}
}
}
// We will track these at the account level. Should not have any collisions.
if e.Revocations != nil {
a.mu.Lock()
if a.actsRevoked == nil {
a.actsRevoked = make(map[string]int64)
var revocationChanged *bool
var ea *exportAuth
a.mu.Lock()
switch e.Type {
case jwt.Stream:
revocationChanged = &streamTokenExpirationChanged
if se, ok := a.exports.streams[string(e.Subject)]; ok && se != nil {
ea = &se.exportAuth
}
for k, t := range e.Revocations {
a.actsRevoked[k] = t
case jwt.Service:
revocationChanged = &serviceTokenExpirationChanged
if se, ok := a.exports.services[string(e.Subject)]; ok && se != nil {
ea = &se.exportAuth
}
a.mu.Unlock()
}
if ea != nil {
oldRevocations := ea.actsRevoked
if len(e.Revocations) == 0 {
// remove all, no need to evaluate existing imports
ea.actsRevoked = nil
} else if len(oldRevocations) == 0 {
// add all, existing imports need to be re evaluated
ea.actsRevoked = e.Revocations
*revocationChanged = true
} else {
ea.actsRevoked = e.Revocations
// diff, existing imports need to be conditionally re evaluated, depending on:
// if a key was added, or it's timestamp increased
for k, t := range e.Revocations {
if tOld, ok := oldRevocations[k]; !ok || tOld < t {
*revocationChanged = true
}
}
}
}
a.mu.Unlock()
}
var incompleteImports []*jwt.Import
for _, i := range ac.Imports {
@@ -3116,7 +3149,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
}
}
// Now check if stream exports have changed.
if !a.checkStreamExportsEqual(old) || signersChanged {
if !a.checkStreamExportsEqual(old) || signersChanged || streamTokenExpirationChanged {
clients := map[*client]struct{}{}
// We need to check all accounts that have an import claim from this account.
awcsti := map[string]struct{}{}
@@ -3149,7 +3182,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
}
}
// Now check if service exports have changed.
if !a.checkServiceExportsEqual(old) || signersChanged {
if !a.checkServiceExportsEqual(old) || signersChanged || serviceTokenExpirationChanged {
s.accounts.Range(func(k, v interface{}) bool {
acc := v.(*Account)
// Move to the next if this account is actually account "a".

View File

@@ -4376,54 +4376,59 @@ func TestJWTJetStreamLimits(t *testing.T) {
}
func TestJWTUserRevocation(t *testing.T) {
createAccountAndUser := func(done chan struct{}, pubKey, jwt1, jwt2, creds1, creds2 *string) {
t.Helper()
kp, _ := nkeys.CreateAccount()
*pubKey, _ = kp.PublicKey()
claim := jwt.NewAccountClaims(*pubKey)
var err error
*jwt1, err = claim.Encode(oKp)
require_NoError(t, err)
test := func(all bool) {
createAccountAndUser := func(done chan struct{}, pubKey, jwt1, jwt2, creds1, creds2 *string) {
t.Helper()
kp, _ := nkeys.CreateAccount()
*pubKey, _ = kp.PublicKey()
claim := jwt.NewAccountClaims(*pubKey)
var err error
*jwt1, err = claim.Encode(oKp)
require_NoError(t, err)
ukp, _ := nkeys.CreateUser()
seed, _ := ukp.Seed()
upub, _ := ukp.PublicKey()
uclaim := newJWTTestUserClaims()
uclaim.Subject = upub
ukp, _ := nkeys.CreateUser()
seed, _ := ukp.Seed()
upub, _ := ukp.PublicKey()
uclaim := newJWTTestUserClaims()
uclaim.Subject = upub
ujwt1, err := uclaim.Encode(kp)
require_NoError(t, err)
*creds1 = genCredsFile(t, ujwt1, seed)
ujwt1, err := uclaim.Encode(kp)
require_NoError(t, err)
*creds1 = genCredsFile(t, ujwt1, seed)
// create updated claim need to assure that issue time differs
claim.Revoke(upub) // revokes all jwt from now on
time.Sleep(time.Millisecond * 1100)
*jwt2, err = claim.Encode(oKp)
require_NoError(t, err)
// create updated claim need to assure that issue time differs
if all {
claim.Revoke(jwt.All) // revokes all jwt from now on
} else {
claim.Revoke(upub) // revokes this jwt from now on
}
time.Sleep(time.Millisecond * 1100)
*jwt2, err = claim.Encode(oKp)
require_NoError(t, err)
ujwt2, err := uclaim.Encode(kp)
require_NoError(t, err)
*creds2 = genCredsFile(t, ujwt2, seed)
ujwt2, err := uclaim.Encode(kp)
require_NoError(t, err)
*creds2 = genCredsFile(t, ujwt2, seed)
done <- struct{}{}
}
// Create Accounts and corresponding revoked and non revoked user creds. Do so concurrently to speed up the test
doneChan := make(chan struct{}, 2)
defer close(doneChan)
var syspub, sysjwt, dummy1, sysCreds, dummyCreds string
go createAccountAndUser(doneChan, &syspub, &sysjwt, &dummy1, &sysCreds, &dummyCreds)
var apub, ajwt1, ajwt2, aCreds1, aCreds2 string
go createAccountAndUser(doneChan, &apub, &ajwt1, &ajwt2, &aCreds1, &aCreds2)
for i := 0; i < cap(doneChan); i++ {
<-doneChan
}
defer removeFile(t, sysCreds)
defer removeFile(t, dummyCreds)
defer removeFile(t, aCreds1)
defer removeFile(t, aCreds2)
dirSrv := createDir(t, "srv")
defer removeDir(t, dirSrv)
conf := createConfFile(t, []byte(fmt.Sprintf(`
done <- struct{}{}
}
// Create Accounts and corresponding revoked and non revoked user creds. Do so concurrently to speed up the test
doneChan := make(chan struct{}, 2)
defer close(doneChan)
var syspub, sysjwt, dummy1, sysCreds, dummyCreds string
go createAccountAndUser(doneChan, &syspub, &sysjwt, &dummy1, &sysCreds, &dummyCreds)
var apub, ajwt1, ajwt2, aCreds1, aCreds2 string
go createAccountAndUser(doneChan, &apub, &ajwt1, &ajwt2, &aCreds1, &aCreds2)
for i := 0; i < cap(doneChan); i++ {
<-doneChan
}
defer removeFile(t, sysCreds)
defer removeFile(t, dummyCreds)
defer removeFile(t, aCreds1)
defer removeFile(t, aCreds2)
dirSrv := createDir(t, "srv")
defer removeDir(t, dirSrv)
conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
operator: %s
system_account: %s
@@ -4432,47 +4437,189 @@ func TestJWTUserRevocation(t *testing.T) {
dir: %s
}
`, ojwt, syspub, dirSrv)))
defer removeFile(t, conf)
srv, _ := RunServerWithConfig(conf)
defer srv.Shutdown()
updateJwt(t, srv.ClientURL(), sysCreds, sysjwt, 1) // update system account jwt
updateJwt(t, srv.ClientURL(), sysCreds, ajwt1, 1) // set account jwt without revocation
ncSys := natsConnect(t, srv.ClientURL(), nats.UserCredentials(sysCreds), nats.Name("conn name"))
defer ncSys.Close()
ncChan := make(chan *nats.Msg, 10)
defer close(ncChan)
sub, _ := ncSys.ChanSubscribe(fmt.Sprintf(disconnectEventSubj, apub), ncChan) // observe disconnect message
defer sub.Unsubscribe()
// use credentials that will be revoked ans assure that the connection will be disconnected
nc := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aCreds1),
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
if err != nil && strings.Contains(err.Error(), "authentication revoked") {
doneChan <- struct{}{}
}
}),
)
defer nc.Close()
// update account jwt to contain revocation
if updateJwt(t, srv.ClientURL(), sysCreds, ajwt2, 1) != 1 {
t.Fatalf("Expected jwt update to pass")
defer removeFile(t, conf)
srv, _ := RunServerWithConfig(conf)
defer srv.Shutdown()
updateJwt(t, srv.ClientURL(), sysCreds, sysjwt, 1) // update system account jwt
updateJwt(t, srv.ClientURL(), sysCreds, ajwt1, 1) // set account jwt without revocation
ncSys := natsConnect(t, srv.ClientURL(), nats.UserCredentials(sysCreds), nats.Name("conn name"))
defer ncSys.Close()
ncChan := make(chan *nats.Msg, 10)
defer close(ncChan)
sub, _ := ncSys.ChanSubscribe(fmt.Sprintf(disconnectEventSubj, apub), ncChan) // observe disconnect message
defer sub.Unsubscribe()
// use credentials that will be revoked ans assure that the connection will be disconnected
nc := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aCreds1),
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
if err != nil && strings.Contains(err.Error(), "authentication revoked") {
doneChan <- struct{}{}
}
}),
)
defer nc.Close()
// update account jwt to contain revocation
if updateJwt(t, srv.ClientURL(), sysCreds, ajwt2, 1) != 1 {
t.Fatalf("Expected jwt update to pass")
}
// assure that nc got disconnected due to the revocation
select {
case <-doneChan:
case <-time.After(time.Second):
t.Fatalf("Expected connection to have failed")
}
m := <-ncChan
require_Len(t, strings.Count(string(m.Data), apub), 2)
require_True(t, strings.Contains(string(m.Data), `"jwt":"eyJ0`))
// try again with old credentials. Expected to fail
if nc1, err := nats.Connect(srv.ClientURL(), nats.UserCredentials(aCreds1)); err == nil {
nc1.Close()
t.Fatalf("Expected revoked credentials to fail")
}
// Assure new creds pass
nc2 := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aCreds2))
defer nc2.Close()
}
// assure that nc got disconnected due to the revocation
select {
case <-doneChan:
case <-time.After(time.Second):
t.Fatalf("Expected connection to have failed")
t.Run("specific-key", func(t *testing.T) {
test(false)
})
t.Run("all-key", func(t *testing.T) {
test(true)
})
}
func TestJWTActivationRevocation(t *testing.T) {
test := func(all bool) {
sysKp, syspub := createKey(t)
sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub)
sysCreds := newUser(t, sysKp)
defer removeFile(t, sysCreds)
aExpKp, aExpPub := createKey(t)
aExpClaim := jwt.NewAccountClaims(aExpPub)
aExpClaim.Name = "Export"
aExpClaim.Exports.Add(&jwt.Export{
Subject: "foo",
Type: jwt.Stream,
TokenReq: true,
})
aExp1Jwt := encodeClaim(t, aExpClaim, aExpPub)
aExpCreds := newUser(t, aExpKp)
time.Sleep(1100 * time.Millisecond)
aImpKp, aImpPub := createKey(t)
revPubKey := aImpPub
if all {
revPubKey = jwt.All
}
aExpClaim.Exports[0].RevokeAt(revPubKey, time.Now())
aExp2Jwt := encodeClaim(t, aExpClaim, aExpPub)
aExpClaim.Exports[0].ClearRevocation(revPubKey)
aExp3Jwt := encodeClaim(t, aExpClaim, aExpPub)
ac := &jwt.ActivationClaims{}
ac.Subject = aImpPub
ac.ImportSubject = "foo"
ac.ImportType = jwt.Stream
token, err := ac.Encode(aExpKp)
require_NoError(t, err)
aImpClaim := jwt.NewAccountClaims(aImpPub)
aImpClaim.Name = "Import"
aImpClaim.Imports.Add(&jwt.Import{
Subject: "foo",
Type: jwt.Stream,
Account: aExpPub,
Token: token,
})
aImpJwt := encodeClaim(t, aImpClaim, aImpPub)
aImpCreds := newUser(t, aImpKp)
defer removeFile(t, aExpCreds)
defer removeFile(t, aImpCreds)
dirSrv := createDir(t, "srv")
defer removeDir(t, dirSrv)
conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:-1
operator: %s
system_account: %s
resolver: {
type: full
dir: %s
}
`, ojwt, syspub, dirSrv)))
defer removeFile(t, conf)
t.Run("token-expired-on-connect", func(t *testing.T) {
srv, _ := RunServerWithConfig(conf)
defer srv.Shutdown()
defer removeDir(t, dirSrv) // clean jwt directory
updateJwt(t, srv.ClientURL(), sysCreds, sysJwt, 1) // update system account jwt
updateJwt(t, srv.ClientURL(), sysCreds, aExp2Jwt, 1) // set account jwt without revocation
updateJwt(t, srv.ClientURL(), sysCreds, aImpJwt, 1)
ncExp1 := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aExpCreds))
defer ncExp1.Close()
ncImp := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aImpCreds))
defer ncImp.Close()
sub, err := ncImp.SubscribeSync("foo")
require_NoError(t, err)
require_NoError(t, ncImp.Flush())
require_NoError(t, ncExp1.Publish("foo", []byte("1")))
_, err = sub.NextMsg(time.Second)
require_Error(t, err)
require_Equal(t, err.Error(), "nats: timeout")
})
t.Run("token-expired-on-update", func(t *testing.T) {
srv, _ := RunServerWithConfig(conf)
defer srv.Shutdown()
defer removeDir(t, dirSrv) // clean jwt directory
updateJwt(t, srv.ClientURL(), sysCreds, sysJwt, 1) // update system account jwt
updateJwt(t, srv.ClientURL(), sysCreds, aExp1Jwt, 1) // set account jwt without revocation
updateJwt(t, srv.ClientURL(), sysCreds, aImpJwt, 1)
ncExp1 := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aExpCreds))
defer ncExp1.Close()
ncImp := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aImpCreds))
defer ncImp.Close()
sub, err := ncImp.SubscribeSync("foo")
require_NoError(t, err)
require_NoError(t, ncImp.Flush())
require_NoError(t, ncExp1.Publish("foo", []byte("1")))
m1, err := sub.NextMsg(time.Second)
require_NoError(t, err)
require_Equal(t, string(m1.Data), "1")
updateJwt(t, srv.ClientURL(), sysCreds, aExp2Jwt, 1) // set account jwt with revocation
require_NoError(t, ncExp1.Publish("foo", []byte("2")))
_, err = sub.NextMsg(time.Second)
require_Error(t, err)
require_Equal(t, err.Error(), "nats: timeout")
updateJwt(t, srv.ClientURL(), sysCreds, aExp3Jwt, 1) // set account with revocation cleared
require_NoError(t, ncExp1.Publish("foo", []byte("3")))
m2, err := sub.NextMsg(time.Second)
require_NoError(t, err)
require_Equal(t, string(m2.Data), "3")
})
}
m := <-ncChan
require_Len(t, strings.Count(string(m.Data), apub), 2)
require_True(t, strings.Contains(string(m.Data), `"jwt":"eyJ0`))
// try again with old credentials. Expected to fail
if nc1, err := nats.Connect(srv.ClientURL(), nats.UserCredentials(aCreds1)); err == nil {
nc1.Close()
t.Fatalf("Expected revoked credentials to fail")
}
// Assure new creds pass
nc2 := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aCreds2))
defer nc2.Close()
t.Run("specific-key", func(t *testing.T) {
test(false)
})
t.Run("all-key", func(t *testing.T) {
test(true)
})
}
func TestJWTAccountFetchTimeout(t *testing.T) {

View File

@@ -2133,7 +2133,8 @@ type ExtImport struct {
type ExtExport struct {
jwt.Export
ApprovedAccounts []string `json:"approved_accounts,omitempty"`
ApprovedAccounts []string `json:"approved_accounts,omitempty"`
RevokedAct map[string]time.Time `json:"revoked_activations,omitempty"`
}
type ExtVrIssues struct {
@@ -2164,7 +2165,6 @@ type AccountInfo struct {
Claim *jwt.AccountClaims `json:"decoded_jwt,omitempty"`
Vr []ExtVrIssues `json:"validation_result_jwt,omitempty"`
RevokedUser map[string]time.Time `json:"revoked_user,omitempty"`
RevokedAct map[string]time.Time `json:"revoked_activations,omitempty"`
Sublist *SublistStats `json:"sublist_stats,omitempty"`
Responses map[string]ExtImport `json:"responses,omitempty"`
}
@@ -2262,6 +2262,17 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) {
vrIssues[i] = ExtVrIssues{v.Description, v.Blocking, v.TimeCheck}
}
}
collectRevocations := func(revocations map[string]int64) map[string]time.Time {
l := len(revocations)
if l == 0 {
return nil
}
rev := make(map[string]time.Time, l)
for k, v := range revocations {
rev[k] = time.Unix(v, 0)
}
return rev
}
exports := []ExtExport{}
for k, v := range a.exports.services {
e := ExtExport{
@@ -2278,6 +2289,7 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) {
for name := range v.approved {
e.ApprovedAccounts = append(e.ApprovedAccounts, name)
}
e.RevokedAct = collectRevocations(v.actsRevoked)
}
exports = append(exports, e)
}
@@ -2294,6 +2306,7 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) {
for name := range v.approved {
e.ApprovedAccounts = append(e.ApprovedAccounts, name)
}
e.RevokedAct = collectRevocations(v.actsRevoked)
}
exports = append(exports, e)
}
@@ -2344,13 +2357,6 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) {
}
mappings[src] = dests
}
collectRevocations := func(revocations map[string]int64) map[string]time.Time {
rev := map[string]time.Time{}
for k, v := range a.usersRevoked {
rev[k] = time.Unix(v, 0)
}
return rev
}
return &AccountInfo{
accName,
a.updated,
@@ -2371,7 +2377,6 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) {
claim,
vrIssues,
collectRevocations(a.usersRevoked),
collectRevocations(a.actsRevoked),
a.sl.Stats(),
responses,
}, nil