From bb63fd5f409202ff0f63fc47850cd162125b9803 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Mon, 12 Oct 2020 18:07:07 -0400 Subject: [PATCH] Adding list/delete/update operations for jwt stored by nats-resolver Update already existed scoped by account, this exposes update without account. List returns a list of all stored accounts. Delete deletes accounts. Fix a crash on startup with non existing directory. Signed-off-by: Matthias Hanel --- server/accounts.go | 110 ++++++++++++++++++++++++++++++++++++++-- server/dirstore.go | 18 +++++++ server/dirstore_test.go | 25 +++++++++ server/events.go | 3 ++ server/jwt_test.go | 68 ++++++++++++++++++++++++- server/opts.go | 9 ++-- 6 files changed, 224 insertions(+), 9 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 050ea081..8f9416ba 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -2905,9 +2905,17 @@ func (dr *DirAccResolver) Reload() error { func respondToUpdate(s *Server, respSubj string, acc string, message string, err error) { if err == nil { - s.Debugf("%s - %s", message, acc) + if acc == "" { + s.Debugf("%s", message) + } else { + s.Debugf("%s - %s", message, acc) + } } else { - s.Errorf("%s - %s - %s", message, acc, err) + if acc == "" { + s.Errorf("%s - %s", message, err) + } else { + s.Errorf("%s - %s - %s", message, acc, err) + } } if respSubj == "" { return @@ -2917,19 +2925,65 @@ func respondToUpdate(s *Server, respSubj string, acc string, message string, err if err == nil { response["data"] = map[string]interface{}{ "code": http.StatusOK, - "account": acc, "message": message, } + if acc != "" { + response["data"].(map[string]interface{})["account"] = acc + } } else { response["error"] = map[string]interface{}{ "code": http.StatusInternalServerError, - "account": acc, "description": fmt.Sprintf("%s - %v", message, err), } + if acc != "" { + response["error"].(map[string]interface{})["account"] = acc + } } s.sendInternalMsgLocked(respSubj, _EMPTY_, server, response) } +func handleListRequest(store *DirJWTStore, s *Server, reply string) { + if reply == "" { + return + } + accIds := make([]string, 0, 1024) + if err := store.PackWalk(1, func(partialPackMsg string) { + if tk := strings.Split(partialPackMsg, "|"); len(tk) == 2 { + accIds = append(accIds, tk[0]) + } + }); err != nil { + // let them timeout + s.Errorf("list request error: %v", err) + } else { + s.Debugf("list request responded with %d account ids", len(accIds)) + server := &ServerInfo{} + response := map[string]interface{}{"server": server, "data": accIds} + s.sendInternalMsgLocked(reply, _EMPTY_, server, response) + } +} + +func handleDeleteRequest(store *DirJWTStore, s *Server, msg []byte, reply string) { + accIds := strings.Split(strings.Replace(string(msg), "\r\n", "\n", -1), "\n") + errs := []string{} + passCnt := 0 + for _, acc := range accIds { + if acc == "" { + continue + } + if err := store.delete(acc); err != nil { + errs = append(errs, err.Error()) + } else { + passCnt++ + } + } + if len(errs) == 0 { + respondToUpdate(s, reply, "", fmt.Sprintf("deleted %d accounts", passCnt), nil) + } else { + respondToUpdate(s, reply, "", fmt.Sprintf("deleted %d accounts, failed for %d", passCnt, len(errs)), + errors.New(strings.Join(errs, "<\n"))) + } +} + func (dr *DirAccResolver) Start(s *Server) error { dr.Lock() defer dr.Unlock() @@ -2970,6 +3024,17 @@ func (dr *DirAccResolver) Start(s *Server) error { return fmt.Errorf("error setting up update handling: %v", err) } } + if _, err := s.sysSubscribe(accClaimsReqSubj, func(_ *subscription, _ *client, subj, resp string, msg []byte) { + if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil { + respondToUpdate(s, resp, "n/a", "jwt update resulted in error", err) + } else if err := dr.save(claim.Subject, string(msg)); err != nil { + respondToUpdate(s, resp, claim.Subject, "jwt update resulted in error", err) + } else { + respondToUpdate(s, resp, claim.Subject, "jwt updated", nil) + } + }); err != nil { + return fmt.Errorf("error setting up update handling: %v", err) + } if _, err := s.sysSubscribe(fmt.Sprintf(accLookupReqSubj, "*"), func(_ *subscription, _ *client, subj, reply string, msg []byte) { // respond to lookups with our version if reply == "" { @@ -3008,6 +3073,17 @@ func (dr *DirAccResolver) Start(s *Server) error { } }); err != nil { return fmt.Errorf("error setting up pack request handling: %v", err) + } else if _, err = s.sysSubscribe(accListReqSubj, + // respond to list requests with one message containing all account ids + func(_ *subscription, _ *client, _, reply string, _ []byte) { + handleListRequest(dr.DirJWTStore, s, reply) + }); err != nil { + return fmt.Errorf("error setting up list request handling: %v", err) + } else if _, err := s.sysSubscribe(accDeleteReqSubj, + func(_ *subscription, _ *client, _, reply string, msg []byte) { + handleDeleteRequest(dr.DirJWTStore, s, msg, reply) + }); err != nil { + return fmt.Errorf("error setting up delete request handling: %v", err) } else if _, err = s.sysSubscribe(packRespIb, func(_ *subscription, _ *client, _, _ string, msg []byte) { // embed pack responses into store hash := dr.DirJWTStore.Hash() @@ -3181,6 +3257,32 @@ func (dr *CacheDirAccResolver) Start(s *Server) error { return fmt.Errorf("error setting up update handling: %v", err) } } + if _, err := s.sysSubscribe(accClaimsReqSubj, func(_ *subscription, _ *client, subj, resp string, msg []byte) { + if claim, err := jwt.DecodeAccountClaims(string(msg)); err != nil { + respondToUpdate(s, resp, "n/a", "jwt update cache resulted in error", err) + } else if _, ok := s.accounts.Load(claim.Subject); !ok { + respondToUpdate(s, resp, claim.Subject, "jwt update cache skipped", nil) + } else if err := dr.save(claim.Subject, string(msg)); err != nil { + respondToUpdate(s, resp, claim.Subject, "jwt update cache resulted in error", err) + } else { + respondToUpdate(s, resp, claim.Subject, "jwt updated cache", nil) + } + }); err != nil { + return fmt.Errorf("error setting up update handling: %v", err) + } + if _, err := s.sysSubscribe(accListReqSubj, + // respond to list requests with one message containing all account ids + func(_ *subscription, _ *client, _, reply string, _ []byte) { + handleListRequest(dr.DirJWTStore, s, reply) + }); err != nil { + return fmt.Errorf("error setting up list request handling: %v", err) + } + if _, err := s.sysSubscribe(accDeleteReqSubj, + func(_ *subscription, _ *client, _, reply string, msg []byte) { + handleDeleteRequest(dr.DirJWTStore, s, msg, reply) + }); err != nil { + return fmt.Errorf("error setting up list request handling: %v", err) + } s.Noticef("Managing some jwt in exclusive directory %s", dr.directory) return nil } diff --git a/server/dirstore.go b/server/dirstore.go index 97536800..87e81a61 100644 --- a/server/dirstore.go +++ b/server/dirstore.go @@ -420,6 +420,24 @@ func (store *DirJWTStore) write(path string, publicKey string, theJWT string) (b return true, nil } +func (store *DirJWTStore) delete(publicKey string) error { + if store.readonly { + return fmt.Errorf("store is read-only") + } + store.Lock() + defer store.Unlock() + if err := os.Remove(store.pathForKey(publicKey)); err != nil { + if _, ok := err.(*os.PathError); ok || err == os.ErrNotExist { + return nil + } + return err + } else { + store.expiration.unTrack(publicKey) + } + // TODO do cb + return nil +} + // Save puts the JWT in a map by public key and performs update callbacks // Assumes lock is NOT held func (store *DirJWTStore) save(publicKey string, theJWT string) error { diff --git a/server/dirstore_test.go b/server/dirstore_test.go index 31e9d042..9d13e6dc 100644 --- a/server/dirstore_test.go +++ b/server/dirstore_test.go @@ -793,6 +793,31 @@ func TestTTL(t *testing.T) { require_Len(t, len(f), 0) } +func TestRemove(t *testing.T) { + dir, err := ioutil.TempDir(os.TempDir(), "jwtstore_test") + require_NoError(t, err) + require_OneJWT := func() { + t.Helper() + f, err := ioutil.ReadDir(dir) + require_NoError(t, err) + require_Len(t, len(f), 1) + } + dirStore, err := NewExpiringDirJWTStore(dir, false, false, 0, 10, true, 0, nil) + require_NoError(t, err) + defer dirStore.Close() + + accountKey, err := nkeys.CreateAccount() + require_NoError(t, err) + pubKey, err := accountKey.PublicKey() + require_NoError(t, err) + createTestAccount(t, dirStore, 0, accountKey) + require_OneJWT() + dirStore.delete(pubKey) + f, err := ioutil.ReadDir(dir) + require_NoError(t, err) + require_Len(t, len(f), 0) +} + const infDur = time.Duration(math.MaxInt64) func TestNotificationOnPack(t *testing.T) { diff --git a/server/events.go b/server/events.go index 9b1d3a95..21b38ca7 100644 --- a/server/events.go +++ b/server/events.go @@ -36,6 +36,9 @@ const ( accLookupReqTokens = 6 accLookupReqSubj = "$SYS.REQ.ACCOUNT.%s.CLAIMS.LOOKUP" accPackReqSubj = "$SYS.REQ.CLAIMS.PACK" + accListReqSubj = "$SYS.REQ.CLAIMS.LIST" + accClaimsReqSubj = "$SYS.REQ.CLAIMS.UPDATE" + accDeleteReqSubj = "$SYS.REQ.CLAIMS.DELETE" connectEventSubj = "$SYS.ACCOUNT.%s.CONNECT" disconnectEventSubj = "$SYS.ACCOUNT.%s.DISCONNECT" diff --git a/server/jwt_test.go b/server/jwt_test.go index 3148e1ec..68591d06 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -1996,7 +1996,7 @@ func TestAccountURLResolverPermanentFetchFailure(t *testing.T) { defer sysc.Close() // push accounts natsPub(t, sysc, fmt.Sprintf(accUpdateEventSubjNew, imppub), []byte(impjwt)) - natsPub(t, sysc, fmt.Sprintf(accUpdateEventSubjNew, exppub), []byte(expjwt)) + natsPub(t, sysc, fmt.Sprintf(accUpdateEventSubjOld, exppub), []byte(expjwt)) sysc.Flush() importErrCnt := 0 tmr := time.NewTimer(500 * time.Millisecond) @@ -3210,7 +3210,7 @@ func updateJwt(t *testing.T, url string, creds string, pubKey string, jwt string sub := natsSubSync(t, c, resp) err := sub.AutoUnsubscribe(respCnt) require_NoError(t, err) - require_NoError(t, c.PublishRequest(fmt.Sprintf(accUpdateEventSubjNew, pubKey), resp, []byte(jwt))) + require_NoError(t, c.PublishRequest(accClaimsReqSubj, resp, []byte(jwt))) passCnt := 0 for i := 0; i < respCnt; i++ { if require_NextMsg(sub) { @@ -4358,3 +4358,67 @@ func TestJWTUserRevocation(t *testing.T) { nc2 := natsConnect(t, srv.ClientURL(), nats.UserCredentials(aCreds2)) defer nc2.Close() } + +func TestJWTAccountOps(t *testing.T) { + createAccountAndUser := func(pubKey, jwt1, creds1 *string) { + t.Helper() + kp, _ := nkeys.CreateAccount() + *pubKey, _ = kp.PublicKey() + claim := jwt.NewAccountClaims(*pubKey) + var err error + *jwt1, err = claim.Encode(oKp) + require_NoError(t, err) + + ukp, _ := nkeys.CreateUser() + seed, _ := ukp.Seed() + upub, _ := ukp.PublicKey() + uclaim := newJWTTestUserClaims() + uclaim.Subject = upub + + ujwt1, err := uclaim.Encode(kp) + require_NoError(t, err) + *creds1 = genCredsFile(t, ujwt1, seed) + } + var syspub, sysjwt, sysCreds string + createAccountAndUser(&syspub, &sysjwt, &sysCreds) + var apub, ajwt1, aCreds1 string + createAccountAndUser(&apub, &ajwt1, &aCreds1) + defer os.Remove(sysCreds) + defer os.Remove(aCreds1) + dirSrv := createDir(t, "srv") + defer os.RemoveAll(dirSrv) + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: -1 + operator: %s + system_account: %s + resolver: { + type: full + dir: %s + } + `, ojwt, syspub, dirSrv))) + defer os.Remove(conf) + srv, _ := RunServerWithConfig(conf) + defer srv.Shutdown() + updateJwt(t, srv.ClientURL(), sysCreds, syspub, sysjwt, 1) // update system account jwt + updateJwt(t, srv.ClientURL(), sysCreds, apub, ajwt1, 1) // set jwt + nc := natsConnect(t, srv.ClientURL(), nats.UserCredentials(sysCreds)) + defer nc.Close() + resp, err := nc.Request(accListReqSubj, nil, time.Second) + require_NoError(t, err) + require_True(t, strings.Contains(string(resp.Data), apub)) + require_True(t, strings.Contains(string(resp.Data), syspub)) + // delete nothing + resp, err = nc.Request(accDeleteReqSubj, nil, time.Second) + require_NoError(t, err) + require_True(t, strings.Contains(string(resp.Data), `"message": "deleted 0 accounts"`)) + // issue delete, twice to also delete a non existing account + for i := 0; i < 2; i++ { + resp, err = nc.Request(accDeleteReqSubj, []byte(apub), time.Second) + require_NoError(t, err) + require_True(t, strings.Contains(string(resp.Data), `"message": "deleted 1 accounts"`)) + resp, err = nc.Request(accListReqSubj, nil, time.Second) + require_False(t, strings.Contains(string(resp.Data), apub)) + require_True(t, strings.Contains(string(resp.Data), syspub)) + require_NoError(t, err) + } +} diff --git a/server/opts.go b/server/opts.go index 96ab00c1..44f244f9 100644 --- a/server/opts.go +++ b/server/opts.go @@ -872,11 +872,14 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error return } if dir == "" { - *errors = append(*errors, &configErr{tk, "dir needs to point to a directory"}) + *errors = append(*errors, &configErr{tk, "dir has no value and needs to point to a directory"}) return } - if info, err := os.Stat(dir); err != nil || !info.IsDir() || info.Mode().Perm()&(1<<(uint(7))) == 0 { - info.IsDir() + if info, err := os.Stat(dir); err != nil { + + } else if !info.IsDir() || info.Mode().Perm()&(1<<(uint(7))) == 0 { + *errors = append(*errors, &configErr{tk, "dir needs to point to an accessible directory"}) + return } var res AccountResolver switch strings.ToUpper(dirType) {