From f3e6cd12ec8446eff4473d79f24c34b2a9aaa0c4 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Fri, 25 Sep 2020 15:36:57 -0400 Subject: [PATCH 1/3] Adding fetch on missing jwt of full nats based resolver Full nats based resolver sync within a cluster. This functionality addresses syncing between cluster. Fixing deadlock when more than one server responds to lookup. Fixing crash when shutdown and pack happen at the same time. --- server/accounts.go | 46 +++-- server/dirstore.go | 4 +- server/jwt_test.go | 417 ++++++++++++++++++++++++++++++++++----------- 3 files changed, 345 insertions(+), 122 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 83615457..5e6963ed 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -2892,6 +2892,7 @@ func (ur *URLAccResolver) Fetch(name string) (string, error) { // Resolver based on nats for synchronization and backing directory for storage. type DirAccResolver struct { *DirJWTStore + *Server syncInterval time.Duration } @@ -2899,6 +2900,10 @@ func (dr *DirAccResolver) IsTrackingUpdate() bool { return true } +func (dr *DirAccResolver) Reload() error { + return dr.DirJWTStore.Reload() +} + func respondToUpdate(s *Server, respSubj string, acc string, message string, err error) { if err == nil { s.Debugf("%s - %s", message, acc) @@ -2929,6 +2934,7 @@ func respondToUpdate(s *Server, respSubj string, acc string, message string, err func (dr *DirAccResolver) Start(s *Server) error { dr.Lock() defer dr.Unlock() + dr.Server = s dr.DirJWTStore.changed = func(pubKey string) { if v, ok := s.accounts.Load(pubKey); !ok { } else if jwt, err := dr.LoadAcc(pubKey); err != nil { @@ -2998,7 +3004,7 @@ func (dr *DirAccResolver) Start(s *Server) error { // let them timeout s.Errorf("pack request error: %v", err) } else { - s.Debugf("pack request hash %x - finished responding with hash %x") + s.Debugf("pack request hash %x - finished responding with hash %x", theirHash, ourHash) s.sendInternalMsgLocked(reply, "", nil, []byte{}) } }); err != nil { @@ -3039,7 +3045,18 @@ func (dr *DirAccResolver) Start(s *Server) error { } func (dr *DirAccResolver) Fetch(name string) (string, error) { - return dr.LoadAcc(name) + if theJWT, err := dr.LoadAcc(name); theJWT != "" { + return theJWT, nil + } else { + dr.Lock() + srv := dr.Server + dr.Unlock() + if srv == nil { + return "", err + } else { + return srv.fetch(dr, name) // lookup from other server + } + } } func (dr *DirAccResolver) Store(name, jwt string) error { @@ -3057,33 +3074,27 @@ func NewDirAccResolver(path string, limit int64, syncInterval time.Duration) (*D if err != nil { return nil, err } - return &DirAccResolver{store, syncInterval}, nil + return &DirAccResolver{store, nil, syncInterval}, nil } // Caching resolver using nats for lookups and making use of a directory for storage type CacheDirAccResolver struct { DirAccResolver - *Server ttl time.Duration } -func (dr *CacheDirAccResolver) Fetch(name string) (string, error) { - if theJWT, _ := dr.LoadAcc(name); theJWT != "" { - return theJWT, nil - } - // lookup from other server - s := dr.Server +func (s *Server) fetch(res AccountResolver, name string) (string, error) { if s == nil { return "", ErrNoAccountResolver } respC := make(chan []byte, 1) accountLookupRequest := fmt.Sprintf(accLookupReqSubj, name) s.mu.Lock() - replySubj := s.newRespInbox() if s.sys == nil || s.sys.replies == nil { s.mu.Unlock() return "", fmt.Errorf("eventing shut down") } + replySubj := s.newRespInbox() replies := s.sys.replies // Store our handler. replies[replySubj] = func(sub *subscription, _ *client, subject, _ string, msg []byte) { @@ -3091,7 +3102,10 @@ func (dr *CacheDirAccResolver) Fetch(name string) (string, error) { copy(clone, msg) s.mu.Lock() if _, ok := replies[replySubj]; ok { - respC <- clone // only send if there is still interest + select { + case respC <- clone: // only use first response and only if there is still interest + default: + } } s.mu.Unlock() } @@ -3106,7 +3120,7 @@ func (dr *CacheDirAccResolver) Fetch(name string) (string, error) { case <-time.After(fetchTimeout): err = errors.New("fetching jwt timed out") case m := <-respC: - if err = dr.Store(name, string(m)); err == nil { + if err = res.Store(name, string(m)); err == nil { theJWT = string(m) } } @@ -3125,7 +3139,7 @@ func NewCacheDirAccResolver(path string, limit int64, ttl time.Duration) (*Cache if err != nil { return nil, err } - return &CacheDirAccResolver{DirAccResolver{store, 0}, nil, ttl}, nil + return &CacheDirAccResolver{DirAccResolver{store, nil, 0}, ttl}, nil } func (dr *CacheDirAccResolver) Start(s *Server) error { @@ -3172,7 +3186,3 @@ func (dr *CacheDirAccResolver) Start(s *Server) error { s.Noticef("Managing some jwt in exclusive directory %s", dr.directory) return nil } - -func (dr *CacheDirAccResolver) Reload() error { - return dr.DirAccResolver.Reload() -} diff --git a/server/dirstore.go b/server/dirstore.go index 4826ffd7..97536800 100644 --- a/server/dirstore.go +++ b/server/dirstore.go @@ -261,11 +261,11 @@ func (store *DirJWTStore) PackWalk(maxJWTs int, cb func(partialPackMsg string)) exp := store.expiration store.Unlock() err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if !info.IsDir() && strings.HasSuffix(path, fileExtension) { // this is a JWT + if info != nil && !info.IsDir() && strings.HasSuffix(path, fileExtension) { // this is a JWT pubKey := strings.TrimSuffix(filepath.Base(path), fileExtension) store.Lock() if exp != nil { - if _, ok := store.expiration.idx[pubKey]; !ok { + if _, ok := exp.idx[pubKey]; !ok { store.Unlock() return nil // only include indexed files } diff --git a/server/jwt_test.go b/server/jwt_test.go index 63b1d2d8..259ee0a1 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -3183,11 +3183,10 @@ func TestExpiredUserCredentialsRenewal(t *testing.T) { } } -func TestAccountNATSResolverFetch(t *testing.T) { - origEventsHBInterval := eventsHBInterval - eventsHBInterval = 50 * time.Millisecond // speed up eventing - defer func() { eventsHBInterval = origEventsHBInterval }() +func updateJwt(t *testing.T, url string, creds string, pubKey string, jwt string, respCnt int) int { + t.Helper() require_NextMsg := func(sub *nats.Subscription) bool { + t.Helper() msg := natsNexMsg(t, sub, time.Second) content := make(map[string]interface{}) json.Unmarshal(msg.Data, &content) @@ -3196,23 +3195,68 @@ func TestAccountNATSResolverFetch(t *testing.T) { } return false } - require_FileAbsent := func(dir string, pub string) { - t.Helper() - _, err := os.Stat(filepath.Join(dir, pub+".jwt")) - require_Error(t, err) - require_True(t, os.IsNotExist(err)) - } - require_FilePresent := func(dir string, pub string) { - t.Helper() - _, err := os.Stat(filepath.Join(dir, pub+".jwt")) - require_NoError(t, err) - } - require_FileEqual := func(dir string, pub string, jwt string) { - t.Helper() - content, err := ioutil.ReadFile(filepath.Join(dir, pub+".jwt")) - require_NoError(t, err) - require_Equal(t, string(content), jwt) + c := natsConnect(t, url, nats.UserCredentials(creds), + nats.DisconnectErrHandler(func(_ *nats.Conn, err error) { + if err != nil { + t.Fatal("error not expected in this test", err) + } + }), + nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) { + t.Fatal("error not expected in this test", err) + }), + ) + defer c.Close() + resp := c.NewRespInbox() + sub := natsSubSync(t, c, resp) + err := sub.AutoUnsubscribe(respCnt) + require_NoError(t, err) + require_NoError(t, c.PublishRequest(fmt.Sprintf(accUpdateEventSubjNew, pubKey), resp, []byte(jwt))) + passCnt := 0 + for i := 0; i < respCnt; i++ { + if require_NextMsg(sub) { + passCnt++ + } } + return passCnt +} + +func require_JWTAbsent(t *testing.T, dir string, pub string) { + t.Helper() + _, err := os.Stat(filepath.Join(dir, pub+".jwt")) + require_Error(t, err) + require_True(t, os.IsNotExist(err)) +} + +func require_JWTPresent(t *testing.T, dir string, pub string) { + t.Helper() + _, err := os.Stat(filepath.Join(dir, pub+".jwt")) + require_NoError(t, err) +} + +func require_JWTEqual(t *testing.T, dir string, pub string, jwt string) { + t.Helper() + content, err := ioutil.ReadFile(filepath.Join(dir, pub+".jwt")) + require_NoError(t, err) + require_Equal(t, string(content), jwt) +} + +func createDir(t *testing.T, prefix string) string { + t.Helper() + dir, err := ioutil.TempDir("", prefix) + require_NoError(t, err) + return dir +} + +func writeJWT(t *testing.T, dir string, pub string, jwt string) { + t.Helper() + err := ioutil.WriteFile(filepath.Join(dir, pub+".jwt"), []byte(jwt), 0644) + require_NoError(t, err) +} + +func TestAccountNATSResolverFetch(t *testing.T) { + origEventsHBInterval := eventsHBInterval + eventsHBInterval = 50 * time.Millisecond // speed up eventing + defer func() { eventsHBInterval = origEventsHBInterval }() require_NoLocalOrRemoteConnections := func(account string, srvs ...*Server) { t.Helper() for _, srv := range srvs { @@ -3253,17 +3297,6 @@ func TestAccountNATSResolverFetch(t *testing.T) { }() require_NoLocalOrRemoteConnections(acc, srvs...) } - writeFile := func(dir string, pub string, jwt string) { - t.Helper() - err := ioutil.WriteFile(filepath.Join(dir, pub+".jwt"), []byte(jwt), 0644) - require_NoError(t, err) - } - createDir := func(prefix string) string { - t.Helper() - dir, err := ioutil.TempDir("", prefix) - require_NoError(t, err) - return dir - } connect := func(url string, credsfile string, acc string, srvs ...*Server) { t.Helper() nc := natsConnect(t, url, nats.UserCredentials(credsfile)) @@ -3299,36 +3332,6 @@ func TestAccountNATSResolverFetch(t *testing.T) { *creds = genCredsFile(t, ujwt, seed) done <- struct{}{} } - updateJwt := func(url string, creds string, pubKey string, jwt string) int { - t.Helper() - c := natsConnect(t, url, nats.UserCredentials(creds), - nats.DisconnectErrHandler(func(_ *nats.Conn, err error) { - if err != nil { - t.Fatal("error not expected in this test", err) - } - }), - nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) { - t.Fatal("error not expected in this test", err) - }), - ) - defer c.Close() - resp := c.NewRespInbox() - sub := natsSubSync(t, c, resp) - err := sub.AutoUnsubscribe(3) - require_NoError(t, err) - require_NoError(t, c.PublishRequest(fmt.Sprintf(accUpdateEventSubjNew, pubKey), resp, []byte(jwt))) - passCnt := 0 - if require_NextMsg(sub) { - passCnt++ - } - if require_NextMsg(sub) { - passCnt++ - } - if require_NextMsg(sub) { - passCnt++ - } - return passCnt - } // Create Accounts and corresponding user creds. Do so concurrently to speed up the test doneChan := make(chan struct{}, 5) defer close(doneChan) @@ -3342,30 +3345,28 @@ func TestAccountNATSResolverFetch(t *testing.T) { go createAccountAndUser(true, doneChan, &cpub, &cjwt1, &cjwt2, &cCreds) var dpub, djwt1, dummy2, dCreds string // extra user used later in the test in order to test limits go createAccountAndUser(true, doneChan, &dpub, &djwt1, &dummy2, &dCreds) - <-doneChan - <-doneChan - <-doneChan - <-doneChan - <-doneChan + for i := 0; i < cap(doneChan); i++ { + <-doneChan + } defer os.Remove(sysCreds) defer os.Remove(aCreds) defer os.Remove(bCreds) defer os.Remove(cCreds) defer os.Remove(dCreds) // Create one directory for each server - dirA := createDir("srv-a") + dirA := createDir(t, "srv-a") defer os.RemoveAll(dirA) - dirB := createDir("srv-b") + dirB := createDir(t, "srv-b") defer os.RemoveAll(dirB) - dirC := createDir("srv-c") + dirC := createDir(t, "srv-c") defer os.RemoveAll(dirC) // simulate a restart of the server by storing files in them // Server A/B will completely sync, so after startup each server // will contain the union off all stored/configured jwt // Server C will send out lookup requests for jwt it does not store itself - writeFile(dirA, apub, ajwt1) - writeFile(dirB, bpub, bjwt1) - writeFile(dirC, cpub, cjwt1) + writeJWT(t, dirA, apub, ajwt1) + writeJWT(t, dirB, bpub, bjwt1) + writeJWT(t, dirC, cpub, cjwt1) // Create seed server A (using no_advertise to prevent fail over) confA := createConfFile(t, []byte(fmt.Sprintf(` listen: -1 @@ -3391,7 +3392,7 @@ func TestAccountNATSResolverFetch(t *testing.T) { sA, _ := RunServerWithConfig(confA) defer sA.Shutdown() // during startup resolver_preload causes the directory to contain data - require_FilePresent(dirA, cpub) + require_JWTPresent(t, dirA, cpub) // Create Server B (using no_advertise to prevent fail over) confB := createConfFile(t, []byte(fmt.Sprintf(` listen: -1 @@ -3445,35 +3446,35 @@ func TestAccountNATSResolverFetch(t *testing.T) { checkClusterFormed(t, sA, sB, sC) time.Sleep(500 * time.Millisecond) // wait for the protocol to converge // Check all accounts - require_FilePresent(dirA, apub) // was already present on startup - require_FilePresent(dirB, apub) // was copied from server A - require_FileAbsent(dirC, apub) - require_FilePresent(dirA, bpub) // was copied from server B - require_FilePresent(dirB, bpub) // was already present on startup - require_FileAbsent(dirC, bpub) - require_FilePresent(dirA, cpub) // was present in preload - require_FilePresent(dirB, cpub) // was copied from server A - require_FilePresent(dirC, cpub) // was already present on startup + require_JWTPresent(t, dirA, apub) // was already present on startup + require_JWTPresent(t, dirB, apub) // was copied from server A + require_JWTAbsent(t, dirC, apub) + require_JWTPresent(t, dirA, bpub) // was copied from server B + require_JWTPresent(t, dirB, bpub) // was already present on startup + require_JWTAbsent(t, dirC, bpub) + require_JWTPresent(t, dirA, cpub) // was present in preload + require_JWTPresent(t, dirB, cpub) // was copied from server A + require_JWTPresent(t, dirC, cpub) // was already present on startup // This is to test that connecting to it still works - require_FileAbsent(dirA, syspub) - require_FileAbsent(dirB, syspub) - require_FileAbsent(dirC, syspub) + require_JWTAbsent(t, dirA, syspub) + require_JWTAbsent(t, dirB, syspub) + require_JWTAbsent(t, dirC, syspub) // system account client can connect to every server connect(sA.ClientURL(), sysCreds, "") connect(sB.ClientURL(), sysCreds, "") connect(sC.ClientURL(), sysCreds, "") checkClusterFormed(t, sA, sB, sC) // upload system account and require a response from each server - passCnt := updateJwt(sA.ClientURL(), sysCreds, syspub, sysjwt) + passCnt := updateJwt(t, sA.ClientURL(), sysCreds, syspub, sysjwt, 3) require_True(t, passCnt == 3) - require_FilePresent(dirA, syspub) // was just received - require_FilePresent(dirB, syspub) // was just received - require_FilePresent(dirC, syspub) // was just received + require_JWTPresent(t, dirA, syspub) // was just received + require_JWTPresent(t, dirB, syspub) // was just received + require_JWTPresent(t, dirC, syspub) // was just received // Only files missing are in C, which is only caching connect(sC.ClientURL(), aCreds, apub, sA, sB, sC) connect(sC.ClientURL(), bCreds, bpub, sA, sB, sC) - require_FilePresent(dirC, apub) // was looked up form A or B - require_FilePresent(dirC, bpub) // was looked up from A or B + require_JWTPresent(t, dirC, apub) // was looked up form A or B + require_JWTPresent(t, dirC, bpub) // was looked up from A or B // Check limits and update jwt B connecting to server A for port, v := range map[string]struct{ pub, jwt, creds string }{ sB.ClientURL(): {bpub, bjwt2, bCreds}, @@ -3482,19 +3483,19 @@ func TestAccountNATSResolverFetch(t *testing.T) { require_1Connection(sA.ClientURL(), v.creds, v.pub, sA, sB, sC) require_1Connection(sB.ClientURL(), v.creds, v.pub, sA, sB, sC) require_1Connection(sC.ClientURL(), v.creds, v.pub, sA, sB, sC) - passCnt := updateJwt(port, sysCreds, v.pub, v.jwt) + passCnt := updateJwt(t, port, sysCreds, v.pub, v.jwt, 3) require_True(t, passCnt == 3) require_2Connection(sA.ClientURL(), v.creds, v.pub, sA, sB, sC) require_2Connection(sB.ClientURL(), v.creds, v.pub, sA, sB, sC) require_2Connection(sC.ClientURL(), v.creds, v.pub, sA, sB, sC) - require_FileEqual(dirA, v.pub, v.jwt) - require_FileEqual(dirB, v.pub, v.jwt) - require_FileEqual(dirC, v.pub, v.jwt) + require_JWTEqual(t, dirA, v.pub, v.jwt) + require_JWTEqual(t, dirB, v.pub, v.jwt) + require_JWTEqual(t, dirC, v.pub, v.jwt) } // Simulates A having missed an update // shutting B down as it has it will directly connect to A and connect right away sB.Shutdown() - writeFile(dirB, apub, ajwt2) // this will be copied to server A + writeJWT(t, dirB, apub, ajwt2) // this will be copied to server A sB, _ = RunServerWithConfig(confB) defer sB.Shutdown() checkClusterFormed(t, sA, sB, sC) @@ -3502,9 +3503,9 @@ func TestAccountNATSResolverFetch(t *testing.T) { // Restart server C. this is a workaround to force C to do a lookup in the absence of account cleanup sC.Shutdown() sC, _ = RunServerWithConfig(confClongTTL) //TODO remove this once we clean up accounts - require_FileEqual(dirA, apub, ajwt2) // was copied from server B - require_FileEqual(dirB, apub, ajwt2) // was restarted with this - require_FileEqual(dirC, apub, ajwt1) // still contains old cached value + require_JWTEqual(t, dirA, apub, ajwt2) // was copied from server B + require_JWTEqual(t, dirB, apub, ajwt2) // was restarted with this + require_JWTEqual(t, dirC, apub, ajwt1) // still contains old cached value require_2Connection(sA.ClientURL(), aCreds, apub, sA, sB, sC) require_2Connection(sB.ClientURL(), aCreds, apub, sA, sB, sC) require_1Connection(sC.ClientURL(), aCreds, apub, sA, sB, sC) @@ -3512,7 +3513,7 @@ func TestAccountNATSResolverFetch(t *testing.T) { sC.Shutdown() sC, _ = RunServerWithConfig(confCshortTTL) //TODO remove this once we clean up accounts defer sC.Shutdown() - require_FileEqual(dirC, apub, ajwt1) // still contains old cached value + require_JWTEqual(t, dirC, apub, ajwt1) // still contains old cached value checkClusterFormed(t, sA, sB, sC) // Force next connect to do a lookup exceeds ttl fname := filepath.Join(dirC, apub+".jwt") @@ -3524,14 +3525,226 @@ func TestAccountNATSResolverFetch(t *testing.T) { return fmt.Errorf("File not removed in time") }) connect(sC.ClientURL(), aCreds, apub, sA, sB, sC) // When lookup happens - require_FileEqual(dirC, apub, ajwt2) // was looked up form A or B + require_JWTEqual(t, dirC, apub, ajwt2) // was looked up form A or B require_2Connection(sC.ClientURL(), aCreds, apub, sA, sB, sC) // Test exceeding limit. For the exclusive directory resolver, limit is a stop gap measure. // It is not expected to be hit. When hit the administrator is supposed to take action. - passCnt = updateJwt(sA.ClientURL(), sysCreds, dpub, djwt1) + passCnt = updateJwt(t, sA.ClientURL(), sysCreds, dpub, djwt1, 3) require_True(t, passCnt == 1) // Only Server C updated } +func TestAccountNATSResolverCrossClusterFetch(t *testing.T) { + connect := func(url string, credsfile string) { + t.Helper() + nc := natsConnect(t, url, nats.UserCredentials(credsfile)) + nc.Close() + } + createAccountAndUser := func(done chan struct{}, pubKey, jwt1, jwt2, creds *string) { + t.Helper() + kp, _ := nkeys.CreateAccount() + *pubKey, _ = kp.PublicKey() + claim := jwt.NewAccountClaims(*pubKey) + var err error + *jwt1, err = claim.Encode(oKp) + require_NoError(t, err) + // need to assure that create time differs (resolution is sec) + time.Sleep(time.Millisecond * 1100) + // create updated claim + claim.Tags.Add("tag") + *jwt2, err = claim.Encode(oKp) + require_NoError(t, err) + ukp, _ := nkeys.CreateUser() + seed, _ := ukp.Seed() + upub, _ := ukp.PublicKey() + uclaim := newJWTTestUserClaims() + uclaim.Subject = upub + ujwt, err := uclaim.Encode(kp) + require_NoError(t, err) + *creds = genCredsFile(t, ujwt, seed) + done <- struct{}{} + } + // Create Accounts and corresponding user creds. Do so concurrently to speed up the test + doneChan := make(chan struct{}, 3) + defer close(doneChan) + var syspub, sysjwt, dummy1, sysCreds string + go createAccountAndUser(doneChan, &syspub, &sysjwt, &dummy1, &sysCreds) + var apub, ajwt1, ajwt2, aCreds string + go createAccountAndUser(doneChan, &apub, &ajwt1, &ajwt2, &aCreds) + var bpub, bjwt1, bjwt2, bCreds string + go createAccountAndUser(doneChan, &bpub, &bjwt1, &bjwt2, &bCreds) + for i := 0; i < cap(doneChan); i++ { + <-doneChan + } + defer os.Remove(sysCreds) + defer os.Remove(aCreds) + defer os.Remove(bCreds) + // Create one directory for each server + dirAA := createDir(t, "srv-a-a") + defer os.RemoveAll(dirAA) + dirAB := createDir(t, "srv-a-b") + defer os.RemoveAll(dirAB) + dirBA := createDir(t, "srv-b-a") + defer os.RemoveAll(dirBA) + dirBB := createDir(t, "srv-b-b") + defer os.RemoveAll(dirBB) + // simulate a restart of the server by storing files in them + // Server AA & AB will completely sync + // Server BA & BB will completely sync + // Be aware that no syncing will occur between cluster + writeJWT(t, dirAA, apub, ajwt1) + writeJWT(t, dirBA, bpub, bjwt1) + // Create seed server A (using no_advertise to prevent fail over) + confAA := createConfFile(t, []byte(fmt.Sprintf(` + listen: -1 + server_name: srv-A-A + operator: %s + system_account: %s + resolver: { + type: full + dir: %s + interval: "200ms" + } + gateway: { + name: "clust-A" + listen: -1 + } + cluster { + name: clust-A + listen: -1 + no_advertise: true + } + `, ojwt, syspub, dirAA))) + defer os.Remove(confAA) + sAA, _ := RunServerWithConfig(confAA) + defer sAA.Shutdown() + // Create Server B (using no_advertise to prevent fail over) + confAB := createConfFile(t, []byte(fmt.Sprintf(` + listen: -1 + server_name: srv-A-B + operator: %s + system_account: %s + resolver: { + type: full + dir: %s + interval: "200ms" + } + gateway: { + name: "clust-A" + listen: -1 + } + cluster { + name: clust-A + listen: -1 + no_advertise: true + routes [ + nats-route://localhost:%d + ] + } + `, ojwt, syspub, dirAB, sAA.opts.Cluster.Port))) + defer os.Remove(confAB) + sAB, _ := RunServerWithConfig(confAB) + defer sAB.Shutdown() + // Create Server C (using no_advertise to prevent fail over) + confBA := createConfFile(t, []byte(fmt.Sprintf(` + listen: -1 + server_name: srv-B-A + operator: %s + system_account: %s + resolver: { + type: full + dir: %s + interval: "200ms" + } + gateway: { + name: "clust-B" + listen: -1 + gateways: [ + {name: "clust-A", url: "nats://localhost:%d"}, + ] + } + cluster { + name: clust-B + listen: -1 + no_advertise: true + } + `, ojwt, syspub, dirBA, sAA.opts.Gateway.Port))) + defer os.Remove(confBA) + sBA, _ := RunServerWithConfig(confBA) + defer sBA.Shutdown() + // Create Sever BA (using no_advertise to prevent fail over) + confBB := createConfFile(t, []byte(fmt.Sprintf(` + listen: -1 + server_name: srv-B-B + operator: %s + system_account: %s + resolver: { + type: full + dir: %s + interval: "200ms" + } + cluster { + name: clust-B + listen: -1 + no_advertise: true + routes [ + nats-route://localhost:%d + ] + } + gateway: { + name: "clust-B" + listen: -1 + gateways: [ + {name: "clust-A", url: "nats://localhost:%d"}, + ] + } + `, ojwt, syspub, dirBB, sBA.opts.Cluster.Port, sAA.opts.Cluster.Port))) + defer os.Remove(confBB) + sBB, _ := RunServerWithConfig(confBB) + defer sBB.Shutdown() + // Assert topology + checkClusterFormed(t, sAA, sAB) + checkClusterFormed(t, sBA, sBB) + waitForOutboundGateways(t, sAA, 1, 5*time.Second) + waitForOutboundGateways(t, sAB, 1, 5*time.Second) + waitForOutboundGateways(t, sBA, 1, 5*time.Second) + waitForOutboundGateways(t, sBB, 1, 5*time.Second) + time.Sleep(500 * time.Millisecond) // wait for the protocol to converge + updateJwt(t, sAA.ClientURL(), sysCreds, syspub, sysjwt, 4) // update system account jwt on all server + require_JWTEqual(t, dirAA, syspub, sysjwt) // assure this update made it to every server + require_JWTEqual(t, dirAB, syspub, sysjwt) // assure this update made it to every server + require_JWTEqual(t, dirBA, syspub, sysjwt) // assure this update made it to every server + require_JWTEqual(t, dirBB, syspub, sysjwt) // assure this update made it to every server + require_JWTAbsent(t, dirAA, bpub) // assure that jwt are not synced across cluster + require_JWTAbsent(t, dirAB, bpub) // assure that jwt are not synced across cluster + require_JWTAbsent(t, dirBA, apub) // assure that jwt are not synced across cluster + require_JWTAbsent(t, dirBB, apub) // assure that jwt are not synced across cluster + connect(sAA.ClientURL(), aCreds) // connect to cluster where jwt was initially stored + connect(sAB.ClientURL(), aCreds) // connect to cluster where jwt was initially stored + connect(sBA.ClientURL(), bCreds) // connect to cluster where jwt was initially stored + connect(sBB.ClientURL(), bCreds) // connect to cluster where jwt was initially stored + time.Sleep(500 * time.Millisecond) // wait for the protocol to (NOT) converge + require_JWTAbsent(t, dirAA, bpub) // assure that jwt are still not synced across cluster + require_JWTAbsent(t, dirAB, bpub) // assure that jwt are still not synced across cluster + require_JWTAbsent(t, dirBA, apub) // assure that jwt are still not synced across cluster + require_JWTAbsent(t, dirBB, apub) // assure that jwt are still not synced across cluster + // We have verified that account B does not exist in cluster A, neither does account A in cluster B + // Despite that clients from account B can connect to server A, same for account A in cluster B + connect(sAA.ClientURL(), bCreds) // connect to cluster where jwt was not initially stored + connect(sAB.ClientURL(), bCreds) // connect to cluster where jwt was not initially stored + connect(sBA.ClientURL(), aCreds) // connect to cluster where jwt was not initially stored + connect(sBB.ClientURL(), aCreds) // connect to cluster where jwt was not initially stored + require_JWTEqual(t, dirAA, bpub, bjwt1) // assure that now jwt used in connect is stored + require_JWTEqual(t, dirAB, bpub, bjwt1) // assure that now jwt used in connect is stored + require_JWTEqual(t, dirBA, apub, ajwt1) // assure that now jwt used in connect is stored + require_JWTEqual(t, dirBB, apub, ajwt1) // assure that now jwt used in connect is stored + updateJwt(t, sAA.ClientURL(), sysCreds, bpub, bjwt2, 4) // update bjwt, expect updates from everywhere + updateJwt(t, sBA.ClientURL(), sysCreds, apub, ajwt2, 4) // update ajwt, expect updates from everywhere + require_JWTEqual(t, dirAA, bpub, bjwt2) // assure that jwt got updated accordingly + require_JWTEqual(t, dirAB, bpub, bjwt2) // assure that jwt got updated accordingly + require_JWTEqual(t, dirBA, apub, ajwt2) // assure that jwt got updated accordingly + require_JWTEqual(t, dirBB, apub, ajwt2) // assure that jwt got updated accordingly +} + func newTimeRange(start time.Time, dur time.Duration) jwt.TimeRange { return jwt.TimeRange{Start: start.Format("15:04:05"), End: start.Add(dur).Format("15:04:05")} } From 4e055d7b72cf6302e6e53a38f82293f8b5b92f04 Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Tue, 29 Sep 2020 14:34:37 -0400 Subject: [PATCH 2/3] Fixing test race condition Signed-off-by: Matthias Hanel --- server/accounts.go | 3 +-- server/jwt_test.go | 13 +++++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 5e6963ed..7b1826e0 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -3053,9 +3053,8 @@ func (dr *DirAccResolver) Fetch(name string) (string, error) { dr.Unlock() if srv == nil { return "", err - } else { - return srv.fetch(dr, name) // lookup from other server } + return srv.fetch(dr, name) // lookup from other server } } diff --git a/server/jwt_test.go b/server/jwt_test.go index 259ee0a1..8b4a8f6f 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -3531,6 +3531,19 @@ func TestAccountNATSResolverFetch(t *testing.T) { // It is not expected to be hit. When hit the administrator is supposed to take action. passCnt = updateJwt(t, sA.ClientURL(), sysCreds, dpub, djwt1, 3) require_True(t, passCnt == 1) // Only Server C updated + for _, srv := range []*Server{sA, sB, sC} { + if a, ok := srv.accounts.Load(syspub); ok { + acc := a.(*Account) + checkFor(t, time.Second, 20*time.Millisecond, func() error { + acc.mu.Lock() + defer acc.mu.Unlock() + if acc.ctmr != nil { + return fmt.Errorf("Timer still exists") + } + return nil + }) + } + } } func TestAccountNATSResolverCrossClusterFetch(t *testing.T) { From 01453e03cd004c2506e7d36e9b1ca61f71f2d06e Mon Sep 17 00:00:00 2001 From: Matthias Hanel Date: Tue, 29 Sep 2020 15:54:20 -0400 Subject: [PATCH 3/3] Add defer srv.Shutdown() where manual shutdown was done Signed-off-by: Matthias Hanel --- server/jwt_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/server/jwt_test.go b/server/jwt_test.go index 8b4a8f6f..da15039d 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -3416,6 +3416,7 @@ func TestAccountNATSResolverFetch(t *testing.T) { `, ojwt, syspub, dirB, sA.opts.Cluster.Port))) defer os.Remove(confB) sB, _ := RunServerWithConfig(confB) + defer sB.Shutdown() // Create Server C (using no_advertise to prevent fail over) fmtC := ` listen: -1 @@ -3442,6 +3443,7 @@ func TestAccountNATSResolverFetch(t *testing.T) { confCshortTTL := createConfFile(t, []byte(fmt.Sprintf(fmtC, ojwt, syspub, dirC, 1000, sA.opts.Cluster.Port))) defer os.Remove(confCshortTTL) sC, _ := RunServerWithConfig(confClongTTL) // use long ttl to assure it is not kicking + defer sC.Shutdown() // startup cluster checkClusterFormed(t, sA, sB, sC) time.Sleep(500 * time.Millisecond) // wait for the protocol to converge @@ -3503,9 +3505,10 @@ func TestAccountNATSResolverFetch(t *testing.T) { // Restart server C. this is a workaround to force C to do a lookup in the absence of account cleanup sC.Shutdown() sC, _ = RunServerWithConfig(confClongTTL) //TODO remove this once we clean up accounts - require_JWTEqual(t, dirA, apub, ajwt2) // was copied from server B - require_JWTEqual(t, dirB, apub, ajwt2) // was restarted with this - require_JWTEqual(t, dirC, apub, ajwt1) // still contains old cached value + defer sC.Shutdown() + require_JWTEqual(t, dirA, apub, ajwt2) // was copied from server B + require_JWTEqual(t, dirB, apub, ajwt2) // was restarted with this + require_JWTEqual(t, dirC, apub, ajwt1) // still contains old cached value require_2Connection(sA.ClientURL(), aCreds, apub, sA, sB, sC) require_2Connection(sB.ClientURL(), aCreds, apub, sA, sB, sC) require_1Connection(sC.ClientURL(), aCreds, apub, sA, sB, sC)