mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 18:20:42 -07:00
Merge pull request #1616 from nats-io/nats-resolver-gateway
Adding fetch on missing jwt of full nats based resolver
This commit is contained in:
@@ -2892,6 +2892,7 @@ func (ur *URLAccResolver) Fetch(name string) (string, error) {
|
||||
// Resolver based on nats for synchronization and backing directory for storage.
|
||||
type DirAccResolver struct {
|
||||
*DirJWTStore
|
||||
*Server
|
||||
syncInterval time.Duration
|
||||
}
|
||||
|
||||
@@ -2899,6 +2900,10 @@ func (dr *DirAccResolver) IsTrackingUpdate() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (dr *DirAccResolver) Reload() error {
|
||||
return dr.DirJWTStore.Reload()
|
||||
}
|
||||
|
||||
func respondToUpdate(s *Server, respSubj string, acc string, message string, err error) {
|
||||
if err == nil {
|
||||
s.Debugf("%s - %s", message, acc)
|
||||
@@ -2929,6 +2934,7 @@ func respondToUpdate(s *Server, respSubj string, acc string, message string, err
|
||||
func (dr *DirAccResolver) Start(s *Server) error {
|
||||
dr.Lock()
|
||||
defer dr.Unlock()
|
||||
dr.Server = s
|
||||
dr.DirJWTStore.changed = func(pubKey string) {
|
||||
if v, ok := s.accounts.Load(pubKey); !ok {
|
||||
} else if jwt, err := dr.LoadAcc(pubKey); err != nil {
|
||||
@@ -2998,7 +3004,7 @@ func (dr *DirAccResolver) Start(s *Server) error {
|
||||
// let them timeout
|
||||
s.Errorf("pack request error: %v", err)
|
||||
} else {
|
||||
s.Debugf("pack request hash %x - finished responding with hash %x")
|
||||
s.Debugf("pack request hash %x - finished responding with hash %x", theirHash, ourHash)
|
||||
s.sendInternalMsgLocked(reply, "", nil, []byte{})
|
||||
}
|
||||
}); err != nil {
|
||||
@@ -3039,7 +3045,17 @@ func (dr *DirAccResolver) Start(s *Server) error {
|
||||
}
|
||||
|
||||
func (dr *DirAccResolver) Fetch(name string) (string, error) {
|
||||
return dr.LoadAcc(name)
|
||||
if theJWT, err := dr.LoadAcc(name); theJWT != "" {
|
||||
return theJWT, nil
|
||||
} else {
|
||||
dr.Lock()
|
||||
srv := dr.Server
|
||||
dr.Unlock()
|
||||
if srv == nil {
|
||||
return "", err
|
||||
}
|
||||
return srv.fetch(dr, name) // lookup from other server
|
||||
}
|
||||
}
|
||||
|
||||
func (dr *DirAccResolver) Store(name, jwt string) error {
|
||||
@@ -3057,33 +3073,27 @@ func NewDirAccResolver(path string, limit int64, syncInterval time.Duration) (*D
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &DirAccResolver{store, syncInterval}, nil
|
||||
return &DirAccResolver{store, nil, syncInterval}, nil
|
||||
}
|
||||
|
||||
// Caching resolver using nats for lookups and making use of a directory for storage
|
||||
type CacheDirAccResolver struct {
|
||||
DirAccResolver
|
||||
*Server
|
||||
ttl time.Duration
|
||||
}
|
||||
|
||||
func (dr *CacheDirAccResolver) Fetch(name string) (string, error) {
|
||||
if theJWT, _ := dr.LoadAcc(name); theJWT != "" {
|
||||
return theJWT, nil
|
||||
}
|
||||
// lookup from other server
|
||||
s := dr.Server
|
||||
func (s *Server) fetch(res AccountResolver, name string) (string, error) {
|
||||
if s == nil {
|
||||
return "", ErrNoAccountResolver
|
||||
}
|
||||
respC := make(chan []byte, 1)
|
||||
accountLookupRequest := fmt.Sprintf(accLookupReqSubj, name)
|
||||
s.mu.Lock()
|
||||
replySubj := s.newRespInbox()
|
||||
if s.sys == nil || s.sys.replies == nil {
|
||||
s.mu.Unlock()
|
||||
return "", fmt.Errorf("eventing shut down")
|
||||
}
|
||||
replySubj := s.newRespInbox()
|
||||
replies := s.sys.replies
|
||||
// Store our handler.
|
||||
replies[replySubj] = func(sub *subscription, _ *client, subject, _ string, msg []byte) {
|
||||
@@ -3091,7 +3101,10 @@ func (dr *CacheDirAccResolver) Fetch(name string) (string, error) {
|
||||
copy(clone, msg)
|
||||
s.mu.Lock()
|
||||
if _, ok := replies[replySubj]; ok {
|
||||
respC <- clone // only send if there is still interest
|
||||
select {
|
||||
case respC <- clone: // only use first response and only if there is still interest
|
||||
default:
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
@@ -3106,7 +3119,7 @@ func (dr *CacheDirAccResolver) Fetch(name string) (string, error) {
|
||||
case <-time.After(fetchTimeout):
|
||||
err = errors.New("fetching jwt timed out")
|
||||
case m := <-respC:
|
||||
if err = dr.Store(name, string(m)); err == nil {
|
||||
if err = res.Store(name, string(m)); err == nil {
|
||||
theJWT = string(m)
|
||||
}
|
||||
}
|
||||
@@ -3125,7 +3138,7 @@ func NewCacheDirAccResolver(path string, limit int64, ttl time.Duration) (*Cache
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &CacheDirAccResolver{DirAccResolver{store, 0}, nil, ttl}, nil
|
||||
return &CacheDirAccResolver{DirAccResolver{store, nil, 0}, ttl}, nil
|
||||
}
|
||||
|
||||
func (dr *CacheDirAccResolver) Start(s *Server) error {
|
||||
@@ -3172,7 +3185,3 @@ func (dr *CacheDirAccResolver) Start(s *Server) error {
|
||||
s.Noticef("Managing some jwt in exclusive directory %s", dr.directory)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dr *CacheDirAccResolver) Reload() error {
|
||||
return dr.DirAccResolver.Reload()
|
||||
}
|
||||
|
||||
@@ -261,11 +261,11 @@ func (store *DirJWTStore) PackWalk(maxJWTs int, cb func(partialPackMsg string))
|
||||
exp := store.expiration
|
||||
store.Unlock()
|
||||
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
|
||||
if !info.IsDir() && strings.HasSuffix(path, fileExtension) { // this is a JWT
|
||||
if info != nil && !info.IsDir() && strings.HasSuffix(path, fileExtension) { // this is a JWT
|
||||
pubKey := strings.TrimSuffix(filepath.Base(path), fileExtension)
|
||||
store.Lock()
|
||||
if exp != nil {
|
||||
if _, ok := store.expiration.idx[pubKey]; !ok {
|
||||
if _, ok := exp.idx[pubKey]; !ok {
|
||||
store.Unlock()
|
||||
return nil // only include indexed files
|
||||
}
|
||||
|
||||
@@ -3183,11 +3183,10 @@ func TestExpiredUserCredentialsRenewal(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
origEventsHBInterval := eventsHBInterval
|
||||
eventsHBInterval = 50 * time.Millisecond // speed up eventing
|
||||
defer func() { eventsHBInterval = origEventsHBInterval }()
|
||||
func updateJwt(t *testing.T, url string, creds string, pubKey string, jwt string, respCnt int) int {
|
||||
t.Helper()
|
||||
require_NextMsg := func(sub *nats.Subscription) bool {
|
||||
t.Helper()
|
||||
msg := natsNexMsg(t, sub, time.Second)
|
||||
content := make(map[string]interface{})
|
||||
json.Unmarshal(msg.Data, &content)
|
||||
@@ -3196,23 +3195,68 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
}
|
||||
return false
|
||||
}
|
||||
require_FileAbsent := func(dir string, pub string) {
|
||||
t.Helper()
|
||||
_, err := os.Stat(filepath.Join(dir, pub+".jwt"))
|
||||
require_Error(t, err)
|
||||
require_True(t, os.IsNotExist(err))
|
||||
}
|
||||
require_FilePresent := func(dir string, pub string) {
|
||||
t.Helper()
|
||||
_, err := os.Stat(filepath.Join(dir, pub+".jwt"))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
require_FileEqual := func(dir string, pub string, jwt string) {
|
||||
t.Helper()
|
||||
content, err := ioutil.ReadFile(filepath.Join(dir, pub+".jwt"))
|
||||
require_NoError(t, err)
|
||||
require_Equal(t, string(content), jwt)
|
||||
c := natsConnect(t, url, nats.UserCredentials(creds),
|
||||
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
|
||||
if err != nil {
|
||||
t.Fatal("error not expected in this test", err)
|
||||
}
|
||||
}),
|
||||
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
|
||||
t.Fatal("error not expected in this test", err)
|
||||
}),
|
||||
)
|
||||
defer c.Close()
|
||||
resp := c.NewRespInbox()
|
||||
sub := natsSubSync(t, c, resp)
|
||||
err := sub.AutoUnsubscribe(respCnt)
|
||||
require_NoError(t, err)
|
||||
require_NoError(t, c.PublishRequest(fmt.Sprintf(accUpdateEventSubjNew, pubKey), resp, []byte(jwt)))
|
||||
passCnt := 0
|
||||
for i := 0; i < respCnt; i++ {
|
||||
if require_NextMsg(sub) {
|
||||
passCnt++
|
||||
}
|
||||
}
|
||||
return passCnt
|
||||
}
|
||||
|
||||
func require_JWTAbsent(t *testing.T, dir string, pub string) {
|
||||
t.Helper()
|
||||
_, err := os.Stat(filepath.Join(dir, pub+".jwt"))
|
||||
require_Error(t, err)
|
||||
require_True(t, os.IsNotExist(err))
|
||||
}
|
||||
|
||||
func require_JWTPresent(t *testing.T, dir string, pub string) {
|
||||
t.Helper()
|
||||
_, err := os.Stat(filepath.Join(dir, pub+".jwt"))
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
func require_JWTEqual(t *testing.T, dir string, pub string, jwt string) {
|
||||
t.Helper()
|
||||
content, err := ioutil.ReadFile(filepath.Join(dir, pub+".jwt"))
|
||||
require_NoError(t, err)
|
||||
require_Equal(t, string(content), jwt)
|
||||
}
|
||||
|
||||
func createDir(t *testing.T, prefix string) string {
|
||||
t.Helper()
|
||||
dir, err := ioutil.TempDir("", prefix)
|
||||
require_NoError(t, err)
|
||||
return dir
|
||||
}
|
||||
|
||||
func writeJWT(t *testing.T, dir string, pub string, jwt string) {
|
||||
t.Helper()
|
||||
err := ioutil.WriteFile(filepath.Join(dir, pub+".jwt"), []byte(jwt), 0644)
|
||||
require_NoError(t, err)
|
||||
}
|
||||
|
||||
func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
origEventsHBInterval := eventsHBInterval
|
||||
eventsHBInterval = 50 * time.Millisecond // speed up eventing
|
||||
defer func() { eventsHBInterval = origEventsHBInterval }()
|
||||
require_NoLocalOrRemoteConnections := func(account string, srvs ...*Server) {
|
||||
t.Helper()
|
||||
for _, srv := range srvs {
|
||||
@@ -3253,17 +3297,6 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
}()
|
||||
require_NoLocalOrRemoteConnections(acc, srvs...)
|
||||
}
|
||||
writeFile := func(dir string, pub string, jwt string) {
|
||||
t.Helper()
|
||||
err := ioutil.WriteFile(filepath.Join(dir, pub+".jwt"), []byte(jwt), 0644)
|
||||
require_NoError(t, err)
|
||||
}
|
||||
createDir := func(prefix string) string {
|
||||
t.Helper()
|
||||
dir, err := ioutil.TempDir("", prefix)
|
||||
require_NoError(t, err)
|
||||
return dir
|
||||
}
|
||||
connect := func(url string, credsfile string, acc string, srvs ...*Server) {
|
||||
t.Helper()
|
||||
nc := natsConnect(t, url, nats.UserCredentials(credsfile))
|
||||
@@ -3299,36 +3332,6 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
*creds = genCredsFile(t, ujwt, seed)
|
||||
done <- struct{}{}
|
||||
}
|
||||
updateJwt := func(url string, creds string, pubKey string, jwt string) int {
|
||||
t.Helper()
|
||||
c := natsConnect(t, url, nats.UserCredentials(creds),
|
||||
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
|
||||
if err != nil {
|
||||
t.Fatal("error not expected in this test", err)
|
||||
}
|
||||
}),
|
||||
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
|
||||
t.Fatal("error not expected in this test", err)
|
||||
}),
|
||||
)
|
||||
defer c.Close()
|
||||
resp := c.NewRespInbox()
|
||||
sub := natsSubSync(t, c, resp)
|
||||
err := sub.AutoUnsubscribe(3)
|
||||
require_NoError(t, err)
|
||||
require_NoError(t, c.PublishRequest(fmt.Sprintf(accUpdateEventSubjNew, pubKey), resp, []byte(jwt)))
|
||||
passCnt := 0
|
||||
if require_NextMsg(sub) {
|
||||
passCnt++
|
||||
}
|
||||
if require_NextMsg(sub) {
|
||||
passCnt++
|
||||
}
|
||||
if require_NextMsg(sub) {
|
||||
passCnt++
|
||||
}
|
||||
return passCnt
|
||||
}
|
||||
// Create Accounts and corresponding user creds. Do so concurrently to speed up the test
|
||||
doneChan := make(chan struct{}, 5)
|
||||
defer close(doneChan)
|
||||
@@ -3342,30 +3345,28 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
go createAccountAndUser(true, doneChan, &cpub, &cjwt1, &cjwt2, &cCreds)
|
||||
var dpub, djwt1, dummy2, dCreds string // extra user used later in the test in order to test limits
|
||||
go createAccountAndUser(true, doneChan, &dpub, &djwt1, &dummy2, &dCreds)
|
||||
<-doneChan
|
||||
<-doneChan
|
||||
<-doneChan
|
||||
<-doneChan
|
||||
<-doneChan
|
||||
for i := 0; i < cap(doneChan); i++ {
|
||||
<-doneChan
|
||||
}
|
||||
defer os.Remove(sysCreds)
|
||||
defer os.Remove(aCreds)
|
||||
defer os.Remove(bCreds)
|
||||
defer os.Remove(cCreds)
|
||||
defer os.Remove(dCreds)
|
||||
// Create one directory for each server
|
||||
dirA := createDir("srv-a")
|
||||
dirA := createDir(t, "srv-a")
|
||||
defer os.RemoveAll(dirA)
|
||||
dirB := createDir("srv-b")
|
||||
dirB := createDir(t, "srv-b")
|
||||
defer os.RemoveAll(dirB)
|
||||
dirC := createDir("srv-c")
|
||||
dirC := createDir(t, "srv-c")
|
||||
defer os.RemoveAll(dirC)
|
||||
// simulate a restart of the server by storing files in them
|
||||
// Server A/B will completely sync, so after startup each server
|
||||
// will contain the union off all stored/configured jwt
|
||||
// Server C will send out lookup requests for jwt it does not store itself
|
||||
writeFile(dirA, apub, ajwt1)
|
||||
writeFile(dirB, bpub, bjwt1)
|
||||
writeFile(dirC, cpub, cjwt1)
|
||||
writeJWT(t, dirA, apub, ajwt1)
|
||||
writeJWT(t, dirB, bpub, bjwt1)
|
||||
writeJWT(t, dirC, cpub, cjwt1)
|
||||
// Create seed server A (using no_advertise to prevent fail over)
|
||||
confA := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
@@ -3391,7 +3392,7 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
sA, _ := RunServerWithConfig(confA)
|
||||
defer sA.Shutdown()
|
||||
// during startup resolver_preload causes the directory to contain data
|
||||
require_FilePresent(dirA, cpub)
|
||||
require_JWTPresent(t, dirA, cpub)
|
||||
// Create Server B (using no_advertise to prevent fail over)
|
||||
confB := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
@@ -3415,6 +3416,7 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
`, ojwt, syspub, dirB, sA.opts.Cluster.Port)))
|
||||
defer os.Remove(confB)
|
||||
sB, _ := RunServerWithConfig(confB)
|
||||
defer sB.Shutdown()
|
||||
// Create Server C (using no_advertise to prevent fail over)
|
||||
fmtC := `
|
||||
listen: -1
|
||||
@@ -3441,39 +3443,40 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
confCshortTTL := createConfFile(t, []byte(fmt.Sprintf(fmtC, ojwt, syspub, dirC, 1000, sA.opts.Cluster.Port)))
|
||||
defer os.Remove(confCshortTTL)
|
||||
sC, _ := RunServerWithConfig(confClongTTL) // use long ttl to assure it is not kicking
|
||||
defer sC.Shutdown()
|
||||
// startup cluster
|
||||
checkClusterFormed(t, sA, sB, sC)
|
||||
time.Sleep(500 * time.Millisecond) // wait for the protocol to converge
|
||||
// Check all accounts
|
||||
require_FilePresent(dirA, apub) // was already present on startup
|
||||
require_FilePresent(dirB, apub) // was copied from server A
|
||||
require_FileAbsent(dirC, apub)
|
||||
require_FilePresent(dirA, bpub) // was copied from server B
|
||||
require_FilePresent(dirB, bpub) // was already present on startup
|
||||
require_FileAbsent(dirC, bpub)
|
||||
require_FilePresent(dirA, cpub) // was present in preload
|
||||
require_FilePresent(dirB, cpub) // was copied from server A
|
||||
require_FilePresent(dirC, cpub) // was already present on startup
|
||||
require_JWTPresent(t, dirA, apub) // was already present on startup
|
||||
require_JWTPresent(t, dirB, apub) // was copied from server A
|
||||
require_JWTAbsent(t, dirC, apub)
|
||||
require_JWTPresent(t, dirA, bpub) // was copied from server B
|
||||
require_JWTPresent(t, dirB, bpub) // was already present on startup
|
||||
require_JWTAbsent(t, dirC, bpub)
|
||||
require_JWTPresent(t, dirA, cpub) // was present in preload
|
||||
require_JWTPresent(t, dirB, cpub) // was copied from server A
|
||||
require_JWTPresent(t, dirC, cpub) // was already present on startup
|
||||
// This is to test that connecting to it still works
|
||||
require_FileAbsent(dirA, syspub)
|
||||
require_FileAbsent(dirB, syspub)
|
||||
require_FileAbsent(dirC, syspub)
|
||||
require_JWTAbsent(t, dirA, syspub)
|
||||
require_JWTAbsent(t, dirB, syspub)
|
||||
require_JWTAbsent(t, dirC, syspub)
|
||||
// system account client can connect to every server
|
||||
connect(sA.ClientURL(), sysCreds, "")
|
||||
connect(sB.ClientURL(), sysCreds, "")
|
||||
connect(sC.ClientURL(), sysCreds, "")
|
||||
checkClusterFormed(t, sA, sB, sC)
|
||||
// upload system account and require a response from each server
|
||||
passCnt := updateJwt(sA.ClientURL(), sysCreds, syspub, sysjwt)
|
||||
passCnt := updateJwt(t, sA.ClientURL(), sysCreds, syspub, sysjwt, 3)
|
||||
require_True(t, passCnt == 3)
|
||||
require_FilePresent(dirA, syspub) // was just received
|
||||
require_FilePresent(dirB, syspub) // was just received
|
||||
require_FilePresent(dirC, syspub) // was just received
|
||||
require_JWTPresent(t, dirA, syspub) // was just received
|
||||
require_JWTPresent(t, dirB, syspub) // was just received
|
||||
require_JWTPresent(t, dirC, syspub) // was just received
|
||||
// Only files missing are in C, which is only caching
|
||||
connect(sC.ClientURL(), aCreds, apub, sA, sB, sC)
|
||||
connect(sC.ClientURL(), bCreds, bpub, sA, sB, sC)
|
||||
require_FilePresent(dirC, apub) // was looked up form A or B
|
||||
require_FilePresent(dirC, bpub) // was looked up from A or B
|
||||
require_JWTPresent(t, dirC, apub) // was looked up form A or B
|
||||
require_JWTPresent(t, dirC, bpub) // was looked up from A or B
|
||||
// Check limits and update jwt B connecting to server A
|
||||
for port, v := range map[string]struct{ pub, jwt, creds string }{
|
||||
sB.ClientURL(): {bpub, bjwt2, bCreds},
|
||||
@@ -3482,19 +3485,19 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
require_1Connection(sA.ClientURL(), v.creds, v.pub, sA, sB, sC)
|
||||
require_1Connection(sB.ClientURL(), v.creds, v.pub, sA, sB, sC)
|
||||
require_1Connection(sC.ClientURL(), v.creds, v.pub, sA, sB, sC)
|
||||
passCnt := updateJwt(port, sysCreds, v.pub, v.jwt)
|
||||
passCnt := updateJwt(t, port, sysCreds, v.pub, v.jwt, 3)
|
||||
require_True(t, passCnt == 3)
|
||||
require_2Connection(sA.ClientURL(), v.creds, v.pub, sA, sB, sC)
|
||||
require_2Connection(sB.ClientURL(), v.creds, v.pub, sA, sB, sC)
|
||||
require_2Connection(sC.ClientURL(), v.creds, v.pub, sA, sB, sC)
|
||||
require_FileEqual(dirA, v.pub, v.jwt)
|
||||
require_FileEqual(dirB, v.pub, v.jwt)
|
||||
require_FileEqual(dirC, v.pub, v.jwt)
|
||||
require_JWTEqual(t, dirA, v.pub, v.jwt)
|
||||
require_JWTEqual(t, dirB, v.pub, v.jwt)
|
||||
require_JWTEqual(t, dirC, v.pub, v.jwt)
|
||||
}
|
||||
// Simulates A having missed an update
|
||||
// shutting B down as it has it will directly connect to A and connect right away
|
||||
sB.Shutdown()
|
||||
writeFile(dirB, apub, ajwt2) // this will be copied to server A
|
||||
writeJWT(t, dirB, apub, ajwt2) // this will be copied to server A
|
||||
sB, _ = RunServerWithConfig(confB)
|
||||
defer sB.Shutdown()
|
||||
checkClusterFormed(t, sA, sB, sC)
|
||||
@@ -3502,9 +3505,10 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
// Restart server C. this is a workaround to force C to do a lookup in the absence of account cleanup
|
||||
sC.Shutdown()
|
||||
sC, _ = RunServerWithConfig(confClongTTL) //TODO remove this once we clean up accounts
|
||||
require_FileEqual(dirA, apub, ajwt2) // was copied from server B
|
||||
require_FileEqual(dirB, apub, ajwt2) // was restarted with this
|
||||
require_FileEqual(dirC, apub, ajwt1) // still contains old cached value
|
||||
defer sC.Shutdown()
|
||||
require_JWTEqual(t, dirA, apub, ajwt2) // was copied from server B
|
||||
require_JWTEqual(t, dirB, apub, ajwt2) // was restarted with this
|
||||
require_JWTEqual(t, dirC, apub, ajwt1) // still contains old cached value
|
||||
require_2Connection(sA.ClientURL(), aCreds, apub, sA, sB, sC)
|
||||
require_2Connection(sB.ClientURL(), aCreds, apub, sA, sB, sC)
|
||||
require_1Connection(sC.ClientURL(), aCreds, apub, sA, sB, sC)
|
||||
@@ -3512,7 +3516,7 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
sC.Shutdown()
|
||||
sC, _ = RunServerWithConfig(confCshortTTL) //TODO remove this once we clean up accounts
|
||||
defer sC.Shutdown()
|
||||
require_FileEqual(dirC, apub, ajwt1) // still contains old cached value
|
||||
require_JWTEqual(t, dirC, apub, ajwt1) // still contains old cached value
|
||||
checkClusterFormed(t, sA, sB, sC)
|
||||
// Force next connect to do a lookup exceeds ttl
|
||||
fname := filepath.Join(dirC, apub+".jwt")
|
||||
@@ -3524,12 +3528,237 @@ func TestAccountNATSResolverFetch(t *testing.T) {
|
||||
return fmt.Errorf("File not removed in time")
|
||||
})
|
||||
connect(sC.ClientURL(), aCreds, apub, sA, sB, sC) // When lookup happens
|
||||
require_FileEqual(dirC, apub, ajwt2) // was looked up form A or B
|
||||
require_JWTEqual(t, dirC, apub, ajwt2) // was looked up form A or B
|
||||
require_2Connection(sC.ClientURL(), aCreds, apub, sA, sB, sC)
|
||||
// Test exceeding limit. For the exclusive directory resolver, limit is a stop gap measure.
|
||||
// It is not expected to be hit. When hit the administrator is supposed to take action.
|
||||
passCnt = updateJwt(sA.ClientURL(), sysCreds, dpub, djwt1)
|
||||
passCnt = updateJwt(t, sA.ClientURL(), sysCreds, dpub, djwt1, 3)
|
||||
require_True(t, passCnt == 1) // Only Server C updated
|
||||
for _, srv := range []*Server{sA, sB, sC} {
|
||||
if a, ok := srv.accounts.Load(syspub); ok {
|
||||
acc := a.(*Account)
|
||||
checkFor(t, time.Second, 20*time.Millisecond, func() error {
|
||||
acc.mu.Lock()
|
||||
defer acc.mu.Unlock()
|
||||
if acc.ctmr != nil {
|
||||
return fmt.Errorf("Timer still exists")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAccountNATSResolverCrossClusterFetch(t *testing.T) {
|
||||
connect := func(url string, credsfile string) {
|
||||
t.Helper()
|
||||
nc := natsConnect(t, url, nats.UserCredentials(credsfile))
|
||||
nc.Close()
|
||||
}
|
||||
createAccountAndUser := func(done chan struct{}, pubKey, jwt1, jwt2, creds *string) {
|
||||
t.Helper()
|
||||
kp, _ := nkeys.CreateAccount()
|
||||
*pubKey, _ = kp.PublicKey()
|
||||
claim := jwt.NewAccountClaims(*pubKey)
|
||||
var err error
|
||||
*jwt1, err = claim.Encode(oKp)
|
||||
require_NoError(t, err)
|
||||
// need to assure that create time differs (resolution is sec)
|
||||
time.Sleep(time.Millisecond * 1100)
|
||||
// create updated claim
|
||||
claim.Tags.Add("tag")
|
||||
*jwt2, err = claim.Encode(oKp)
|
||||
require_NoError(t, err)
|
||||
ukp, _ := nkeys.CreateUser()
|
||||
seed, _ := ukp.Seed()
|
||||
upub, _ := ukp.PublicKey()
|
||||
uclaim := newJWTTestUserClaims()
|
||||
uclaim.Subject = upub
|
||||
ujwt, err := uclaim.Encode(kp)
|
||||
require_NoError(t, err)
|
||||
*creds = genCredsFile(t, ujwt, seed)
|
||||
done <- struct{}{}
|
||||
}
|
||||
// Create Accounts and corresponding user creds. Do so concurrently to speed up the test
|
||||
doneChan := make(chan struct{}, 3)
|
||||
defer close(doneChan)
|
||||
var syspub, sysjwt, dummy1, sysCreds string
|
||||
go createAccountAndUser(doneChan, &syspub, &sysjwt, &dummy1, &sysCreds)
|
||||
var apub, ajwt1, ajwt2, aCreds string
|
||||
go createAccountAndUser(doneChan, &apub, &ajwt1, &ajwt2, &aCreds)
|
||||
var bpub, bjwt1, bjwt2, bCreds string
|
||||
go createAccountAndUser(doneChan, &bpub, &bjwt1, &bjwt2, &bCreds)
|
||||
for i := 0; i < cap(doneChan); i++ {
|
||||
<-doneChan
|
||||
}
|
||||
defer os.Remove(sysCreds)
|
||||
defer os.Remove(aCreds)
|
||||
defer os.Remove(bCreds)
|
||||
// Create one directory for each server
|
||||
dirAA := createDir(t, "srv-a-a")
|
||||
defer os.RemoveAll(dirAA)
|
||||
dirAB := createDir(t, "srv-a-b")
|
||||
defer os.RemoveAll(dirAB)
|
||||
dirBA := createDir(t, "srv-b-a")
|
||||
defer os.RemoveAll(dirBA)
|
||||
dirBB := createDir(t, "srv-b-b")
|
||||
defer os.RemoveAll(dirBB)
|
||||
// simulate a restart of the server by storing files in them
|
||||
// Server AA & AB will completely sync
|
||||
// Server BA & BB will completely sync
|
||||
// Be aware that no syncing will occur between cluster
|
||||
writeJWT(t, dirAA, apub, ajwt1)
|
||||
writeJWT(t, dirBA, bpub, bjwt1)
|
||||
// Create seed server A (using no_advertise to prevent fail over)
|
||||
confAA := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
server_name: srv-A-A
|
||||
operator: %s
|
||||
system_account: %s
|
||||
resolver: {
|
||||
type: full
|
||||
dir: %s
|
||||
interval: "200ms"
|
||||
}
|
||||
gateway: {
|
||||
name: "clust-A"
|
||||
listen: -1
|
||||
}
|
||||
cluster {
|
||||
name: clust-A
|
||||
listen: -1
|
||||
no_advertise: true
|
||||
}
|
||||
`, ojwt, syspub, dirAA)))
|
||||
defer os.Remove(confAA)
|
||||
sAA, _ := RunServerWithConfig(confAA)
|
||||
defer sAA.Shutdown()
|
||||
// Create Server B (using no_advertise to prevent fail over)
|
||||
confAB := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
server_name: srv-A-B
|
||||
operator: %s
|
||||
system_account: %s
|
||||
resolver: {
|
||||
type: full
|
||||
dir: %s
|
||||
interval: "200ms"
|
||||
}
|
||||
gateway: {
|
||||
name: "clust-A"
|
||||
listen: -1
|
||||
}
|
||||
cluster {
|
||||
name: clust-A
|
||||
listen: -1
|
||||
no_advertise: true
|
||||
routes [
|
||||
nats-route://localhost:%d
|
||||
]
|
||||
}
|
||||
`, ojwt, syspub, dirAB, sAA.opts.Cluster.Port)))
|
||||
defer os.Remove(confAB)
|
||||
sAB, _ := RunServerWithConfig(confAB)
|
||||
defer sAB.Shutdown()
|
||||
// Create Server C (using no_advertise to prevent fail over)
|
||||
confBA := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
server_name: srv-B-A
|
||||
operator: %s
|
||||
system_account: %s
|
||||
resolver: {
|
||||
type: full
|
||||
dir: %s
|
||||
interval: "200ms"
|
||||
}
|
||||
gateway: {
|
||||
name: "clust-B"
|
||||
listen: -1
|
||||
gateways: [
|
||||
{name: "clust-A", url: "nats://localhost:%d"},
|
||||
]
|
||||
}
|
||||
cluster {
|
||||
name: clust-B
|
||||
listen: -1
|
||||
no_advertise: true
|
||||
}
|
||||
`, ojwt, syspub, dirBA, sAA.opts.Gateway.Port)))
|
||||
defer os.Remove(confBA)
|
||||
sBA, _ := RunServerWithConfig(confBA)
|
||||
defer sBA.Shutdown()
|
||||
// Create Sever BA (using no_advertise to prevent fail over)
|
||||
confBB := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
server_name: srv-B-B
|
||||
operator: %s
|
||||
system_account: %s
|
||||
resolver: {
|
||||
type: full
|
||||
dir: %s
|
||||
interval: "200ms"
|
||||
}
|
||||
cluster {
|
||||
name: clust-B
|
||||
listen: -1
|
||||
no_advertise: true
|
||||
routes [
|
||||
nats-route://localhost:%d
|
||||
]
|
||||
}
|
||||
gateway: {
|
||||
name: "clust-B"
|
||||
listen: -1
|
||||
gateways: [
|
||||
{name: "clust-A", url: "nats://localhost:%d"},
|
||||
]
|
||||
}
|
||||
`, ojwt, syspub, dirBB, sBA.opts.Cluster.Port, sAA.opts.Cluster.Port)))
|
||||
defer os.Remove(confBB)
|
||||
sBB, _ := RunServerWithConfig(confBB)
|
||||
defer sBB.Shutdown()
|
||||
// Assert topology
|
||||
checkClusterFormed(t, sAA, sAB)
|
||||
checkClusterFormed(t, sBA, sBB)
|
||||
waitForOutboundGateways(t, sAA, 1, 5*time.Second)
|
||||
waitForOutboundGateways(t, sAB, 1, 5*time.Second)
|
||||
waitForOutboundGateways(t, sBA, 1, 5*time.Second)
|
||||
waitForOutboundGateways(t, sBB, 1, 5*time.Second)
|
||||
time.Sleep(500 * time.Millisecond) // wait for the protocol to converge
|
||||
updateJwt(t, sAA.ClientURL(), sysCreds, syspub, sysjwt, 4) // update system account jwt on all server
|
||||
require_JWTEqual(t, dirAA, syspub, sysjwt) // assure this update made it to every server
|
||||
require_JWTEqual(t, dirAB, syspub, sysjwt) // assure this update made it to every server
|
||||
require_JWTEqual(t, dirBA, syspub, sysjwt) // assure this update made it to every server
|
||||
require_JWTEqual(t, dirBB, syspub, sysjwt) // assure this update made it to every server
|
||||
require_JWTAbsent(t, dirAA, bpub) // assure that jwt are not synced across cluster
|
||||
require_JWTAbsent(t, dirAB, bpub) // assure that jwt are not synced across cluster
|
||||
require_JWTAbsent(t, dirBA, apub) // assure that jwt are not synced across cluster
|
||||
require_JWTAbsent(t, dirBB, apub) // assure that jwt are not synced across cluster
|
||||
connect(sAA.ClientURL(), aCreds) // connect to cluster where jwt was initially stored
|
||||
connect(sAB.ClientURL(), aCreds) // connect to cluster where jwt was initially stored
|
||||
connect(sBA.ClientURL(), bCreds) // connect to cluster where jwt was initially stored
|
||||
connect(sBB.ClientURL(), bCreds) // connect to cluster where jwt was initially stored
|
||||
time.Sleep(500 * time.Millisecond) // wait for the protocol to (NOT) converge
|
||||
require_JWTAbsent(t, dirAA, bpub) // assure that jwt are still not synced across cluster
|
||||
require_JWTAbsent(t, dirAB, bpub) // assure that jwt are still not synced across cluster
|
||||
require_JWTAbsent(t, dirBA, apub) // assure that jwt are still not synced across cluster
|
||||
require_JWTAbsent(t, dirBB, apub) // assure that jwt are still not synced across cluster
|
||||
// We have verified that account B does not exist in cluster A, neither does account A in cluster B
|
||||
// Despite that clients from account B can connect to server A, same for account A in cluster B
|
||||
connect(sAA.ClientURL(), bCreds) // connect to cluster where jwt was not initially stored
|
||||
connect(sAB.ClientURL(), bCreds) // connect to cluster where jwt was not initially stored
|
||||
connect(sBA.ClientURL(), aCreds) // connect to cluster where jwt was not initially stored
|
||||
connect(sBB.ClientURL(), aCreds) // connect to cluster where jwt was not initially stored
|
||||
require_JWTEqual(t, dirAA, bpub, bjwt1) // assure that now jwt used in connect is stored
|
||||
require_JWTEqual(t, dirAB, bpub, bjwt1) // assure that now jwt used in connect is stored
|
||||
require_JWTEqual(t, dirBA, apub, ajwt1) // assure that now jwt used in connect is stored
|
||||
require_JWTEqual(t, dirBB, apub, ajwt1) // assure that now jwt used in connect is stored
|
||||
updateJwt(t, sAA.ClientURL(), sysCreds, bpub, bjwt2, 4) // update bjwt, expect updates from everywhere
|
||||
updateJwt(t, sBA.ClientURL(), sysCreds, apub, ajwt2, 4) // update ajwt, expect updates from everywhere
|
||||
require_JWTEqual(t, dirAA, bpub, bjwt2) // assure that jwt got updated accordingly
|
||||
require_JWTEqual(t, dirAB, bpub, bjwt2) // assure that jwt got updated accordingly
|
||||
require_JWTEqual(t, dirBA, apub, ajwt2) // assure that jwt got updated accordingly
|
||||
require_JWTEqual(t, dirBB, apub, ajwt2) // assure that jwt got updated accordingly
|
||||
}
|
||||
|
||||
func newTimeRange(start time.Time, dur time.Duration) jwt.TimeRange {
|
||||
|
||||
Reference in New Issue
Block a user