mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
Adding fetch on missing jwt of full nats based resolver
Full nats based resolver sync within a cluster. This functionality addresses syncing between cluster. Fixing deadlock when more than one server responds to lookup. Fixing crash when shutdown and pack happen at the same time.
This commit is contained in:
@@ -2892,6 +2892,7 @@ func (ur *URLAccResolver) Fetch(name string) (string, error) {
|
||||
// Resolver based on nats for synchronization and backing directory for storage.
|
||||
type DirAccResolver struct {
|
||||
*DirJWTStore
|
||||
*Server
|
||||
syncInterval time.Duration
|
||||
}
|
||||
|
||||
@@ -2899,6 +2900,10 @@ func (dr *DirAccResolver) IsTrackingUpdate() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (dr *DirAccResolver) Reload() error {
|
||||
return dr.DirJWTStore.Reload()
|
||||
}
|
||||
|
||||
func respondToUpdate(s *Server, respSubj string, acc string, message string, err error) {
|
||||
if err == nil {
|
||||
s.Debugf("%s - %s", message, acc)
|
||||
@@ -2929,6 +2934,7 @@ func respondToUpdate(s *Server, respSubj string, acc string, message string, err
|
||||
func (dr *DirAccResolver) Start(s *Server) error {
|
||||
dr.Lock()
|
||||
defer dr.Unlock()
|
||||
dr.Server = s
|
||||
dr.DirJWTStore.changed = func(pubKey string) {
|
||||
if v, ok := s.accounts.Load(pubKey); !ok {
|
||||
} else if jwt, err := dr.LoadAcc(pubKey); err != nil {
|
||||
@@ -2998,7 +3004,7 @@ func (dr *DirAccResolver) Start(s *Server) error {
|
||||
// let them timeout
|
||||
s.Errorf("pack request error: %v", err)
|
||||
} else {
|
||||
s.Debugf("pack request hash %x - finished responding with hash %x")
|
||||
s.Debugf("pack request hash %x - finished responding with hash %x", theirHash, ourHash)
|
||||
s.sendInternalMsgLocked(reply, "", nil, []byte{})
|
||||
}
|
||||
}); err != nil {
|
||||
@@ -3039,7 +3045,18 @@ func (dr *DirAccResolver) Start(s *Server) error {
|
||||
}
|
||||
|
||||
func (dr *DirAccResolver) Fetch(name string) (string, error) {
|
||||
return dr.LoadAcc(name)
|
||||
if theJWT, err := dr.LoadAcc(name); theJWT != "" {
|
||||
return theJWT, nil
|
||||
} else {
|
||||
dr.Lock()
|
||||
srv := dr.Server
|
||||
dr.Unlock()
|
||||
if srv == nil {
|
||||
return "", err
|
||||
} else {
|
||||
return srv.fetch(dr, name) // lookup from other server
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (dr *DirAccResolver) Store(name, jwt string) error {
|
||||
@@ -3057,33 +3074,27 @@ func NewDirAccResolver(path string, limit int64, syncInterval time.Duration) (*D
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &DirAccResolver{store, syncInterval}, nil
|
||||
return &DirAccResolver{store, nil, syncInterval}, nil
|
||||
}
|
||||
|
||||
// Caching resolver using nats for lookups and making use of a directory for storage
|
||||
type CacheDirAccResolver struct {
|
||||
DirAccResolver
|
||||
*Server
|
||||
ttl time.Duration
|
||||
}
|
||||
|
||||
func (dr *CacheDirAccResolver) Fetch(name string) (string, error) {
|
||||
if theJWT, _ := dr.LoadAcc(name); theJWT != "" {
|
||||
return theJWT, nil
|
||||
}
|
||||
// lookup from other server
|
||||
s := dr.Server
|
||||
func (s *Server) fetch(res AccountResolver, name string) (string, error) {
|
||||
if s == nil {
|
||||
return "", ErrNoAccountResolver
|
||||
}
|
||||
respC := make(chan []byte, 1)
|
||||
accountLookupRequest := fmt.Sprintf(accLookupReqSubj, name)
|
||||
s.mu.Lock()
|
||||
replySubj := s.newRespInbox()
|
||||
if s.sys == nil || s.sys.replies == nil {
|
||||
s.mu.Unlock()
|
||||
return "", fmt.Errorf("eventing shut down")
|
||||
}
|
||||
replySubj := s.newRespInbox()
|
||||
replies := s.sys.replies
|
||||
// Store our handler.
|
||||
replies[replySubj] = func(sub *subscription, _ *client, subject, _ string, msg []byte) {
|
||||
@@ -3091,7 +3102,10 @@ func (dr *CacheDirAccResolver) Fetch(name string) (string, error) {
|
||||
copy(clone, msg)
|
||||
s.mu.Lock()
|
||||
if _, ok := replies[replySubj]; ok {
|
||||
respC <- clone // only send if there is still interest
|
||||
select {
|
||||
case respC <- clone: // only use first response and only if there is still interest
|
||||
default:
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
@@ -3106,7 +3120,7 @@ func (dr *CacheDirAccResolver) Fetch(name string) (string, error) {
|
||||
case <-time.After(fetchTimeout):
|
||||
err = errors.New("fetching jwt timed out")
|
||||
case m := <-respC:
|
||||
if err = dr.Store(name, string(m)); err == nil {
|
||||
if err = res.Store(name, string(m)); err == nil {
|
||||
theJWT = string(m)
|
||||
}
|
||||
}
|
||||
@@ -3125,7 +3139,7 @@ func NewCacheDirAccResolver(path string, limit int64, ttl time.Duration) (*Cache
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &CacheDirAccResolver{DirAccResolver{store, 0}, nil, ttl}, nil
|
||||
return &CacheDirAccResolver{DirAccResolver{store, nil, 0}, ttl}, nil
|
||||
}
|
||||
|
||||
func (dr *CacheDirAccResolver) Start(s *Server) error {
|
||||
@@ -3172,7 +3186,3 @@ func (dr *CacheDirAccResolver) Start(s *Server) error {
|
||||
s.Noticef("Managing some jwt in exclusive directory %s", dr.directory)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dr *CacheDirAccResolver) Reload() error {
|
||||
return dr.DirAccResolver.Reload()
|
||||
}
|
||||
|
||||
@@ -261,11 +261,11 @@ func (store *DirJWTStore) PackWalk(maxJWTs int, cb func(partialPackMsg string))
|
||||
exp := store.expiration
|
||||
store.Unlock()
|
||||
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
|
||||
if !info.IsDir() && strings.HasSuffix(path, fileExtension) { // this is a JWT
|
||||
if info != nil && !info.IsDir() && strings.HasSuffix(path, fileExtension) { // this is a JWT
|
||||
pubKey := strings.TrimSuffix(filepath.Base(path), fileExtension)
|
||||
store.Lock()
|
||||
if exp != nil {
|
||||
if _, ok := store.expiration.idx[pubKey]; !ok {
|
||||
if _, ok := exp.idx[pubKey]; !ok {
|
||||
store.Unlock()
|
||||
return nil // only include indexed files
|
||||
}
|
||||
|
||||
@@ -3183,11 +3183,10 @@ func TestExpiredUserCredentialsRenewal(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
origEventsHBInterval := eventsHBInterval
|
||||
eventsHBInterval = 50 * time.Millisecond // speed up eventing
|
||||
defer func() { eventsHBInterval = origEventsHBInterval }()
|
||||
func updateJwt(t *testing.T, url string, creds string, pubKey string, jwt string, respCnt int) int {
|
||||
t.Helper()
|
||||
require_NextMsg := func(sub *nats.Subscription) bool {
|
||||
t.Helper()
|
||||
msg := natsNexMsg(t, sub, time.Second)
|
||||
content := make(map[string]interface{})
|
||||
json.Unmarshal(msg.Data, &content)
|
||||
@@ -3196,23 +3195,68 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
}
|
||||
return false
|
||||
}
|
||||
require_FileAbsent := func(dir string, pub string) {
|
||||
t.Helper()
|
||||
_, err := os.Stat(filepath.Join(dir, pub+".jwt"))
|
||||
require_Error(t, err)
|
||||
require_True(t, os.IsNotExist(err))
|
||||
}
|
||||
require_FilePresent := func(dir string, pub string) {
|
||||
t.Helper()
|
||||
_, err := os.Stat(filepath.Join(dir, pub+".jwt"))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
require_FileEqual := func(dir string, pub string, jwt string) {
|
||||
t.Helper()
|
||||
content, err := ioutil.ReadFile(filepath.Join(dir, pub+".jwt"))
|
||||
require_NoError(t, err)
|
||||
require_Equal(t, string(content), jwt)
|
||||
c := natsConnect(t, url, nats.UserCredentials(creds),
|
||||
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
|
||||
if err != nil {
|
||||
t.Fatal("error not expected in this test", err)
|
||||
}
|
||||
}),
|
||||
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
|
||||
t.Fatal("error not expected in this test", err)
|
||||
}),
|
||||
)
|
||||
defer c.Close()
|
||||
resp := c.NewRespInbox()
|
||||
sub := natsSubSync(t, c, resp)
|
||||
err := sub.AutoUnsubscribe(respCnt)
|
||||
require_NoError(t, err)
|
||||
require_NoError(t, c.PublishRequest(fmt.Sprintf(accUpdateEventSubjNew, pubKey), resp, []byte(jwt)))
|
||||
passCnt := 0
|
||||
for i := 0; i < respCnt; i++ {
|
||||
if require_NextMsg(sub) {
|
||||
passCnt++
|
||||
}
|
||||
}
|
||||
return passCnt
|
||||
}
|
||||
|
||||
func require_JWTAbsent(t *testing.T, dir string, pub string) {
|
||||
t.Helper()
|
||||
_, err := os.Stat(filepath.Join(dir, pub+".jwt"))
|
||||
require_Error(t, err)
|
||||
require_True(t, os.IsNotExist(err))
|
||||
}
|
||||
|
||||
func require_JWTPresent(t *testing.T, dir string, pub string) {
|
||||
t.Helper()
|
||||
_, err := os.Stat(filepath.Join(dir, pub+".jwt"))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
func require_JWTEqual(t *testing.T, dir string, pub string, jwt string) {
|
||||
t.Helper()
|
||||
content, err := ioutil.ReadFile(filepath.Join(dir, pub+".jwt"))
|
||||
require_NoError(t, err)
|
||||
require_Equal(t, string(content), jwt)
|
||||
}
|
||||
|
||||
func createDir(t *testing.T, prefix string) string {
|
||||
t.Helper()
|
||||
dir, err := ioutil.TempDir("", prefix)
|
||||
require_NoError(t, err)
|
||||
return dir
|
||||
}
|
||||
|
||||
func writeJWT(t *testing.T, dir string, pub string, jwt string) {
|
||||
t.Helper()
|
||||
err := ioutil.WriteFile(filepath.Join(dir, pub+".jwt"), []byte(jwt), 0644)
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
origEventsHBInterval := eventsHBInterval
|
||||
eventsHBInterval = 50 * time.Millisecond // speed up eventing
|
||||
defer func() { eventsHBInterval = origEventsHBInterval }()
|
||||
require_NoLocalOrRemoteConnections := func(account string, srvs ...*Server) {
|
||||
t.Helper()
|
||||
for _, srv := range srvs {
|
||||
@@ -3253,17 +3297,6 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
}()
|
||||
require_NoLocalOrRemoteConnections(acc, srvs...)
|
||||
}
|
||||
writeFile := func(dir string, pub string, jwt string) {
|
||||
t.Helper()
|
||||
err := ioutil.WriteFile(filepath.Join(dir, pub+".jwt"), []byte(jwt), 0644)
|
||||
require_NoError(t, err)
|
||||
}
|
||||
createDir := func(prefix string) string {
|
||||
t.Helper()
|
||||
dir, err := ioutil.TempDir("", prefix)
|
||||
require_NoError(t, err)
|
||||
return dir
|
||||
}
|
||||
connect := func(url string, credsfile string, acc string, srvs ...*Server) {
|
||||
t.Helper()
|
||||
nc := natsConnect(t, url, nats.UserCredentials(credsfile))
|
||||
@@ -3299,36 +3332,6 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
*creds = genCredsFile(t, ujwt, seed)
|
||||
done <- struct{}{}
|
||||
}
|
||||
updateJwt := func(url string, creds string, pubKey string, jwt string) int {
|
||||
t.Helper()
|
||||
c := natsConnect(t, url, nats.UserCredentials(creds),
|
||||
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
|
||||
if err != nil {
|
||||
t.Fatal("error not expected in this test", err)
|
||||
}
|
||||
}),
|
||||
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
|
||||
t.Fatal("error not expected in this test", err)
|
||||
}),
|
||||
)
|
||||
defer c.Close()
|
||||
resp := c.NewRespInbox()
|
||||
sub := natsSubSync(t, c, resp)
|
||||
err := sub.AutoUnsubscribe(3)
|
||||
require_NoError(t, err)
|
||||
require_NoError(t, c.PublishRequest(fmt.Sprintf(accUpdateEventSubjNew, pubKey), resp, []byte(jwt)))
|
||||
passCnt := 0
|
||||
if require_NextMsg(sub) {
|
||||
passCnt++
|
||||
}
|
||||
if require_NextMsg(sub) {
|
||||
passCnt++
|
||||
}
|
||||
if require_NextMsg(sub) {
|
||||
passCnt++
|
||||
}
|
||||
return passCnt
|
||||
}
|
||||
// Create Accounts and corresponding user creds. Do so concurrently to speed up the test
|
||||
doneChan := make(chan struct{}, 5)
|
||||
defer close(doneChan)
|
||||
@@ -3342,30 +3345,28 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
go createAccountAndUser(true, doneChan, &cpub, &cjwt1, &cjwt2, &cCreds)
|
||||
var dpub, djwt1, dummy2, dCreds string // extra user used later in the test in order to test limits
|
||||
go createAccountAndUser(true, doneChan, &dpub, &djwt1, &dummy2, &dCreds)
|
||||
<-doneChan
|
||||
<-doneChan
|
||||
<-doneChan
|
||||
<-doneChan
|
||||
<-doneChan
|
||||
for i := 0; i < cap(doneChan); i++ {
|
||||
<-doneChan
|
||||
}
|
||||
defer os.Remove(sysCreds)
|
||||
defer os.Remove(aCreds)
|
||||
defer os.Remove(bCreds)
|
||||
defer os.Remove(cCreds)
|
||||
defer os.Remove(dCreds)
|
||||
// Create one directory for each server
|
||||
dirA := createDir("srv-a")
|
||||
dirA := createDir(t, "srv-a")
|
||||
defer os.RemoveAll(dirA)
|
||||
dirB := createDir("srv-b")
|
||||
dirB := createDir(t, "srv-b")
|
||||
defer os.RemoveAll(dirB)
|
||||
dirC := createDir("srv-c")
|
||||
dirC := createDir(t, "srv-c")
|
||||
defer os.RemoveAll(dirC)
|
||||
// simulate a restart of the server by storing files in them
|
||||
// Server A/B will completely sync, so after startup each server
|
||||
// will contain the union off all stored/configured jwt
|
||||
// Server C will send out lookup requests for jwt it does not store itself
|
||||
writeFile(dirA, apub, ajwt1)
|
||||
writeFile(dirB, bpub, bjwt1)
|
||||
writeFile(dirC, cpub, cjwt1)
|
||||
writeJWT(t, dirA, apub, ajwt1)
|
||||
writeJWT(t, dirB, bpub, bjwt1)
|
||||
writeJWT(t, dirC, cpub, cjwt1)
|
||||
// Create seed server A (using no_advertise to prevent fail over)
|
||||
confA := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
@@ -3391,7 +3392,7 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
sA, _ := RunServerWithConfig(confA)
|
||||
defer sA.Shutdown()
|
||||
// during startup resolver_preload causes the directory to contain data
|
||||
require_FilePresent(dirA, cpub)
|
||||
require_JWTPresent(t, dirA, cpub)
|
||||
// Create Server B (using no_advertise to prevent fail over)
|
||||
confB := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
@@ -3445,35 +3446,35 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
checkClusterFormed(t, sA, sB, sC)
|
||||
time.Sleep(500 * time.Millisecond) // wait for the protocol to converge
|
||||
// Check all accounts
|
||||
require_FilePresent(dirA, apub) // was already present on startup
|
||||
require_FilePresent(dirB, apub) // was copied from server A
|
||||
require_FileAbsent(dirC, apub)
|
||||
require_FilePresent(dirA, bpub) // was copied from server B
|
||||
require_FilePresent(dirB, bpub) // was already present on startup
|
||||
require_FileAbsent(dirC, bpub)
|
||||
require_FilePresent(dirA, cpub) // was present in preload
|
||||
require_FilePresent(dirB, cpub) // was copied from server A
|
||||
require_FilePresent(dirC, cpub) // was already present on startup
|
||||
require_JWTPresent(t, dirA, apub) // was already present on startup
|
||||
require_JWTPresent(t, dirB, apub) // was copied from server A
|
||||
require_JWTAbsent(t, dirC, apub)
|
||||
require_JWTPresent(t, dirA, bpub) // was copied from server B
|
||||
require_JWTPresent(t, dirB, bpub) // was already present on startup
|
||||
require_JWTAbsent(t, dirC, bpub)
|
||||
require_JWTPresent(t, dirA, cpub) // was present in preload
|
||||
require_JWTPresent(t, dirB, cpub) // was copied from server A
|
||||
require_JWTPresent(t, dirC, cpub) // was already present on startup
|
||||
// This is to test that connecting to it still works
|
||||
require_FileAbsent(dirA, syspub)
|
||||
require_FileAbsent(dirB, syspub)
|
||||
require_FileAbsent(dirC, syspub)
|
||||
require_JWTAbsent(t, dirA, syspub)
|
||||
require_JWTAbsent(t, dirB, syspub)
|
||||
require_JWTAbsent(t, dirC, syspub)
|
||||
// system account client can connect to every server
|
||||
connect(sA.ClientURL(), sysCreds, "")
|
||||
connect(sB.ClientURL(), sysCreds, "")
|
||||
connect(sC.ClientURL(), sysCreds, "")
|
||||
checkClusterFormed(t, sA, sB, sC)
|
||||
// upload system account and require a response from each server
|
||||
passCnt := updateJwt(sA.ClientURL(), sysCreds, syspub, sysjwt)
|
||||
passCnt := updateJwt(t, sA.ClientURL(), sysCreds, syspub, sysjwt, 3)
|
||||
require_True(t, passCnt == 3)
|
||||
require_FilePresent(dirA, syspub) // was just received
|
||||
require_FilePresent(dirB, syspub) // was just received
|
||||
require_FilePresent(dirC, syspub) // was just received
|
||||
require_JWTPresent(t, dirA, syspub) // was just received
|
||||
require_JWTPresent(t, dirB, syspub) // was just received
|
||||
require_JWTPresent(t, dirC, syspub) // was just received
|
||||
// Only files missing are in C, which is only caching
|
||||
connect(sC.ClientURL(), aCreds, apub, sA, sB, sC)
|
||||
connect(sC.ClientURL(), bCreds, bpub, sA, sB, sC)
|
||||
require_FilePresent(dirC, apub) // was looked up form A or B
|
||||
require_FilePresent(dirC, bpub) // was looked up from A or B
|
||||
require_JWTPresent(t, dirC, apub) // was looked up form A or B
|
||||
require_JWTPresent(t, dirC, bpub) // was looked up from A or B
|
||||
// Check limits and update jwt B connecting to server A
|
||||
for port, v := range map[string]struct{ pub, jwt, creds string }{
|
||||
sB.ClientURL(): {bpub, bjwt2, bCreds},
|
||||
@@ -3482,19 +3483,19 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
require_1Connection(sA.ClientURL(), v.creds, v.pub, sA, sB, sC)
|
||||
require_1Connection(sB.ClientURL(), v.creds, v.pub, sA, sB, sC)
|
||||
require_1Connection(sC.ClientURL(), v.creds, v.pub, sA, sB, sC)
|
||||
passCnt := updateJwt(port, sysCreds, v.pub, v.jwt)
|
||||
passCnt := updateJwt(t, port, sysCreds, v.pub, v.jwt, 3)
|
||||
require_True(t, passCnt == 3)
|
||||
require_2Connection(sA.ClientURL(), v.creds, v.pub, sA, sB, sC)
|
||||
require_2Connection(sB.ClientURL(), v.creds, v.pub, sA, sB, sC)
|
||||
require_2Connection(sC.ClientURL(), v.creds, v.pub, sA, sB, sC)
|
||||
require_FileEqual(dirA, v.pub, v.jwt)
|
||||
require_FileEqual(dirB, v.pub, v.jwt)
|
||||
require_FileEqual(dirC, v.pub, v.jwt)
|
||||
require_JWTEqual(t, dirA, v.pub, v.jwt)
|
||||
require_JWTEqual(t, dirB, v.pub, v.jwt)
|
||||
require_JWTEqual(t, dirC, v.pub, v.jwt)
|
||||
}
|
||||
// Simulates A having missed an update
|
||||
// shutting B down as it has it will directly connect to A and connect right away
|
||||
sB.Shutdown()
|
||||
writeFile(dirB, apub, ajwt2) // this will be copied to server A
|
||||
writeJWT(t, dirB, apub, ajwt2) // this will be copied to server A
|
||||
sB, _ = RunServerWithConfig(confB)
|
||||
defer sB.Shutdown()
|
||||
checkClusterFormed(t, sA, sB, sC)
|
||||
@@ -3502,9 +3503,9 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
// Restart server C. this is a workaround to force C to do a lookup in the absence of account cleanup
|
||||
sC.Shutdown()
|
||||
sC, _ = RunServerWithConfig(confClongTTL) //TODO remove this once we clean up accounts
|
||||
require_FileEqual(dirA, apub, ajwt2) // was copied from server B
|
||||
require_FileEqual(dirB, apub, ajwt2) // was restarted with this
|
||||
require_FileEqual(dirC, apub, ajwt1) // still contains old cached value
|
||||
require_JWTEqual(t, dirA, apub, ajwt2) // was copied from server B
|
||||
require_JWTEqual(t, dirB, apub, ajwt2) // was restarted with this
|
||||
require_JWTEqual(t, dirC, apub, ajwt1) // still contains old cached value
|
||||
require_2Connection(sA.ClientURL(), aCreds, apub, sA, sB, sC)
|
||||
require_2Connection(sB.ClientURL(), aCreds, apub, sA, sB, sC)
|
||||
require_1Connection(sC.ClientURL(), aCreds, apub, sA, sB, sC)
|
||||
@@ -3512,7 +3513,7 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
sC.Shutdown()
|
||||
sC, _ = RunServerWithConfig(confCshortTTL) //TODO remove this once we clean up accounts
|
||||
defer sC.Shutdown()
|
||||
require_FileEqual(dirC, apub, ajwt1) // still contains old cached value
|
||||
require_JWTEqual(t, dirC, apub, ajwt1) // still contains old cached value
|
||||
checkClusterFormed(t, sA, sB, sC)
|
||||
// Force next connect to do a lookup exceeds ttl
|
||||
fname := filepath.Join(dirC, apub+".jwt")
|
||||
@@ -3524,14 +3525,226 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
return fmt.Errorf("File not removed in time")
|
||||
})
|
||||
connect(sC.ClientURL(), aCreds, apub, sA, sB, sC) // When lookup happens
|
||||
require_FileEqual(dirC, apub, ajwt2) // was looked up form A or B
|
||||
require_JWTEqual(t, dirC, apub, ajwt2) // was looked up form A or B
|
||||
require_2Connection(sC.ClientURL(), aCreds, apub, sA, sB, sC)
|
||||
// Test exceeding limit. For the exclusive directory resolver, limit is a stop gap measure.
|
||||
// It is not expected to be hit. When hit the administrator is supposed to take action.
|
||||
passCnt = updateJwt(sA.ClientURL(), sysCreds, dpub, djwt1)
|
||||
passCnt = updateJwt(t, sA.ClientURL(), sysCreds, dpub, djwt1, 3)
|
||||
require_True(t, passCnt == 1) // Only Server C updated
|
||||
}
|
||||
|
||||
func TestAccountNATSResolverCrossClusterFetch(t *testing.T) {
|
||||
connect := func(url string, credsfile string) {
|
||||
t.Helper()
|
||||
nc := natsConnect(t, url, nats.UserCredentials(credsfile))
|
||||
nc.Close()
|
||||
}
|
||||
createAccountAndUser := func(done chan struct{}, pubKey, jwt1, jwt2, creds *string) {
|
||||
t.Helper()
|
||||
kp, _ := nkeys.CreateAccount()
|
||||
*pubKey, _ = kp.PublicKey()
|
||||
claim := jwt.NewAccountClaims(*pubKey)
|
||||
var err error
|
||||
*jwt1, err = claim.Encode(oKp)
|
||||
require_NoError(t, err)
|
||||
// need to assure that create time differs (resolution is sec)
|
||||
time.Sleep(time.Millisecond * 1100)
|
||||
// create updated claim
|
||||
claim.Tags.Add("tag")
|
||||
*jwt2, err = claim.Encode(oKp)
|
||||
require_NoError(t, err)
|
||||
ukp, _ := nkeys.CreateUser()
|
||||
seed, _ := ukp.Seed()
|
||||
upub, _ := ukp.PublicKey()
|
||||
uclaim := newJWTTestUserClaims()
|
||||
uclaim.Subject = upub
|
||||
ujwt, err := uclaim.Encode(kp)
|
||||
require_NoError(t, err)
|
||||
*creds = genCredsFile(t, ujwt, seed)
|
||||
done <- struct{}{}
|
||||
}
|
||||
// Create Accounts and corresponding user creds. Do so concurrently to speed up the test
|
||||
doneChan := make(chan struct{}, 3)
|
||||
defer close(doneChan)
|
||||
var syspub, sysjwt, dummy1, sysCreds string
|
||||
go createAccountAndUser(doneChan, &syspub, &sysjwt, &dummy1, &sysCreds)
|
||||
var apub, ajwt1, ajwt2, aCreds string
|
||||
go createAccountAndUser(doneChan, &apub, &ajwt1, &ajwt2, &aCreds)
|
||||
var bpub, bjwt1, bjwt2, bCreds string
|
||||
go createAccountAndUser(doneChan, &bpub, &bjwt1, &bjwt2, &bCreds)
|
||||
for i := 0; i < cap(doneChan); i++ {
|
||||
<-doneChan
|
||||
}
|
||||
defer os.Remove(sysCreds)
|
||||
defer os.Remove(aCreds)
|
||||
defer os.Remove(bCreds)
|
||||
// Create one directory for each server
|
||||
dirAA := createDir(t, "srv-a-a")
|
||||
defer os.RemoveAll(dirAA)
|
||||
dirAB := createDir(t, "srv-a-b")
|
||||
defer os.RemoveAll(dirAB)
|
||||
dirBA := createDir(t, "srv-b-a")
|
||||
defer os.RemoveAll(dirBA)
|
||||
dirBB := createDir(t, "srv-b-b")
|
||||
defer os.RemoveAll(dirBB)
|
||||
// simulate a restart of the server by storing files in them
|
||||
// Server AA & AB will completely sync
|
||||
// Server BA & BB will completely sync
|
||||
// Be aware that no syncing will occur between cluster
|
||||
writeJWT(t, dirAA, apub, ajwt1)
|
||||
writeJWT(t, dirBA, bpub, bjwt1)
|
||||
// Create seed server A (using no_advertise to prevent fail over)
|
||||
confAA := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
server_name: srv-A-A
|
||||
operator: %s
|
||||
system_account: %s
|
||||
resolver: {
|
||||
type: full
|
||||
dir: %s
|
||||
interval: "200ms"
|
||||
}
|
||||
gateway: {
|
||||
name: "clust-A"
|
||||
listen: -1
|
||||
}
|
||||
cluster {
|
||||
name: clust-A
|
||||
listen: -1
|
||||
no_advertise: true
|
||||
}
|
||||
`, ojwt, syspub, dirAA)))
|
||||
defer os.Remove(confAA)
|
||||
sAA, _ := RunServerWithConfig(confAA)
|
||||
defer sAA.Shutdown()
|
||||
// Create Server B (using no_advertise to prevent fail over)
|
||||
confAB := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
server_name: srv-A-B
|
||||
operator: %s
|
||||
system_account: %s
|
||||
resolver: {
|
||||
type: full
|
||||
dir: %s
|
||||
interval: "200ms"
|
||||
}
|
||||
gateway: {
|
||||
name: "clust-A"
|
||||
listen: -1
|
||||
}
|
||||
cluster {
|
||||
name: clust-A
|
||||
listen: -1
|
||||
no_advertise: true
|
||||
routes [
|
||||
nats-route://localhost:%d
|
||||
]
|
||||
}
|
||||
`, ojwt, syspub, dirAB, sAA.opts.Cluster.Port)))
|
||||
defer os.Remove(confAB)
|
||||
sAB, _ := RunServerWithConfig(confAB)
|
||||
defer sAB.Shutdown()
|
||||
// Create Server C (using no_advertise to prevent fail over)
|
||||
confBA := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
server_name: srv-B-A
|
||||
operator: %s
|
||||
system_account: %s
|
||||
resolver: {
|
||||
type: full
|
||||
dir: %s
|
||||
interval: "200ms"
|
||||
}
|
||||
gateway: {
|
||||
name: "clust-B"
|
||||
listen: -1
|
||||
gateways: [
|
||||
{name: "clust-A", url: "nats://localhost:%d"},
|
||||
]
|
||||
}
|
||||
cluster {
|
||||
name: clust-B
|
||||
listen: -1
|
||||
no_advertise: true
|
||||
}
|
||||
`, ojwt, syspub, dirBA, sAA.opts.Gateway.Port)))
|
||||
defer os.Remove(confBA)
|
||||
sBA, _ := RunServerWithConfig(confBA)
|
||||
defer sBA.Shutdown()
|
||||
// Create Sever BA (using no_advertise to prevent fail over)
|
||||
confBB := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
server_name: srv-B-B
|
||||
operator: %s
|
||||
system_account: %s
|
||||
resolver: {
|
||||
type: full
|
||||
dir: %s
|
||||
interval: "200ms"
|
||||
}
|
||||
cluster {
|
||||
name: clust-B
|
||||
listen: -1
|
||||
no_advertise: true
|
||||
routes [
|
||||
nats-route://localhost:%d
|
||||
]
|
||||
}
|
||||
gateway: {
|
||||
name: "clust-B"
|
||||
listen: -1
|
||||
gateways: [
|
||||
{name: "clust-A", url: "nats://localhost:%d"},
|
||||
]
|
||||
}
|
||||
`, ojwt, syspub, dirBB, sBA.opts.Cluster.Port, sAA.opts.Cluster.Port)))
|
||||
defer os.Remove(confBB)
|
||||
sBB, _ := RunServerWithConfig(confBB)
|
||||
defer sBB.Shutdown()
|
||||
// Assert topology
|
||||
checkClusterFormed(t, sAA, sAB)
|
||||
checkClusterFormed(t, sBA, sBB)
|
||||
waitForOutboundGateways(t, sAA, 1, 5*time.Second)
|
||||
waitForOutboundGateways(t, sAB, 1, 5*time.Second)
|
||||
waitForOutboundGateways(t, sBA, 1, 5*time.Second)
|
||||
waitForOutboundGateways(t, sBB, 1, 5*time.Second)
|
||||
time.Sleep(500 * time.Millisecond) // wait for the protocol to converge
|
||||
updateJwt(t, sAA.ClientURL(), sysCreds, syspub, sysjwt, 4) // update system account jwt on all server
|
||||
require_JWTEqual(t, dirAA, syspub, sysjwt) // assure this update made it to every server
|
||||
require_JWTEqual(t, dirAB, syspub, sysjwt) // assure this update made it to every server
|
||||
require_JWTEqual(t, dirBA, syspub, sysjwt) // assure this update made it to every server
|
||||
require_JWTEqual(t, dirBB, syspub, sysjwt) // assure this update made it to every server
|
||||
require_JWTAbsent(t, dirAA, bpub) // assure that jwt are not synced across cluster
|
||||
require_JWTAbsent(t, dirAB, bpub) // assure that jwt are not synced across cluster
|
||||
require_JWTAbsent(t, dirBA, apub) // assure that jwt are not synced across cluster
|
||||
require_JWTAbsent(t, dirBB, apub) // assure that jwt are not synced across cluster
|
||||
connect(sAA.ClientURL(), aCreds) // connect to cluster where jwt was initially stored
|
||||
connect(sAB.ClientURL(), aCreds) // connect to cluster where jwt was initially stored
|
||||
connect(sBA.ClientURL(), bCreds) // connect to cluster where jwt was initially stored
|
||||
connect(sBB.ClientURL(), bCreds) // connect to cluster where jwt was initially stored
|
||||
time.Sleep(500 * time.Millisecond) // wait for the protocol to (NOT) converge
|
||||
require_JWTAbsent(t, dirAA, bpub) // assure that jwt are still not synced across cluster
|
||||
require_JWTAbsent(t, dirAB, bpub) // assure that jwt are still not synced across cluster
|
||||
require_JWTAbsent(t, dirBA, apub) // assure that jwt are still not synced across cluster
|
||||
require_JWTAbsent(t, dirBB, apub) // assure that jwt are still not synced across cluster
|
||||
// We have verified that account B does not exist in cluster A, neither does account A in cluster B
|
||||
// Despite that clients from account B can connect to server A, same for account A in cluster B
|
||||
connect(sAA.ClientURL(), bCreds) // connect to cluster where jwt was not initially stored
|
||||
connect(sAB.ClientURL(), bCreds) // connect to cluster where jwt was not initially stored
|
||||
connect(sBA.ClientURL(), aCreds) // connect to cluster where jwt was not initially stored
|
||||
connect(sBB.ClientURL(), aCreds) // connect to cluster where jwt was not initially stored
|
||||
require_JWTEqual(t, dirAA, bpub, bjwt1) // assure that now jwt used in connect is stored
|
||||
require_JWTEqual(t, dirAB, bpub, bjwt1) // assure that now jwt used in connect is stored
|
||||
require_JWTEqual(t, dirBA, apub, ajwt1) // assure that now jwt used in connect is stored
|
||||
require_JWTEqual(t, dirBB, apub, ajwt1) // assure that now jwt used in connect is stored
|
||||
updateJwt(t, sAA.ClientURL(), sysCreds, bpub, bjwt2, 4) // update bjwt, expect updates from everywhere
|
||||
updateJwt(t, sBA.ClientURL(), sysCreds, apub, ajwt2, 4) // update ajwt, expect updates from everywhere
|
||||
require_JWTEqual(t, dirAA, bpub, bjwt2) // assure that jwt got updated accordingly
|
||||
require_JWTEqual(t, dirAB, bpub, bjwt2) // assure that jwt got updated accordingly
|
||||
require_JWTEqual(t, dirBA, apub, ajwt2) // assure that jwt got updated accordingly
|
||||
require_JWTEqual(t, dirBB, apub, ajwt2) // assure that jwt got updated accordingly
|
||||
}
|
||||
|
||||
func newTimeRange(start time.Time, dur time.Duration) jwt.TimeRange {
|
||||
return jwt.TimeRange{Start: start.Format("15:04:05"), End: start.Add(dur).Format("15:04:05")}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user