diff --git a/go.sum b/go.sum index d24030fd..22ec1f3f 100644 --- a/go.sum +++ b/go.sum @@ -16,8 +16,6 @@ github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc h1:SHr4MUUZJ/fAC0uSm2OzWOJYsHpapmR86mpw7q1qPXU= -github.com/nats-io/nats.go v1.13.1-0.20211122170419-d7c1d78a50fc/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d h1:GRSmEJutHkdoxKsRypP575IIdoXe7Bm6yHQF6GcDBnA= github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= diff --git a/server/accounts.go b/server/accounts.go index eae82105..38b20270 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -69,7 +69,6 @@ type Account struct { rm map[string]int32 lqws map[string]int32 usersRevoked map[string]int64 - actsRevoked map[string]int64 mappings []*mapping lleafs []*client imports importMap @@ -178,9 +177,10 @@ func (rt ServiceRespType) String() string { // exportAuth holds configured approvals or boolean indicating an // auth token is required for import. type exportAuth struct { - tokenReq bool - accountPos uint - approved map[string]*Account + tokenReq bool + accountPos uint + approved map[string]*Account + actsRevoked map[string]int64 } // streamExport @@ -2455,7 +2455,7 @@ func (a *Account) checkAuth(ea *exportAuth, account *Account, imClaim *jwt.Impor } // Check if token required if ea.tokenReq { - return a.checkActivation(account, imClaim, true) + return a.checkActivation(account, imClaim, ea, true) } if ea.approved == nil { return false @@ -2562,7 +2562,7 @@ func (a *Account) streamActivationExpired(exportAcc *Account, subject string) { } a.mu.RUnlock() - if si.acc.checkActivation(a, si.claim, false) { + if si.acc.checkActivation(a, si.claim, nil, false) { // The token has been updated most likely and we are good to go. return } @@ -2594,7 +2594,7 @@ func (a *Account) serviceActivationExpired(subject string) { } a.mu.RUnlock() - if si.acc.checkActivation(a, si.claim, false) { + if si.acc.checkActivation(a, si.claim, nil, false) { // The token has been updated most likely and we are good to go. return } @@ -2616,17 +2616,20 @@ func (a *Account) activationExpired(exportAcc *Account, subject string, kind jwt } func isRevoked(revocations map[string]int64, subject string, issuedAt int64) bool { - if revocations == nil { + if len(revocations) == 0 { return false } if t, ok := revocations[subject]; !ok || t < issuedAt { - return false + if t, ok := revocations[jwt.All]; !ok || t < issuedAt { + return false + } } return true } // checkActivation will check the activation token for validity. -func (a *Account) checkActivation(importAcc *Account, claim *jwt.Import, expTimer bool) bool { +// ea may only be nil in cases where revocation may not be checked, say triggered by expiration timer. +func (a *Account) checkActivation(importAcc *Account, claim *jwt.Import, ea *exportAuth, expTimer bool) bool { if claim == nil || claim.Token == _EMPTY_ { return false } @@ -2662,8 +2665,11 @@ func (a *Account) checkActivation(importAcc *Account, claim *jwt.Import, expTime }) } } + if ea == nil { + return true + } // Check for token revocation.. - return !isRevoked(a.actsRevoked, act.Subject, act.IssuedAt) + return !isRevoked(ea.actsRevoked, act.Subject, act.IssuedAt) } // Returns true if the activation claim is trusted. That is the issuer matches @@ -2936,9 +2942,6 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim delete(a.imports.services, k) } - // Reset any notion of export revocations. - a.actsRevoked = nil - alteredScope := map[string]struct{}{} // update account signing keys @@ -3013,6 +3016,9 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim s.checkJetStreamExports() } + streamTokenExpirationChanged := false + serviceTokenExpirationChanged := false + for _, e := range ac.Exports { switch e.Type { case jwt.Stream: @@ -3052,17 +3058,44 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim } } } - // We will track these at the account level. Should not have any collisions. - if e.Revocations != nil { - a.mu.Lock() - if a.actsRevoked == nil { - a.actsRevoked = make(map[string]int64) + + var revocationChanged *bool + var ea *exportAuth + + a.mu.Lock() + switch e.Type { + case jwt.Stream: + revocationChanged = &streamTokenExpirationChanged + if se, ok := a.exports.streams[string(e.Subject)]; ok && se != nil { + ea = &se.exportAuth } - for k, t := range e.Revocations { - a.actsRevoked[k] = t + case jwt.Service: + revocationChanged = &serviceTokenExpirationChanged + if se, ok := a.exports.services[string(e.Subject)]; ok && se != nil { + ea = &se.exportAuth } - a.mu.Unlock() } + if ea != nil { + oldRevocations := ea.actsRevoked + if len(e.Revocations) == 0 { + // remove all, no need to evaluate existing imports + ea.actsRevoked = nil + } else if len(oldRevocations) == 0 { + // add all, existing imports need to be re evaluated + ea.actsRevoked = e.Revocations + *revocationChanged = true + } else { + ea.actsRevoked = e.Revocations + // diff, existing imports need to be conditionally re evaluated, depending on: + // if a key was added, or it's timestamp increased + for k, t := range e.Revocations { + if tOld, ok := oldRevocations[k]; !ok || tOld < t { + *revocationChanged = true + } + } + } + } + a.mu.Unlock() } var incompleteImports []*jwt.Import for _, i := range ac.Imports { @@ -3116,7 +3149,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim } } // Now check if stream exports have changed. - if !a.checkStreamExportsEqual(old) || signersChanged { + if !a.checkStreamExportsEqual(old) || signersChanged || streamTokenExpirationChanged { clients := map[*client]struct{}{} // We need to check all accounts that have an import claim from this account. awcsti := map[string]struct{}{} @@ -3149,7 +3182,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim } } // Now check if service exports have changed. - if !a.checkServiceExportsEqual(old) || signersChanged { + if !a.checkServiceExportsEqual(old) || signersChanged || serviceTokenExpirationChanged { s.accounts.Range(func(k, v interface{}) bool { acc := v.(*Account) // Move to the next if this account is actually account "a". diff --git a/server/jwt_test.go b/server/jwt_test.go index a0a42386..cb81cd3e 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -4376,54 +4376,59 @@ func TestJWTJetStreamLimits(t *testing.T) { } func TestJWTUserRevocation(t *testing.T) { - createAccountAndUser := func(done chan struct{}, pubKey, jwt1, jwt2, creds1, creds2 *string) { - t.Helper() - kp, _ := nkeys.CreateAccount() - *pubKey, _ = kp.PublicKey() - claim := jwt.NewAccountClaims(*pubKey) - var err error - *jwt1, err = claim.Encode(oKp) - require_NoError(t, err) + test := func(all bool) { + createAccountAndUser := func(done chan struct{}, pubKey, jwt1, jwt2, creds1, creds2 *string) { + t.Helper() + kp, _ := nkeys.CreateAccount() + *pubKey, _ = kp.PublicKey() + claim := jwt.NewAccountClaims(*pubKey) + var err error + *jwt1, err = claim.Encode(oKp) + require_NoError(t, err) - ukp, _ := nkeys.CreateUser() - seed, _ := ukp.Seed() - upub, _ := ukp.PublicKey() - uclaim := newJWTTestUserClaims() - uclaim.Subject = upub + ukp, _ := nkeys.CreateUser() + seed, _ := ukp.Seed() + upub, _ := ukp.PublicKey() + uclaim := newJWTTestUserClaims() + uclaim.Subject = upub - ujwt1, err := uclaim.Encode(kp) - require_NoError(t, err) - *creds1 = genCredsFile(t, ujwt1, seed) + ujwt1, err := uclaim.Encode(kp) + require_NoError(t, err) + *creds1 = genCredsFile(t, ujwt1, seed) - // create updated claim need to assure that issue time differs - claim.Revoke(upub) // revokes all jwt from now on - time.Sleep(time.Millisecond * 1100) - *jwt2, err = claim.Encode(oKp) - require_NoError(t, err) + // create updated claim need to assure that issue time differs + if all { + claim.Revoke(jwt.All) // revokes all jwt from now on + } else { + claim.Revoke(upub) // revokes this jwt from now on + } + time.Sleep(time.Millisecond * 1100) + *jwt2, err = claim.Encode(oKp) + require_NoError(t, err) - ujwt2, err := uclaim.Encode(kp) - require_NoError(t, err) - *creds2 = genCredsFile(t, ujwt2, seed) + ujwt2, err := uclaim.Encode(kp) + require_NoError(t, err) + *creds2 = genCredsFile(t, ujwt2, seed) - done <- struct{}{} - } - // Create Accounts and corresponding revoked and non revoked user creds. Do so concurrently to speed up the test - doneChan := make(chan struct{}, 2) - defer close(doneChan) - var syspub, sysjwt, dummy1, sysCreds, dummyCreds string - go createAccountAndUser(doneChan, &syspub, &sysjwt, &dummy1, &sysCreds, &dummyCreds) - var apub, ajwt1, ajwt2, aCreds1, aCreds2 string - go createAccountAndUser(doneChan, &apub, &ajwt1, &ajwt2, &aCreds1, &aCreds2) - for i := 0; i < cap(doneChan); i++ { - <-doneChan - } - defer removeFile(t, sysCreds) - defer removeFile(t, dummyCreds) - defer removeFile(t, aCreds1) - defer removeFile(t, aCreds2) - dirSrv := createDir(t, "srv") - defer removeDir(t, dirSrv) - conf := createConfFile(t, []byte(fmt.Sprintf(` + done <- struct{}{} + } + // Create Accounts and corresponding revoked and non revoked user creds. Do so concurrently to speed up the test + doneChan := make(chan struct{}, 2) + defer close(doneChan) + var syspub, sysjwt, dummy1, sysCreds, dummyCreds string + go createAccountAndUser(doneChan, &syspub, &sysjwt, &dummy1, &sysCreds, &dummyCreds) + var apub, ajwt1, ajwt2, aCreds1, aCreds2 string + go createAccountAndUser(doneChan, &apub, &ajwt1, &ajwt2, &aCreds1, &aCreds2) + for i := 0; i < cap(doneChan); i++ { + <-doneChan + } + defer removeFile(t, sysCreds) + defer removeFile(t, dummyCreds) + defer removeFile(t, aCreds1) + defer removeFile(t, aCreds2) + dirSrv := createDir(t, "srv") + defer removeDir(t, dirSrv) + conf := createConfFile(t, []byte(fmt.Sprintf(` listen: 127.0.0.1:-1 operator: %s system_account: %s @@ -4432,47 +4437,189 @@ func TestJWTUserRevocation(t *testing.T) { dir: %s } `, ojwt, syspub, dirSrv))) - defer removeFile(t, conf) - srv, _ := RunServerWithConfig(conf) - defer srv.Shutdown() - updateJwt(t, srv.ClientURL(), sysCreds, sysjwt, 1) // update system account jwt - updateJwt(t, srv.ClientURL(), sysCreds, ajwt1, 1) // set account jwt without revocation - ncSys := natsConnect(t, srv.ClientURL(), nats.UserCredentials(sysCreds), nats.Name("conn name")) - defer ncSys.Close() - ncChan := make(chan *nats.Msg, 10) - defer close(ncChan) - sub, _ := ncSys.ChanSubscribe(fmt.Sprintf(disconnectEventSubj, apub), ncChan) // observe disconnect message - defer sub.Unsubscribe() - // use credentials that will be revoked ans assure that the connection will be disconnected - nc := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aCreds1), - nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) { - if err != nil && strings.Contains(err.Error(), "authentication revoked") { - doneChan <- struct{}{} - } - }), - ) - defer nc.Close() - // update account jwt to contain revocation - if updateJwt(t, srv.ClientURL(), sysCreds, ajwt2, 1) != 1 { - t.Fatalf("Expected jwt update to pass") + defer removeFile(t, conf) + srv, _ := RunServerWithConfig(conf) + defer srv.Shutdown() + updateJwt(t, srv.ClientURL(), sysCreds, sysjwt, 1) // update system account jwt + updateJwt(t, srv.ClientURL(), sysCreds, ajwt1, 1) // set account jwt without revocation + ncSys := natsConnect(t, srv.ClientURL(), nats.UserCredentials(sysCreds), nats.Name("conn name")) + defer ncSys.Close() + ncChan := make(chan *nats.Msg, 10) + defer close(ncChan) + sub, _ := ncSys.ChanSubscribe(fmt.Sprintf(disconnectEventSubj, apub), ncChan) // observe disconnect message + defer sub.Unsubscribe() + // use credentials that will be revoked ans assure that the connection will be disconnected + nc := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aCreds1), + nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) { + if err != nil && strings.Contains(err.Error(), "authentication revoked") { + doneChan <- struct{}{} + } + }), + ) + defer nc.Close() + // update account jwt to contain revocation + if updateJwt(t, srv.ClientURL(), sysCreds, ajwt2, 1) != 1 { + t.Fatalf("Expected jwt update to pass") + } + // assure that nc got disconnected due to the revocation + select { + case <-doneChan: + case <-time.After(time.Second): + t.Fatalf("Expected connection to have failed") + } + m := <-ncChan + require_Len(t, strings.Count(string(m.Data), apub), 2) + require_True(t, strings.Contains(string(m.Data), `"jwt":"eyJ0`)) + // try again with old credentials. Expected to fail + if nc1, err := nats.Connect(srv.ClientURL(), nats.UserCredentials(aCreds1)); err == nil { + nc1.Close() + t.Fatalf("Expected revoked credentials to fail") + } + // Assure new creds pass + nc2 := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aCreds2)) + defer nc2.Close() } - // assure that nc got disconnected due to the revocation - select { - case <-doneChan: - case <-time.After(time.Second): - t.Fatalf("Expected connection to have failed") + t.Run("specific-key", func(t *testing.T) { + test(false) + }) + t.Run("all-key", func(t *testing.T) { + test(true) + }) +} + +func TestJWTActivationRevocation(t *testing.T) { + test := func(all bool) { + sysKp, syspub := createKey(t) + sysJwt := encodeClaim(t, jwt.NewAccountClaims(syspub), syspub) + sysCreds := newUser(t, sysKp) + defer removeFile(t, sysCreds) + + aExpKp, aExpPub := createKey(t) + aExpClaim := jwt.NewAccountClaims(aExpPub) + aExpClaim.Name = "Export" + aExpClaim.Exports.Add(&jwt.Export{ + Subject: "foo", + Type: jwt.Stream, + TokenReq: true, + }) + aExp1Jwt := encodeClaim(t, aExpClaim, aExpPub) + aExpCreds := newUser(t, aExpKp) + + time.Sleep(1100 * time.Millisecond) + aImpKp, aImpPub := createKey(t) + + revPubKey := aImpPub + if all { + revPubKey = jwt.All + } + + aExpClaim.Exports[0].RevokeAt(revPubKey, time.Now()) + aExp2Jwt := encodeClaim(t, aExpClaim, aExpPub) + + aExpClaim.Exports[0].ClearRevocation(revPubKey) + aExp3Jwt := encodeClaim(t, aExpClaim, aExpPub) + + ac := &jwt.ActivationClaims{} + ac.Subject = aImpPub + ac.ImportSubject = "foo" + ac.ImportType = jwt.Stream + token, err := ac.Encode(aExpKp) + require_NoError(t, err) + + aImpClaim := jwt.NewAccountClaims(aImpPub) + aImpClaim.Name = "Import" + aImpClaim.Imports.Add(&jwt.Import{ + Subject: "foo", + Type: jwt.Stream, + Account: aExpPub, + Token: token, + }) + aImpJwt := encodeClaim(t, aImpClaim, aImpPub) + aImpCreds := newUser(t, aImpKp) + defer removeFile(t, aExpCreds) + defer removeFile(t, aImpCreds) + + dirSrv := createDir(t, "srv") + defer removeDir(t, dirSrv) + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: 127.0.0.1:-1 + operator: %s + system_account: %s + resolver: { + type: full + dir: %s + } + `, ojwt, syspub, dirSrv))) + defer removeFile(t, conf) + + t.Run("token-expired-on-connect", func(t *testing.T) { + srv, _ := RunServerWithConfig(conf) + defer srv.Shutdown() + defer removeDir(t, dirSrv) // clean jwt directory + + updateJwt(t, srv.ClientURL(), sysCreds, sysJwt, 1) // update system account jwt + updateJwt(t, srv.ClientURL(), sysCreds, aExp2Jwt, 1) // set account jwt without revocation + updateJwt(t, srv.ClientURL(), sysCreds, aImpJwt, 1) + + ncExp1 := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aExpCreds)) + defer ncExp1.Close() + + ncImp := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aImpCreds)) + defer ncImp.Close() + + sub, err := ncImp.SubscribeSync("foo") + require_NoError(t, err) + require_NoError(t, ncImp.Flush()) + require_NoError(t, ncExp1.Publish("foo", []byte("1"))) + _, err = sub.NextMsg(time.Second) + require_Error(t, err) + require_Equal(t, err.Error(), "nats: timeout") + }) + + t.Run("token-expired-on-update", func(t *testing.T) { + srv, _ := RunServerWithConfig(conf) + defer srv.Shutdown() + defer removeDir(t, dirSrv) // clean jwt directory + + updateJwt(t, srv.ClientURL(), sysCreds, sysJwt, 1) // update system account jwt + updateJwt(t, srv.ClientURL(), sysCreds, aExp1Jwt, 1) // set account jwt without revocation + updateJwt(t, srv.ClientURL(), sysCreds, aImpJwt, 1) + + ncExp1 := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aExpCreds)) + defer ncExp1.Close() + + ncImp := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aImpCreds)) + defer ncImp.Close() + + sub, err := ncImp.SubscribeSync("foo") + require_NoError(t, err) + require_NoError(t, ncImp.Flush()) + require_NoError(t, ncExp1.Publish("foo", []byte("1"))) + m1, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, string(m1.Data), "1") + + updateJwt(t, srv.ClientURL(), sysCreds, aExp2Jwt, 1) // set account jwt with revocation + + require_NoError(t, ncExp1.Publish("foo", []byte("2"))) + _, err = sub.NextMsg(time.Second) + require_Error(t, err) + require_Equal(t, err.Error(), "nats: timeout") + + updateJwt(t, srv.ClientURL(), sysCreds, aExp3Jwt, 1) // set account with revocation cleared + + require_NoError(t, ncExp1.Publish("foo", []byte("3"))) + m2, err := sub.NextMsg(time.Second) + require_NoError(t, err) + require_Equal(t, string(m2.Data), "3") + }) } - m := <-ncChan - require_Len(t, strings.Count(string(m.Data), apub), 2) - require_True(t, strings.Contains(string(m.Data), `"jwt":"eyJ0`)) - // try again with old credentials. Expected to fail - if nc1, err := nats.Connect(srv.ClientURL(), nats.UserCredentials(aCreds1)); err == nil { - nc1.Close() - t.Fatalf("Expected revoked credentials to fail") - } - // Assure new creds pass - nc2 := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aCreds2)) - defer nc2.Close() + t.Run("specific-key", func(t *testing.T) { + test(false) + }) + t.Run("all-key", func(t *testing.T) { + test(true) + }) } func TestJWTAccountFetchTimeout(t *testing.T) { diff --git a/server/monitor.go b/server/monitor.go index 6f604ea9..d60110fb 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2131,7 +2131,8 @@ type ExtImport struct { type ExtExport struct { jwt.Export - ApprovedAccounts []string `json:"approved_accounts,omitempty"` + ApprovedAccounts []string `json:"approved_accounts,omitempty"` + RevokedAct map[string]time.Time `json:"revoked_activations,omitempty"` } type ExtVrIssues struct { @@ -2162,7 +2163,6 @@ type AccountInfo struct { Claim *jwt.AccountClaims `json:"decoded_jwt,omitempty"` Vr []ExtVrIssues `json:"validation_result_jwt,omitempty"` RevokedUser map[string]time.Time `json:"revoked_user,omitempty"` - RevokedAct map[string]time.Time `json:"revoked_activations,omitempty"` Sublist *SublistStats `json:"sublist_stats,omitempty"` Responses map[string]ExtImport `json:"responses,omitempty"` } @@ -2260,6 +2260,13 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) { vrIssues[i] = ExtVrIssues{v.Description, v.Blocking, v.TimeCheck} } } + collectRevocations := func(revocations map[string]int64) map[string]time.Time { + rev := map[string]time.Time{} + for k, v := range a.usersRevoked { + rev[k] = time.Unix(v, 0) + } + return rev + } exports := []ExtExport{} for k, v := range a.exports.services { e := ExtExport{ @@ -2276,6 +2283,7 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) { for name := range v.approved { e.ApprovedAccounts = append(e.ApprovedAccounts, name) } + e.RevokedAct = collectRevocations(v.actsRevoked) } exports = append(exports, e) } @@ -2292,6 +2300,7 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) { for name := range v.approved { e.ApprovedAccounts = append(e.ApprovedAccounts, name) } + e.RevokedAct = collectRevocations(v.actsRevoked) } exports = append(exports, e) } @@ -2342,13 +2351,6 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) { } mappings[src] = dests } - collectRevocations := func(revocations map[string]int64) map[string]time.Time { - rev := map[string]time.Time{} - for k, v := range a.usersRevoked { - rev[k] = time.Unix(v, 0) - } - return rev - } return &AccountInfo{ accName, a.updated, @@ -2369,7 +2371,6 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) { claim, vrIssues, collectRevocations(a.usersRevoked), - collectRevocations(a.actsRevoked), a.sl.Stats(), responses, }, nil