mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 11:04:42 -07:00
[FIXED] Error when importing an account results in an error, retry later (#1578)
* [FIXED] Error when importing an account results in an error When the account that could not be imported is updated, update the original account as well. Fixes #1582 Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
@@ -71,6 +71,7 @@ type Account struct {
|
||||
jsLimits *JetStreamAccountLimits
|
||||
limits
|
||||
expired bool
|
||||
incomplete bool
|
||||
signingKeys []string
|
||||
srv *Server // server this account is registered with (possibly nil)
|
||||
lds string // loop detection subject for leaf nodes
|
||||
@@ -2350,6 +2351,14 @@ func (a *Account) isClaimAccount() bool {
|
||||
// This will replace any exports or imports previously defined.
|
||||
// Lock MUST NOT be held upon entry.
|
||||
func (s *Server) UpdateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
s.updateAccountClaimsWithRefresh(a, ac, true)
|
||||
}
|
||||
|
||||
// updateAccountClaimsWithRefresh will update an existing account with new claims.
|
||||
// If refreshImportingAccounts is true it will also update incomplete dependent accounts
|
||||
// This will replace any exports or imports previously defined.
|
||||
// Lock MUST NOT be held upon entry.
|
||||
func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaims, refreshImportingAccounts bool) {
|
||||
if a == nil {
|
||||
return
|
||||
}
|
||||
@@ -2448,6 +2457,7 @@ func (s *Server) UpdateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
a.mu.Unlock()
|
||||
}
|
||||
}
|
||||
var incompleteImports []*jwt.Import
|
||||
for _, i := range ac.Imports {
|
||||
// check tmpAccounts with priority
|
||||
var acc *Account
|
||||
@@ -2459,6 +2469,7 @@ func (s *Server) UpdateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
}
|
||||
if acc == nil || err != nil {
|
||||
s.Errorf("Can't locate account [%s] for import of [%v] %s (err=%v)", i.Account, i.Subject, i.Type, err)
|
||||
incompleteImports = append(incompleteImports, i)
|
||||
continue
|
||||
}
|
||||
switch i.Type {
|
||||
@@ -2466,12 +2477,14 @@ func (s *Server) UpdateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
s.Debugf("Adding stream import %s:%q for %s:%q", acc.Name, i.Subject, a.Name, i.To)
|
||||
if err := a.AddStreamImportWithClaim(acc, string(i.Subject), string(i.To), i); err != nil {
|
||||
s.Debugf("Error adding stream import to account [%s]: %v", a.Name, err.Error())
|
||||
incompleteImports = append(incompleteImports, i)
|
||||
}
|
||||
case jwt.Service:
|
||||
// FIXME(dlc) - need to add in respThresh here eventually.
|
||||
s.Debugf("Adding service import %s:%q for %s:%q", acc.Name, i.Subject, a.Name, i.To)
|
||||
if err := a.AddServiceImportWithClaim(acc, string(i.Subject), string(i.To), i); err != nil {
|
||||
s.Debugf("Error adding service import to account [%s]: %v", a.Name, err.Error())
|
||||
incompleteImports = append(incompleteImports, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2574,6 +2587,10 @@ func (s *Server) UpdateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
}
|
||||
}
|
||||
a.defaultPerms = buildPermissionsFromJwt(&ac.DefaultPermissions)
|
||||
a.incomplete = len(incompleteImports) != 0
|
||||
for _, i := range incompleteImports {
|
||||
s.incompleteAccExporterMap.Store(i.Account, struct{}{})
|
||||
}
|
||||
a.mu.Unlock()
|
||||
|
||||
clients := gatherClients()
|
||||
@@ -2623,6 +2640,36 @@ func (s *Server) UpdateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if _, ok := s.incompleteAccExporterMap.Load(old.Name); ok && refreshImportingAccounts {
|
||||
s.incompleteAccExporterMap.Delete(old.Name)
|
||||
s.accounts.Range(func(key, value interface{}) bool {
|
||||
acc := value.(*Account)
|
||||
acc.mu.RLock()
|
||||
incomplete := acc.incomplete
|
||||
name := acc.Name
|
||||
// Must use jwt in account or risk failing on fetch
|
||||
// This jwt may not be the same that caused exportingAcc to be in incompleteAccExporterMap
|
||||
claimJWT := acc.claimJWT
|
||||
acc.mu.RUnlock()
|
||||
if incomplete && name != old.Name {
|
||||
if accClaims, _, err := s.verifyAccountClaims(claimJWT); err == nil {
|
||||
// Since claimJWT has not changed, acc can become complete
|
||||
// but it won't alter incomplete for it's dependents accounts.
|
||||
s.updateAccountClaimsWithRefresh(acc, accClaims, false)
|
||||
// old.Name was deleted before ranging over accounts
|
||||
// If it exists again, UpdateAccountClaims set it for failed imports of acc.
|
||||
// So there was one import of acc that imported this account and failed again.
|
||||
// Since this account just got updated, the import itself may be in error. So trace that.
|
||||
if _, ok := s.incompleteAccExporterMap.Load(old.Name); ok {
|
||||
s.incompleteAccExporterMap.Delete(old.Name)
|
||||
s.Errorf("Account %s has issues importing account %s", name, old.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Helper to build an internal account structure from a jwt.AccountClaims.
|
||||
|
||||
@@ -1719,6 +1719,304 @@ func TestAccountURLResolverNoFetchOnReload(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestAccountURLResolverFetchFailureInServer1(t *testing.T) {
|
||||
const subj = "test"
|
||||
const crossAccSubj = "test"
|
||||
// Create Exporting Account
|
||||
expkp, _ := nkeys.CreateAccount()
|
||||
exppub, _ := expkp.PublicKey()
|
||||
expac := jwt.NewAccountClaims(exppub)
|
||||
expac.Exports.Add(&jwt.Export{
|
||||
Subject: crossAccSubj,
|
||||
Type: jwt.Stream,
|
||||
})
|
||||
expjwt, err := expac.Encode(oKp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
// Create importing Account
|
||||
impkp, _ := nkeys.CreateAccount()
|
||||
imppub, _ := impkp.PublicKey()
|
||||
impac := jwt.NewAccountClaims(imppub)
|
||||
impac.Imports.Add(&jwt.Import{
|
||||
Account: exppub,
|
||||
Subject: crossAccSubj,
|
||||
Type: jwt.Stream,
|
||||
})
|
||||
impjwt, err := impac.Encode(oKp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
// Simulate an account server that drops the first request to exppub
|
||||
chanImpA := make(chan struct{}, 10)
|
||||
defer close(chanImpA)
|
||||
chanExpS := make(chan struct{}, 10)
|
||||
defer close(chanExpS)
|
||||
chanExpF := make(chan struct{}, 1)
|
||||
defer close(chanExpF)
|
||||
failureCnt := int32(0)
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path == "/A/" {
|
||||
// Server startup
|
||||
w.Write(nil)
|
||||
chanImpA <- struct{}{}
|
||||
} else if r.URL.Path == "/A/"+imppub {
|
||||
w.Write([]byte(impjwt))
|
||||
chanImpA <- struct{}{}
|
||||
} else if r.URL.Path == "/A/"+exppub {
|
||||
if atomic.AddInt32(&failureCnt, 1) <= 1 {
|
||||
// skip the write to simulate the failure
|
||||
chanExpF <- struct{}{}
|
||||
} else {
|
||||
w.Write([]byte(expjwt))
|
||||
chanExpS <- struct{}{}
|
||||
}
|
||||
} else {
|
||||
t.Fatal("not expected")
|
||||
}
|
||||
}))
|
||||
defer ts.Close()
|
||||
// Create server
|
||||
confA := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
operator: %s
|
||||
resolver: URL("%s/A/")
|
||||
`, ojwt, ts.URL)))
|
||||
defer os.Remove(confA)
|
||||
sA := RunServer(LoadConfig(confA))
|
||||
defer sA.Shutdown()
|
||||
// server observed one fetch on startup
|
||||
chanRecv(t, chanImpA, 10*time.Second)
|
||||
// Create first client
|
||||
ncA := natsConnect(t, sA.ClientURL(), createUserCreds(t, nil, impkp))
|
||||
defer ncA.Close()
|
||||
// create a test subscription
|
||||
subA, err := ncA.SubscribeSync(subj)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error during subscribe: %v", err)
|
||||
}
|
||||
defer subA.Unsubscribe()
|
||||
// Connect of client triggered a fetch of both accounts
|
||||
// the fetch for the imported account will fail
|
||||
chanRecv(t, chanImpA, 10*time.Second)
|
||||
chanRecv(t, chanExpF, 10*time.Second)
|
||||
// create second client for user exporting
|
||||
ncB := natsConnect(t, sA.ClientURL(), createUserCreds(t, nil, expkp))
|
||||
defer ncB.Close()
|
||||
chanRecv(t, chanExpS, 10*time.Second)
|
||||
// Connect of client triggered another fetch, this time passing
|
||||
checkSubInterest(t, sA, imppub, subj, 10*time.Second)
|
||||
checkSubInterest(t, sA, exppub, crossAccSubj, 10*time.Second) // Will fail as a result of this issue
|
||||
}
|
||||
|
||||
func TestAccountURLResolverFetchFailurePushReorder(t *testing.T) {
|
||||
const subj = "test"
|
||||
const crossAccSubj = "test"
|
||||
// Create System Account
|
||||
syskp, _ := nkeys.CreateAccount()
|
||||
syspub, _ := syskp.PublicKey()
|
||||
sysAc := jwt.NewAccountClaims(syspub)
|
||||
sysjwt, err := sysAc.Encode(oKp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
// Create Exporting Account
|
||||
expkp, _ := nkeys.CreateAccount()
|
||||
exppub, _ := expkp.PublicKey()
|
||||
expac := jwt.NewAccountClaims(exppub)
|
||||
expjwt1, err := expac.Encode(oKp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
expac.Exports.Add(&jwt.Export{
|
||||
Subject: crossAccSubj,
|
||||
Type: jwt.Stream,
|
||||
})
|
||||
expjwt2, err := expac.Encode(oKp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
// Create importing Account
|
||||
impkp, _ := nkeys.CreateAccount()
|
||||
imppub, _ := impkp.PublicKey()
|
||||
impac := jwt.NewAccountClaims(imppub)
|
||||
impac.Imports.Add(&jwt.Import{
|
||||
Account: exppub,
|
||||
Subject: crossAccSubj,
|
||||
Type: jwt.Stream,
|
||||
})
|
||||
impjwt, err := impac.Encode(oKp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
// Simulate an account server that does not serve the updated jwt for exppub
|
||||
chanImpA := make(chan struct{}, 10)
|
||||
defer close(chanImpA)
|
||||
chanExpS := make(chan struct{}, 10)
|
||||
defer close(chanExpS)
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path == "/A/" {
|
||||
// Server startup
|
||||
w.Write(nil)
|
||||
chanImpA <- struct{}{}
|
||||
} else if r.URL.Path == "/A/"+imppub {
|
||||
w.Write([]byte(impjwt))
|
||||
chanImpA <- struct{}{}
|
||||
} else if r.URL.Path == "/A/"+exppub {
|
||||
// respond with jwt that does not have the export
|
||||
// this simulates an ordering issue
|
||||
w.Write([]byte(expjwt1))
|
||||
chanExpS <- struct{}{}
|
||||
} else if r.URL.Path == "/A/"+syspub {
|
||||
w.Write([]byte(sysjwt))
|
||||
} else {
|
||||
t.Fatal("not expected")
|
||||
}
|
||||
}))
|
||||
defer ts.Close()
|
||||
confA := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
operator: %s
|
||||
resolver: URL("%s/A/")
|
||||
system_account: %s
|
||||
`, ojwt, ts.URL, syspub)))
|
||||
defer os.Remove(confA)
|
||||
sA := RunServer(LoadConfig(confA))
|
||||
defer sA.Shutdown()
|
||||
// server observed one fetch on startup
|
||||
chanRecv(t, chanImpA, 10*time.Second)
|
||||
// Create first client
|
||||
ncA := natsConnect(t, sA.ClientURL(), createUserCreds(t, nil, impkp))
|
||||
defer ncA.Close()
|
||||
// create a test subscription
|
||||
subA, err := ncA.SubscribeSync(subj)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error during subscribe: %v", err)
|
||||
}
|
||||
defer subA.Unsubscribe()
|
||||
// Connect of client triggered a fetch of both accounts
|
||||
// the fetch for the imported account will fail
|
||||
chanRecv(t, chanImpA, 10*time.Second)
|
||||
chanRecv(t, chanExpS, 10*time.Second)
|
||||
// create second client for user exporting
|
||||
ncB := natsConnect(t, sA.ClientURL(), createUserCreds(t, nil, expkp))
|
||||
defer ncB.Close()
|
||||
// update expjwt2, this will correct the import issue
|
||||
sysc := natsConnect(t, sA.ClientURL(), createUserCreds(t, nil, syskp))
|
||||
defer sysc.Close()
|
||||
natsPub(t, sysc, fmt.Sprintf(accUpdateEventSubj, exppub), []byte(expjwt2))
|
||||
sysc.Flush()
|
||||
// updating expjwt should cause this to pass
|
||||
checkSubInterest(t, sA, imppub, subj, 10*time.Second)
|
||||
checkSubInterest(t, sA, exppub, crossAccSubj, 10*time.Second) // Will fail as a result of this issue
|
||||
}
|
||||
|
||||
type captureDebugLogger struct {
|
||||
DummyLogger
|
||||
dbgCh chan string
|
||||
}
|
||||
|
||||
func (l *captureDebugLogger) Debugf(format string, v ...interface{}) {
|
||||
select {
|
||||
case l.dbgCh <- fmt.Sprintf(format, v...):
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func TestAccountURLResolverPermanentFetchFailure(t *testing.T) {
|
||||
const crossAccSubj = "test"
|
||||
expkp, _ := nkeys.CreateAccount()
|
||||
exppub, _ := expkp.PublicKey()
|
||||
impkp, _ := nkeys.CreateAccount()
|
||||
imppub, _ := impkp.PublicKey()
|
||||
// Create System Account
|
||||
syskp, _ := nkeys.CreateAccount()
|
||||
syspub, _ := syskp.PublicKey()
|
||||
sysAc := jwt.NewAccountClaims(syspub)
|
||||
sysjwt, err := sysAc.Encode(oKp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
// Create 2 Accounts. Each importing from the other, but NO matching export
|
||||
expac := jwt.NewAccountClaims(exppub)
|
||||
expac.Imports.Add(&jwt.Import{
|
||||
Account: imppub,
|
||||
Subject: crossAccSubj,
|
||||
Type: jwt.Stream,
|
||||
})
|
||||
expjwt, err := expac.Encode(oKp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
// Create importing Account
|
||||
impac := jwt.NewAccountClaims(imppub)
|
||||
impac.Imports.Add(&jwt.Import{
|
||||
Account: exppub,
|
||||
Subject: crossAccSubj,
|
||||
Type: jwt.Stream,
|
||||
})
|
||||
impjwt, err := impac.Encode(oKp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
// Simulate an account server that does not serve the updated jwt for exppub
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path == "/A/" {
|
||||
// Server startup
|
||||
w.Write(nil)
|
||||
} else if r.URL.Path == "/A/"+imppub {
|
||||
w.Write([]byte(impjwt))
|
||||
} else if r.URL.Path == "/A/"+exppub {
|
||||
w.Write([]byte(expjwt))
|
||||
} else if r.URL.Path == "/A/"+syspub {
|
||||
w.Write([]byte(sysjwt))
|
||||
} else {
|
||||
t.Fatal("not expected")
|
||||
}
|
||||
}))
|
||||
defer ts.Close()
|
||||
confA := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
operator: %s
|
||||
resolver: URL("%s/A/")
|
||||
system_account: %s
|
||||
`, ojwt, ts.URL, syspub)))
|
||||
defer os.Remove(confA)
|
||||
o := LoadConfig(confA)
|
||||
sA := RunServer(o)
|
||||
defer sA.Shutdown()
|
||||
l := &captureDebugLogger{dbgCh: make(chan string, 100)} // has enough space to not block
|
||||
sA.SetLogger(l, true, false)
|
||||
// Create clients
|
||||
ncA := natsConnect(t, sA.ClientURL(), createUserCreds(t, nil, impkp))
|
||||
defer ncA.Close()
|
||||
ncB := natsConnect(t, sA.ClientURL(), createUserCreds(t, nil, expkp))
|
||||
defer ncB.Close()
|
||||
sysc := natsConnect(t, sA.ClientURL(), createUserCreds(t, nil, syskp))
|
||||
defer sysc.Close()
|
||||
// push accounts
|
||||
natsPub(t, sysc, fmt.Sprintf(accUpdateEventSubj, imppub), []byte(impjwt))
|
||||
natsPub(t, sysc, fmt.Sprintf(accUpdateEventSubj, exppub), []byte(expjwt))
|
||||
sysc.Flush()
|
||||
importErrCnt := 0
|
||||
tmr := time.NewTimer(500 * time.Millisecond)
|
||||
defer tmr.Stop()
|
||||
for {
|
||||
select {
|
||||
case line := <-l.dbgCh:
|
||||
if strings.HasPrefix(line, "Error adding stream import to account") {
|
||||
importErrCnt++
|
||||
}
|
||||
case <-tmr.C:
|
||||
// connecting and updating, each cause 3 traces (2 + 1 on iteration)
|
||||
if importErrCnt != 6 {
|
||||
t.Fatalf("Expected 6 debug traces, got %d", importErrCnt)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAccountURLResolverFetchFailureInCluster(t *testing.T) {
|
||||
assertChanLen := func(x int, chans ...chan struct{}) {
|
||||
t.Helper()
|
||||
@@ -1730,15 +2028,6 @@ func TestAccountURLResolverFetchFailureInCluster(t *testing.T) {
|
||||
}
|
||||
const subj = ">"
|
||||
const crossAccSubj = "test"
|
||||
// Create Operator
|
||||
op, _ := nkeys.CreateOperator()
|
||||
opub, _ := op.PublicKey()
|
||||
oc := jwt.NewOperatorClaims(opub)
|
||||
oc.Subject = opub
|
||||
ojwt, err := oc.Encode(op)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating operator JWT: %v", err)
|
||||
}
|
||||
// Create Exporting Account
|
||||
expkp, _ := nkeys.CreateAccount()
|
||||
exppub, _ := expkp.PublicKey()
|
||||
@@ -1747,7 +2036,7 @@ func TestAccountURLResolverFetchFailureInCluster(t *testing.T) {
|
||||
Subject: crossAccSubj,
|
||||
Type: jwt.Stream,
|
||||
})
|
||||
expjwt, err := expac.Encode(op)
|
||||
expjwt, err := expac.Encode(oKp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
@@ -1764,7 +2053,7 @@ func TestAccountURLResolverFetchFailureInCluster(t *testing.T) {
|
||||
Subject: "srvc",
|
||||
Type: jwt.Service,
|
||||
})
|
||||
impjwt, err := impac.Encode(op)
|
||||
impjwt, err := impac.Encode(oKp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
|
||||
@@ -218,6 +218,9 @@ type Server struct {
|
||||
|
||||
// Websocket structure
|
||||
websocket srvWebsocket
|
||||
|
||||
// exporting account name the importer experienced issues with
|
||||
incompleteAccExporterMap sync.Map
|
||||
}
|
||||
|
||||
// Make sure all are 64bits for atomic use
|
||||
@@ -1206,13 +1209,13 @@ func (s *Server) updateAccountWithClaimJWT(acc *Account, claimJWT string) error
|
||||
if acc == nil {
|
||||
return ErrMissingAccount
|
||||
}
|
||||
acc.updated = time.Now()
|
||||
if acc.claimJWT != "" && acc.claimJWT == claimJWT {
|
||||
if acc.claimJWT != "" && acc.claimJWT == claimJWT && !acc.incomplete {
|
||||
s.Debugf("Requested account update for [%s], same claims detected", acc.Name)
|
||||
return ErrAccountResolverSameClaims
|
||||
}
|
||||
accClaims, _, err := s.verifyAccountClaims(claimJWT)
|
||||
if err == nil && accClaims != nil {
|
||||
acc.updated = time.Now()
|
||||
acc.mu.Lock()
|
||||
if acc.Issuer == "" {
|
||||
acc.Issuer = accClaims.Issuer
|
||||
|
||||
Reference in New Issue
Block a user