mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Merge pull request #1086 from nats-io/revoke
Added support for user and activation token revocation
This commit is contained in:
@@ -34,26 +34,28 @@ const globalAccountName = "$G"
|
||||
// Account are subject namespace definitions. By default no messages are shared between accounts.
|
||||
// You can share via Exports and Imports of Streams and Services.
|
||||
type Account struct {
|
||||
Name string
|
||||
Nkey string
|
||||
Issuer string
|
||||
claimJWT string
|
||||
updated time.Time
|
||||
mu sync.RWMutex
|
||||
sl *Sublist
|
||||
etmr *time.Timer
|
||||
ctmr *time.Timer
|
||||
strack map[string]sconns
|
||||
nrclients int32
|
||||
sysclients int32
|
||||
nleafs int32
|
||||
nrleafs int32
|
||||
clients map[*client]*client
|
||||
rm map[string]int32
|
||||
lqws map[string]int32
|
||||
lleafs []*client
|
||||
imports importMap
|
||||
exports exportMap
|
||||
Name string
|
||||
Nkey string
|
||||
Issuer string
|
||||
claimJWT string
|
||||
updated time.Time
|
||||
mu sync.RWMutex
|
||||
sl *Sublist
|
||||
etmr *time.Timer
|
||||
ctmr *time.Timer
|
||||
strack map[string]sconns
|
||||
nrclients int32
|
||||
sysclients int32
|
||||
nleafs int32
|
||||
nrleafs int32
|
||||
clients map[*client]*client
|
||||
rm map[string]int32
|
||||
lqws map[string]int32
|
||||
usersRevoked map[string]int64
|
||||
actsRevoked map[string]int64
|
||||
lleafs []*client
|
||||
imports importMap
|
||||
exports exportMap
|
||||
limits
|
||||
nae int32
|
||||
pruning bool
|
||||
@@ -759,6 +761,12 @@ func (a *Account) checkActivation(acc *Account, claim *jwt.Import, expTimer bool
|
||||
})
|
||||
}
|
||||
}
|
||||
// Check for token revocation..
|
||||
if a.actsRevoked != nil {
|
||||
if t, ok := a.actsRevoked[act.Subject]; ok && t <= time.Now().Unix() {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
@@ -894,6 +902,19 @@ func (a *Account) clearExpirationTimer() bool {
|
||||
return stopped
|
||||
}
|
||||
|
||||
// checkUserRevoked will check if a user has been revoked.
|
||||
func (a *Account) checkUserRevoked(nkey string) bool {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
if a.usersRevoked == nil {
|
||||
return false
|
||||
}
|
||||
if t, ok := a.usersRevoked[nkey]; !ok || t > time.Now().Unix() {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Check expiration and set the proper state as needed.
|
||||
func (a *Account) checkExpiration(claims *jwt.ClaimsData) {
|
||||
a.mu.Lock()
|
||||
@@ -976,6 +997,8 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
// Reset exports and imports here.
|
||||
a.exports = exportMap{}
|
||||
a.imports = importMap{}
|
||||
// Reset any notion of export revocations.
|
||||
a.actsRevoked = nil
|
||||
|
||||
// update account signing keys
|
||||
a.signingKeys = nil
|
||||
@@ -1020,6 +1043,17 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
s.Debugf("Error adding service export to account [%s]: %v", a.Name, err.Error())
|
||||
}
|
||||
}
|
||||
// We will track these at the account level. Should not have any collisions.
|
||||
if e.Revocations != nil {
|
||||
a.mu.Lock()
|
||||
if a.actsRevoked == nil {
|
||||
a.actsRevoked = make(map[string]int64)
|
||||
}
|
||||
for k, t := range e.Revocations {
|
||||
a.actsRevoked[k] = t
|
||||
}
|
||||
a.mu.Unlock()
|
||||
}
|
||||
}
|
||||
for _, i := range ac.Imports {
|
||||
var acc *Account
|
||||
@@ -1100,6 +1134,15 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
a.mpay = int32(ac.Limits.Payload)
|
||||
a.mconns = int32(ac.Limits.Conn)
|
||||
a.mleafs = int32(ac.Limits.LeafNodeConn)
|
||||
// Check for any revocations
|
||||
if len(ac.Revocations) > 0 {
|
||||
// We will always replace whatever we had with most current, so no
|
||||
// need to look at what we have.
|
||||
a.usersRevoked = make(map[string]int64, len(ac.Revocations))
|
||||
for pk, t := range ac.Revocations {
|
||||
a.usersRevoked[pk] = t
|
||||
}
|
||||
}
|
||||
a.mu.Unlock()
|
||||
|
||||
clients := gatherClients()
|
||||
@@ -1109,6 +1152,7 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
return clients[i].start.After(clients[j].start)
|
||||
})
|
||||
}
|
||||
now := time.Now().Unix()
|
||||
for i, c := range clients {
|
||||
if a.mconns != jwt.NoLimit && i >= int(a.mconns) {
|
||||
c.maxAccountConnExceeded()
|
||||
@@ -1116,7 +1160,22 @@ func (s *Server) updateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.applyAccountLimits()
|
||||
// Check for being revoked here. We use ac one to avoid
|
||||
// the account lock.
|
||||
var nkey string
|
||||
if c.user != nil {
|
||||
nkey = c.user.Nkey
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
// Check if we have been revoked.
|
||||
if ac.Revocations != nil {
|
||||
if t, ok := ac.Revocations[nkey]; ok && now >= t {
|
||||
c.sendErrAndDebug("User Authentication Revoked")
|
||||
c.closeConnection(Revocation)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check if the signing keys changed, might have to evict
|
||||
|
||||
@@ -411,6 +411,11 @@ func (s *Server) isClientAuthorized(c *client) bool {
|
||||
c.Debugf("Signature not verified")
|
||||
return false
|
||||
}
|
||||
if acc.checkUserRevoked(juc.Subject) {
|
||||
c.Debugf("User authentication revoked")
|
||||
return false
|
||||
}
|
||||
|
||||
nkey = buildInternalNkeyUser(juc, acc)
|
||||
c.RegisterNkeyUser(nkey)
|
||||
|
||||
|
||||
@@ -145,6 +145,7 @@ const (
|
||||
AuthenticationExpired
|
||||
WrongGateway
|
||||
MissingAccount
|
||||
Revocation
|
||||
)
|
||||
|
||||
// Some flags passed to processMsgResultsEx
|
||||
|
||||
@@ -109,6 +109,9 @@ var (
|
||||
// ErrNoSysAccount is returned when an attempt to publish or subscribe is made
|
||||
// when there is no internal system account defined.
|
||||
ErrNoSysAccount = errors.New("system account not setup")
|
||||
|
||||
// ErrRevocation is returned when a credential has been revoked.
|
||||
ErrRevocation = errors.New("credentials have been revoked")
|
||||
)
|
||||
|
||||
// configErr is a configuration error.
|
||||
|
||||
@@ -1908,3 +1908,262 @@ func TestJWTAccountImportSignerRemoved(t *testing.T) {
|
||||
expectPong(clientReader)
|
||||
checkShadow(0)
|
||||
}
|
||||
|
||||
func TestJWTUserRevokedOnAccountUpdate(t *testing.T) {
|
||||
okp, _ := nkeys.FromSeed(oSeed)
|
||||
|
||||
akp, _ := nkeys.CreateAccount()
|
||||
apub, _ := akp.PublicKey()
|
||||
nac := jwt.NewAccountClaims(apub)
|
||||
ajwt, err := nac.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
|
||||
// Create a new user.
|
||||
nkp, _ := nkeys.CreateUser()
|
||||
pub, _ := nkp.PublicKey()
|
||||
nuc := jwt.NewUserClaims(pub)
|
||||
jwt, err := nuc.Encode(akp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating user JWT: %v", err)
|
||||
}
|
||||
|
||||
s := opTrustBasicSetup()
|
||||
defer s.Shutdown()
|
||||
buildMemAccResolver(s)
|
||||
addAccountToMemResolver(s, apub, ajwt)
|
||||
|
||||
c, cr, l := newClientForServer(s)
|
||||
|
||||
// Sign Nonce
|
||||
var info nonceInfo
|
||||
json.Unmarshal([]byte(l[5:]), &info)
|
||||
sigraw, _ := nkp.Sign([]byte(info.Nonce))
|
||||
sig := base64.RawURLEncoding.EncodeToString(sigraw)
|
||||
|
||||
// PING needed to flush the +OK/-ERR to us.
|
||||
cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\"}\r\nPING\r\n", jwt, sig)
|
||||
|
||||
go c.parse([]byte(cs))
|
||||
|
||||
l, _ = cr.ReadString('\n')
|
||||
if !strings.HasPrefix(l, "PONG") {
|
||||
t.Fatalf("Expected a PONG")
|
||||
}
|
||||
|
||||
// Now revoke the user.
|
||||
nac.Revoke(pub)
|
||||
|
||||
ajwt, err = nac.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
|
||||
// Update the account on the server.
|
||||
addAccountToMemResolver(s, apub, ajwt)
|
||||
acc, err := s.LookupAccount(apub)
|
||||
if err != nil {
|
||||
t.Fatalf("Error looking up the account: %v", err)
|
||||
}
|
||||
|
||||
// This is simulating a system update for the account claims.
|
||||
go s.updateAccountWithClaimJWT(acc, ajwt)
|
||||
|
||||
l, _ = cr.ReadString('\n')
|
||||
if !strings.HasPrefix(l, "-ERR ") {
|
||||
t.Fatalf("Expected an error")
|
||||
}
|
||||
if !strings.Contains(l, "Revoked") {
|
||||
t.Fatalf("Expected 'Revoked' to be in the error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestJWTUserRevoked(t *testing.T) {
|
||||
okp, _ := nkeys.FromSeed(oSeed)
|
||||
|
||||
// Create a new user that we will make sure has been revoked.
|
||||
nkp, _ := nkeys.CreateUser()
|
||||
pub, _ := nkp.PublicKey()
|
||||
nuc := jwt.NewUserClaims(pub)
|
||||
|
||||
akp, _ := nkeys.CreateAccount()
|
||||
apub, _ := akp.PublicKey()
|
||||
nac := jwt.NewAccountClaims(apub)
|
||||
// Revoke the user right away.
|
||||
nac.Revoke(pub)
|
||||
ajwt, err := nac.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
|
||||
// Sign for the user.
|
||||
jwt, err := nuc.Encode(akp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating user JWT: %v", err)
|
||||
}
|
||||
|
||||
s := opTrustBasicSetup()
|
||||
defer s.Shutdown()
|
||||
buildMemAccResolver(s)
|
||||
addAccountToMemResolver(s, apub, ajwt)
|
||||
|
||||
c, cr, l := newClientForServer(s)
|
||||
|
||||
// Sign Nonce
|
||||
var info nonceInfo
|
||||
json.Unmarshal([]byte(l[5:]), &info)
|
||||
sigraw, _ := nkp.Sign([]byte(info.Nonce))
|
||||
sig := base64.RawURLEncoding.EncodeToString(sigraw)
|
||||
|
||||
// PING needed to flush the +OK/-ERR to us.
|
||||
cs := fmt.Sprintf("CONNECT {\"jwt\":%q,\"sig\":\"%s\"}\r\nPING\r\n", jwt, sig)
|
||||
|
||||
go c.parse([]byte(cs))
|
||||
|
||||
l, _ = cr.ReadString('\n')
|
||||
if !strings.HasPrefix(l, "-ERR ") {
|
||||
t.Fatalf("Expected an error")
|
||||
}
|
||||
if !strings.Contains(l, "Authorization") {
|
||||
t.Fatalf("Expected 'Revoked' to be in the error")
|
||||
}
|
||||
}
|
||||
|
||||
// Test that an account update that revokes an import authorization cancels the import.
|
||||
func TestJWTImportTokenRevokedAfter(t *testing.T) {
|
||||
s := opTrustBasicSetup()
|
||||
defer s.Shutdown()
|
||||
buildMemAccResolver(s)
|
||||
|
||||
okp, _ := nkeys.FromSeed(oSeed)
|
||||
|
||||
// Create accounts and imports/exports.
|
||||
fooKP, _ := nkeys.CreateAccount()
|
||||
fooPub, _ := fooKP.PublicKey()
|
||||
fooAC := jwt.NewAccountClaims(fooPub)
|
||||
|
||||
// Now create Exports.
|
||||
export := &jwt.Export{Subject: "foo.private", Type: jwt.Stream, TokenReq: true}
|
||||
|
||||
fooAC.Exports.Add(export)
|
||||
fooJWT, err := fooAC.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
|
||||
addAccountToMemResolver(s, fooPub, fooJWT)
|
||||
|
||||
barKP, _ := nkeys.CreateAccount()
|
||||
barPub, _ := barKP.PublicKey()
|
||||
barAC := jwt.NewAccountClaims(barPub)
|
||||
simport := &jwt.Import{Account: fooPub, Subject: "foo.private", Type: jwt.Stream}
|
||||
|
||||
activation := jwt.NewActivationClaims(barPub)
|
||||
activation.ImportSubject = "foo.private"
|
||||
activation.ImportType = jwt.Stream
|
||||
actJWT, err := activation.Encode(fooKP)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating activation token: %v", err)
|
||||
}
|
||||
|
||||
simport.Token = actJWT
|
||||
barAC.Imports.Add(simport)
|
||||
barJWT, err := barAC.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
addAccountToMemResolver(s, barPub, barJWT)
|
||||
|
||||
// Now revoke the export.
|
||||
decoded, _ := jwt.DecodeActivationClaims(actJWT)
|
||||
export.Revoke(decoded.Subject)
|
||||
|
||||
fooJWT, err = fooAC.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
|
||||
addAccountToMemResolver(s, fooPub, fooJWT)
|
||||
|
||||
fooAcc, _ := s.LookupAccount(fooPub)
|
||||
if fooAcc == nil {
|
||||
t.Fatalf("Expected to retrieve the account")
|
||||
}
|
||||
|
||||
// Now lookup bar account and make sure it was revoked.
|
||||
acc, _ := s.LookupAccount(barPub)
|
||||
if acc == nil {
|
||||
t.Fatalf("Expected to retrieve the account")
|
||||
}
|
||||
if les := len(acc.imports.streams); les != 0 {
|
||||
t.Fatalf("Expected imports streams len of 0, got %d", les)
|
||||
}
|
||||
}
|
||||
|
||||
// Test that an account update that revokes an import authorization cancels the import.
|
||||
func TestJWTImportTokenRevokedBefore(t *testing.T) {
|
||||
s := opTrustBasicSetup()
|
||||
defer s.Shutdown()
|
||||
buildMemAccResolver(s)
|
||||
|
||||
okp, _ := nkeys.FromSeed(oSeed)
|
||||
|
||||
// Create accounts and imports/exports.
|
||||
fooKP, _ := nkeys.CreateAccount()
|
||||
fooPub, _ := fooKP.PublicKey()
|
||||
fooAC := jwt.NewAccountClaims(fooPub)
|
||||
|
||||
// Now create Exports.
|
||||
export := &jwt.Export{Subject: "foo.private", Type: jwt.Stream, TokenReq: true}
|
||||
|
||||
fooAC.Exports.Add(export)
|
||||
|
||||
// Import account
|
||||
barKP, _ := nkeys.CreateAccount()
|
||||
barPub, _ := barKP.PublicKey()
|
||||
barAC := jwt.NewAccountClaims(barPub)
|
||||
simport := &jwt.Import{Account: fooPub, Subject: "foo.private", Type: jwt.Stream}
|
||||
|
||||
activation := jwt.NewActivationClaims(barPub)
|
||||
activation.ImportSubject = "foo.private"
|
||||
activation.ImportType = jwt.Stream
|
||||
actJWT, err := activation.Encode(fooKP)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating activation token: %v", err)
|
||||
}
|
||||
|
||||
simport.Token = actJWT
|
||||
barAC.Imports.Add(simport)
|
||||
|
||||
// Now revoke the export.
|
||||
decoded, _ := jwt.DecodeActivationClaims(actJWT)
|
||||
export.Revoke(decoded.Subject)
|
||||
|
||||
fooJWT, err := fooAC.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
|
||||
addAccountToMemResolver(s, fooPub, fooJWT)
|
||||
|
||||
barJWT, err := barAC.Encode(okp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
addAccountToMemResolver(s, barPub, barJWT)
|
||||
|
||||
fooAcc, _ := s.LookupAccount(fooPub)
|
||||
if fooAcc == nil {
|
||||
t.Fatalf("Expected to retrieve the account")
|
||||
}
|
||||
|
||||
// Now lookup bar account and make sure it was revoked.
|
||||
acc, _ := s.LookupAccount(barPub)
|
||||
if acc == nil {
|
||||
t.Fatalf("Expected to retrieve the account")
|
||||
}
|
||||
if les := len(acc.imports.streams); les != 0 {
|
||||
t.Fatalf("Expected imports streams len of 0, got %d", les)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1595,6 +1595,8 @@ func (reason ClosedState) String() string {
|
||||
return "Wrong Gateway"
|
||||
case MissingAccount:
|
||||
return "Missing Account"
|
||||
case Revocation:
|
||||
return "Credentials Revoked"
|
||||
}
|
||||
return "Unknown State"
|
||||
}
|
||||
|
||||
@@ -190,7 +190,7 @@ func TestUnpromptedPong(t *testing.T) {
|
||||
|
||||
func TestPingSuppresion(t *testing.T) {
|
||||
pingInterval := 100 * time.Millisecond
|
||||
|
||||
highWater := 130 * time.Millisecond
|
||||
opts := DefaultTestOptions
|
||||
opts.Port = PING_TEST_PORT
|
||||
opts.PingInterval = pingInterval
|
||||
@@ -214,7 +214,7 @@ func TestPingSuppresion(t *testing.T) {
|
||||
t.Fatalf("pingTime too low: %v", pingTime)
|
||||
}
|
||||
// +5 is just for fudging in case things are slow in the testing system.
|
||||
if pingTime > pingInterval+(pingInterval/5)+5 {
|
||||
if pingTime > highWater {
|
||||
t.Fatalf("pingTime too high: %v", pingTime)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user