Adding list/delete/update operations for jwt stored by nats-resolver

Update already existed scoped by account, this exposes update without account.
List returns a list of all stored accounts.
Delete deletes accounts.
Fix a crash on startup with non existing directory.

Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
Matthias Hanel
2020-10-12 18:07:07 -04:00
parent aead978be9
commit bb63fd5f40
6 changed files with 224 additions and 9 deletions

View File

@@ -2905,9 +2905,17 @@ func (dr *DirAccResolver) Reload() error {
func respondToUpdate(s *Server, respSubj string, acc string, message string, err error) {
if err == nil {
s.Debugf("%s - %s", message, acc)
if acc == "" {
s.Debugf("%s", message)
} else {
s.Debugf("%s - %s", message, acc)
}
} else {
s.Errorf("%s - %s - %s", message, acc, err)
if acc == "" {
s.Errorf("%s - %s", message, err)
} else {
s.Errorf("%s - %s - %s", message, acc, err)
}
}
if respSubj == "" {
return
@@ -2917,19 +2925,65 @@ func respondToUpdate(s *Server, respSubj string, acc string, message string, err
if err == nil {
response["data"] = map[string]interface{}{
"code": http.StatusOK,
"account": acc,
"message": message,
}
if acc != "" {
response["data"].(map[string]interface{})["account"] = acc
}
} else {
response["error"] = map[string]interface{}{
"code": http.StatusInternalServerError,
"account": acc,
"description": fmt.Sprintf("%s - %v", message, err),
}
if acc != "" {
response["error"].(map[string]interface{})["account"] = acc
}
}
s.sendInternalMsgLocked(respSubj, _EMPTY_, server, response)
}
func handleListRequest(store *DirJWTStore, s *Server, reply string) {
if reply == "" {
return
}
accIds := make([]string, 0, 1024)
if err := store.PackWalk(1, func(partialPackMsg string) {
if tk := strings.Split(partialPackMsg, "|"); len(tk) == 2 {
accIds = append(accIds, tk[0])
}
}); err != nil {
// let them timeout
s.Errorf("list request error: %v", err)
} else {
s.Debugf("list request responded with %d account ids", len(accIds))
server := &ServerInfo{}
response := map[string]interface{}{"server": server, "data": accIds}
s.sendInternalMsgLocked(reply, _EMPTY_, server, response)
}
}
func handleDeleteRequest(store *DirJWTStore, s *Server, msg []byte, reply string) {
accIds := strings.Split(strings.Replace(string(msg), "\r\n", "\n", -1), "\n")
errs := []string{}
passCnt := 0
for _, acc := range accIds {
if acc == "" {
continue
}
if err := store.delete(acc); err != nil {
errs = append(errs, err.Error())
} else {
passCnt++
}
}
if len(errs) == 0 {
respondToUpdate(s, reply, "", fmt.Sprintf("deleted %d accounts", passCnt), nil)
} else {
respondToUpdate(s, reply, "", fmt.Sprintf("deleted %d accounts, failed for %d", passCnt, len(errs)),
errors.New(strings.Join(errs, "<\n")))
}
}
func (dr *DirAccResolver) Start(s *Server) error {
dr.Lock()
defer dr.Unlock()
@@ -2970,6 +3024,17 @@ func (dr *DirAccResolver) Start(s *Server) error {
return fmt.Errorf("error setting up update handling: %v", err)
}
}
if _, err := s.sysSubscribe(accClaimsReqSubj, func(_ *subscription, _ *client, subj, resp string, msg []byte) {
if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil {
respondToUpdate(s, resp, "n/a", "jwt update resulted in error", err)
} else if err := dr.save(claim.Subject, string(msg)); err != nil {
respondToUpdate(s, resp, claim.Subject, "jwt update resulted in error", err)
} else {
respondToUpdate(s, resp, claim.Subject, "jwt updated", nil)
}
}); err != nil {
return fmt.Errorf("error setting up update handling: %v", err)
}
if _, err := s.sysSubscribe(fmt.Sprintf(accLookupReqSubj, "*"), func(_ *subscription, _ *client, subj, reply string, msg []byte) {
// respond to lookups with our version
if reply == "" {
@@ -3008,6 +3073,17 @@ func (dr *DirAccResolver) Start(s *Server) error {
}
}); err != nil {
return fmt.Errorf("error setting up pack request handling: %v", err)
} else if _, err = s.sysSubscribe(accListReqSubj,
// respond to list requests with one message containing all account ids
func(_ *subscription, _ *client, _, reply string, _ []byte) {
handleListRequest(dr.DirJWTStore, s, reply)
}); err != nil {
return fmt.Errorf("error setting up list request handling: %v", err)
} else if _, err := s.sysSubscribe(accDeleteReqSubj,
func(_ *subscription, _ *client, _, reply string, msg []byte) {
handleDeleteRequest(dr.DirJWTStore, s, msg, reply)
}); err != nil {
return fmt.Errorf("error setting up delete request handling: %v", err)
} else if _, err = s.sysSubscribe(packRespIb, func(_ *subscription, _ *client, _, _ string, msg []byte) {
// embed pack responses into store
hash := dr.DirJWTStore.Hash()
@@ -3181,6 +3257,32 @@ func (dr *CacheDirAccResolver) Start(s *Server) error {
return fmt.Errorf("error setting up update handling: %v", err)
}
}
if _, err := s.sysSubscribe(accClaimsReqSubj, func(_ *subscription, _ *client, subj, resp string, msg []byte) {
if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil {
respondToUpdate(s, resp, "n/a", "jwt update cache resulted in error", err)
} else if _, ok := s.accounts.Load(claim.Subject); !ok {
respondToUpdate(s, resp, claim.Subject, "jwt update cache skipped", nil)
} else if err := dr.save(claim.Subject, string(msg)); err != nil {
respondToUpdate(s, resp, claim.Subject, "jwt update cache resulted in error", err)
} else {
respondToUpdate(s, resp, claim.Subject, "jwt updated cache", nil)
}
}); err != nil {
return fmt.Errorf("error setting up update handling: %v", err)
}
if _, err := s.sysSubscribe(accListReqSubj,
// respond to list requests with one message containing all account ids
func(_ *subscription, _ *client, _, reply string, _ []byte) {
handleListRequest(dr.DirJWTStore, s, reply)
}); err != nil {
return fmt.Errorf("error setting up list request handling: %v", err)
}
if _, err := s.sysSubscribe(accDeleteReqSubj,
func(_ *subscription, _ *client, _, reply string, msg []byte) {
handleDeleteRequest(dr.DirJWTStore, s, msg, reply)
}); err != nil {
return fmt.Errorf("error setting up list request handling: %v", err)
}
s.Noticef("Managing some jwt in exclusive directory %s", dr.directory)
return nil
}

View File

@@ -420,6 +420,24 @@ func (store *DirJWTStore) write(path string, publicKey string, theJWT string) (b
return true, nil
}
func (store *DirJWTStore) delete(publicKey string) error {
if store.readonly {
return fmt.Errorf("store is read-only")
}
store.Lock()
defer store.Unlock()
if err := os.Remove(store.pathForKey(publicKey)); err != nil {
if _, ok := err.(*os.PathError); ok || err == os.ErrNotExist {
return nil
}
return err
} else {
store.expiration.unTrack(publicKey)
}
// TODO do cb
return nil
}
// Save puts the JWT in a map by public key and performs update callbacks
// Assumes lock is NOT held
func (store *DirJWTStore) save(publicKey string, theJWT string) error {

View File

@@ -793,6 +793,31 @@ func TestTTL(t *testing.T) {
require_Len(t, len(f), 0)
}
func TestRemove(t *testing.T) {
dir, err := ioutil.TempDir(os.TempDir(), "jwtstore_test")
require_NoError(t, err)
require_OneJWT := func() {
t.Helper()
f, err := ioutil.ReadDir(dir)
require_NoError(t, err)
require_Len(t, len(f), 1)
}
dirStore, err := NewExpiringDirJWTStore(dir, false, false, 0, 10, true, 0, nil)
require_NoError(t, err)
defer dirStore.Close()
accountKey, err := nkeys.CreateAccount()
require_NoError(t, err)
pubKey, err := accountKey.PublicKey()
require_NoError(t, err)
createTestAccount(t, dirStore, 0, accountKey)
require_OneJWT()
dirStore.delete(pubKey)
f, err := ioutil.ReadDir(dir)
require_NoError(t, err)
require_Len(t, len(f), 0)
}
const infDur = time.Duration(math.MaxInt64)
func TestNotificationOnPack(t *testing.T) {

View File

@@ -36,6 +36,9 @@ const (
accLookupReqTokens = 6
accLookupReqSubj = "$SYS.REQ.ACCOUNT.%s.CLAIMS.LOOKUP"
accPackReqSubj = "$SYS.REQ.CLAIMS.PACK"
accListReqSubj = "$SYS.REQ.CLAIMS.LIST"
accClaimsReqSubj = "$SYS.REQ.CLAIMS.UPDATE"
accDeleteReqSubj = "$SYS.REQ.CLAIMS.DELETE"
connectEventSubj = "$SYS.ACCOUNT.%s.CONNECT"
disconnectEventSubj = "$SYS.ACCOUNT.%s.DISCONNECT"

View File

@@ -1996,7 +1996,7 @@ func TestAccountURLResolverPermanentFetchFailure(t *testing.T) {
defer sysc.Close()
// push accounts
natsPub(t, sysc, fmt.Sprintf(accUpdateEventSubjNew, imppub), []byte(impjwt))
natsPub(t, sysc, fmt.Sprintf(accUpdateEventSubjNew, exppub), []byte(expjwt))
natsPub(t, sysc, fmt.Sprintf(accUpdateEventSubjOld, exppub), []byte(expjwt))
sysc.Flush()
importErrCnt := 0
tmr := time.NewTimer(500 * time.Millisecond)
@@ -3210,7 +3210,7 @@ func updateJwt(t *testing.T, url string, creds string, pubKey string, jwt string
sub := natsSubSync(t, c, resp)
err := sub.AutoUnsubscribe(respCnt)
require_NoError(t, err)
require_NoError(t, c.PublishRequest(fmt.Sprintf(accUpdateEventSubjNew, pubKey), resp, []byte(jwt)))
require_NoError(t, c.PublishRequest(accClaimsReqSubj, resp, []byte(jwt)))
passCnt := 0
for i := 0; i < respCnt; i++ {
if require_NextMsg(sub) {
@@ -4358,3 +4358,67 @@ func TestJWTUserRevocation(t *testing.T) {
nc2 := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aCreds2))
defer nc2.Close()
}
func TestJWTAccountOps(t *testing.T) {
createAccountAndUser := func(pubKey, jwt1, creds1 *string) {
t.Helper()
kp, _ := nkeys.CreateAccount()
*pubKey, _ = kp.PublicKey()
claim := jwt.NewAccountClaims(*pubKey)
var err error
*jwt1, err = claim.Encode(oKp)
require_NoError(t, err)
ukp, _ := nkeys.CreateUser()
seed, _ := ukp.Seed()
upub, _ := ukp.PublicKey()
uclaim := newJWTTestUserClaims()
uclaim.Subject = upub
ujwt1, err := uclaim.Encode(kp)
require_NoError(t, err)
*creds1 = genCredsFile(t, ujwt1, seed)
}
var syspub, sysjwt, sysCreds string
createAccountAndUser(&syspub, &sysjwt, &sysCreds)
var apub, ajwt1, aCreds1 string
createAccountAndUser(&apub, &ajwt1, &aCreds1)
defer os.Remove(sysCreds)
defer os.Remove(aCreds1)
dirSrv := createDir(t, "srv")
defer os.RemoveAll(dirSrv)
conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: -1
operator: %s
system_account: %s
resolver: {
type: full
dir: %s
}
`, ojwt, syspub, dirSrv)))
defer os.Remove(conf)
srv, _ := RunServerWithConfig(conf)
defer srv.Shutdown()
updateJwt(t, srv.ClientURL(), sysCreds, syspub, sysjwt, 1) // update system account jwt
updateJwt(t, srv.ClientURL(), sysCreds, apub, ajwt1, 1) // set jwt
nc := natsConnect(t, srv.ClientURL(), nats.UserCredentials(sysCreds))
defer nc.Close()
resp, err := nc.Request(accListReqSubj, nil, time.Second)
require_NoError(t, err)
require_True(t, strings.Contains(string(resp.Data), apub))
require_True(t, strings.Contains(string(resp.Data), syspub))
// delete nothing
resp, err = nc.Request(accDeleteReqSubj, nil, time.Second)
require_NoError(t, err)
require_True(t, strings.Contains(string(resp.Data), `"message": "deleted 0 accounts"`))
// issue delete, twice to also delete a non existing account
for i := 0; i < 2; i++ {
resp, err = nc.Request(accDeleteReqSubj, []byte(apub), time.Second)
require_NoError(t, err)
require_True(t, strings.Contains(string(resp.Data), `"message": "deleted 1 accounts"`))
resp, err = nc.Request(accListReqSubj, nil, time.Second)
require_False(t, strings.Contains(string(resp.Data), apub))
require_True(t, strings.Contains(string(resp.Data), syspub))
require_NoError(t, err)
}
}

View File

@@ -872,11 +872,14 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
return
}
if dir == "" {
*errors = append(*errors, &configErr{tk, "dir needs to point to a directory"})
*errors = append(*errors, &configErr{tk, "dir has no value and needs to point to a directory"})
return
}
if info, err := os.Stat(dir); err != nil || !info.IsDir() || info.Mode().Perm()&(1<<(uint(7))) == 0 {
info.IsDir()
if info, err := os.Stat(dir); err != nil {
} else if !info.IsDir() || info.Mode().Perm()&(1<<(uint(7))) == 0 {
*errors = append(*errors, &configErr{tk, "dir needs to point to an accessible directory"})
return
}
var res AccountResolver
switch strings.ToUpper(dirType) {