mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-16 19:14:41 -07:00
[Fixed] Skip fetch when a non config based account resolver is used
Resolves #1532 Instead of the fetched account we create a dummy account that is expired. Any client connecting will trigger a fetch of the actual account jwt. This also avoids one fetch, thus the unit test was changed to reflect this. Unlike other resolver the memory resolver does not depend on external systems. It is purely based on server configuration. Therefore, fetch can be done and not finding an account means there is a configuration issue.
This commit is contained in:
@@ -1696,8 +1696,16 @@ func TestAccountURLResolverNoFetchOnReload(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAccountURLResolverFetchFailureInCluster(t *testing.T) {
|
||||
t.SkipNow()
|
||||
const subj = "subscription.loss.test"
|
||||
assertChanLen := func(x int, chans ...chan struct{}) {
|
||||
t.Helper()
|
||||
for _, c := range chans {
|
||||
if len(c) != x {
|
||||
t.Fatalf("length of channel is not %d", x)
|
||||
}
|
||||
}
|
||||
}
|
||||
const subj = ">"
|
||||
const crossAccSubj = "test"
|
||||
// Create Operator
|
||||
op, _ := nkeys.CreateOperator()
|
||||
opub, _ := op.PublicKey()
|
||||
@@ -1707,11 +1715,32 @@ func TestAccountURLResolverFetchFailureInCluster(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating operator JWT: %v", err)
|
||||
}
|
||||
// Create Account
|
||||
akp, _ := nkeys.CreateAccount()
|
||||
apub, _ := akp.PublicKey()
|
||||
nac := jwt.NewAccountClaims(apub)
|
||||
ajwt, err := nac.Encode(op)
|
||||
// Create Exporting Account
|
||||
expkp, _ := nkeys.CreateAccount()
|
||||
exppub, _ := expkp.PublicKey()
|
||||
expac := jwt.NewAccountClaims(exppub)
|
||||
expac.Exports.Add(&jwt.Export{
|
||||
Subject: crossAccSubj,
|
||||
Type: jwt.Stream,
|
||||
})
|
||||
expjwt, err := expac.Encode(op)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
// Create importing Account
|
||||
impkp, _ := nkeys.CreateAccount()
|
||||
imppub, _ := impkp.PublicKey()
|
||||
impac := jwt.NewAccountClaims(imppub)
|
||||
impac.Imports.Add(&jwt.Import{
|
||||
Account: exppub,
|
||||
Subject: crossAccSubj,
|
||||
Type: jwt.Stream,
|
||||
})
|
||||
impac.Exports.Add(&jwt.Export{
|
||||
Subject: "srvc",
|
||||
Type: jwt.Service,
|
||||
})
|
||||
impjwt, err := impac.Encode(op)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating account JWT: %v", err)
|
||||
}
|
||||
@@ -1721,39 +1750,46 @@ func TestAccountURLResolverFetchFailureInCluster(t *testing.T) {
|
||||
upub, _ := nkp.PublicKey()
|
||||
nuc := newJWTTestUserClaims()
|
||||
nuc.Subject = upub
|
||||
uJwt, err := nuc.Encode(akp)
|
||||
uJwt, err := nuc.Encode(impkp)
|
||||
if err != nil {
|
||||
t.Fatalf("Error generating user JWT: %v", err)
|
||||
}
|
||||
creds := genCredsFile(t, uJwt, uSeed)
|
||||
defer os.Remove(creds)
|
||||
// Simulate an account server that drops the first request to /B/acc
|
||||
chanA := make(chan string, 4)
|
||||
defer close(chanA)
|
||||
chanB := make(chan string, 4)
|
||||
defer close(chanB)
|
||||
bCnt := int32(0)
|
||||
errCnt := int32(1)
|
||||
chanImpA := make(chan struct{}, 4)
|
||||
defer close(chanImpA)
|
||||
chanImpB := make(chan struct{}, 4)
|
||||
defer close(chanImpB)
|
||||
chanExpA := make(chan struct{}, 4)
|
||||
defer close(chanExpA)
|
||||
chanExpB := make(chan struct{}, 4)
|
||||
defer close(chanExpB)
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path == "/A/" {
|
||||
// Server A startup
|
||||
w.Write([]byte(ajwt))
|
||||
chanA <- "/A/"
|
||||
w.Write(nil)
|
||||
chanImpA <- struct{}{}
|
||||
} else if r.URL.Path == "/B/" {
|
||||
// Server B startup
|
||||
w.Write([]byte(ajwt))
|
||||
chanB <- "/B/"
|
||||
} else if r.URL.Path == "/A/"+apub {
|
||||
w.Write(nil)
|
||||
chanImpB <- struct{}{}
|
||||
} else if r.URL.Path == "/A/"+imppub {
|
||||
// First Client connecting to Server A
|
||||
w.Write([]byte(ajwt))
|
||||
chanA <- "/A/"
|
||||
} else if r.URL.Path == "/B/"+apub {
|
||||
// This will be observed twice:
|
||||
// on a route connect (which is not responded to) then once the second client logs on
|
||||
if atomic.AddInt32(&bCnt, 1) > errCnt {
|
||||
w.Write([]byte(ajwt))
|
||||
}
|
||||
chanB <- "/B/"
|
||||
w.Write([]byte(impjwt))
|
||||
chanImpA <- struct{}{}
|
||||
} else if r.URL.Path == "/B/"+imppub {
|
||||
// Second Client connecting to Server B
|
||||
w.Write([]byte(impjwt))
|
||||
chanImpB <- struct{}{}
|
||||
} else if r.URL.Path == "/A/"+exppub {
|
||||
// First Client connecting to Server A
|
||||
w.Write([]byte(expjwt))
|
||||
chanExpA <- struct{}{}
|
||||
} else if r.URL.Path == "/B/"+exppub {
|
||||
// Second Client connecting to Server B
|
||||
w.Write([]byte(expjwt))
|
||||
chanExpB <- struct{}{}
|
||||
} else {
|
||||
t.Fatal("not expected")
|
||||
}
|
||||
@@ -1766,19 +1802,21 @@ func TestAccountURLResolverFetchFailureInCluster(t *testing.T) {
|
||||
resolver: URL("%s/A/")
|
||||
cluster {
|
||||
name: clust
|
||||
no_advertise: true
|
||||
listen: -1
|
||||
}
|
||||
`, ojwt, ts.URL)))
|
||||
defer os.Remove(confA)
|
||||
sA, _ := RunServerWithConfig(confA)
|
||||
sA := RunServer(LoadConfig(confA))
|
||||
defer sA.Shutdown()
|
||||
// Create Server B
|
||||
// Create Server B (using no_advertise to prevent failover)
|
||||
confB := createConfFile(t, []byte(fmt.Sprintf(`
|
||||
listen: -1
|
||||
operator: %s
|
||||
resolver: URL("%s/B/")
|
||||
cluster {
|
||||
name: clust
|
||||
no_advertise: true
|
||||
listen: -1
|
||||
routes [
|
||||
nats-route://localhost:%d
|
||||
@@ -1786,17 +1824,14 @@ func TestAccountURLResolverFetchFailureInCluster(t *testing.T) {
|
||||
}
|
||||
`, ojwt, ts.URL, sA.opts.Cluster.Port)))
|
||||
defer os.Remove(confB)
|
||||
sB, _ := RunServerWithConfig(confB)
|
||||
sB := RunServer(LoadConfig(confB))
|
||||
defer sB.Shutdown()
|
||||
// startup cluster
|
||||
checkClusterFormed(t, sA, sB)
|
||||
// Both server observed one fetch on startup
|
||||
if path := <-chanA; path != "/A/" {
|
||||
t.Fatal("Expect one fetch on A", path)
|
||||
}
|
||||
if path := <-chanB; path != "/B/" {
|
||||
t.Fatal("Expect one fetch on B", path)
|
||||
}
|
||||
<-chanImpA
|
||||
<-chanImpB
|
||||
assertChanLen(0, chanImpA, chanImpB, chanExpA, chanExpB)
|
||||
// Create first client, directly connects to A
|
||||
urlA := fmt.Sprintf("nats://%s:%d", sA.opts.Host, sA.opts.Port)
|
||||
ncA, err := nats.Connect(urlA, nats.UserCredentials(creds),
|
||||
@@ -1819,37 +1854,37 @@ func TestAccountURLResolverFetchFailureInCluster(t *testing.T) {
|
||||
t.Fatalf("Expected no error during subscribe: %v", err)
|
||||
}
|
||||
defer subA.Unsubscribe()
|
||||
// Both server observed one fetch for the connecting client
|
||||
if path := <-chanA; path != "/A/" {
|
||||
t.Fatal("Expect one fetch on A")
|
||||
}
|
||||
if path := <-chanB; path != "/B/" {
|
||||
t.Fatal("Expect one fetch on B")
|
||||
}
|
||||
// Connect of client triggered a fetch by Server A
|
||||
<-chanImpA
|
||||
<-chanExpA
|
||||
assertChanLen(0, chanImpA, chanImpB, chanExpA, chanExpB)
|
||||
//time.Sleep(10 * time.Second)
|
||||
// create second client, directly connect to B
|
||||
urlB := fmt.Sprintf("nats://%s:%d", sB.opts.Host, sB.opts.Port)
|
||||
ncB, err := nats.Connect(urlB, nats.UserCredentials(creds))
|
||||
ncB, err := nats.Connect(urlB, nats.UserCredentials(creds), nats.NoReconnect())
|
||||
if err != nil {
|
||||
t.Fatalf("Expected to connect, got %v %s", err, urlB)
|
||||
}
|
||||
defer ncB.Close()
|
||||
// Because the first request was a failure, a second request is observed as a result of connect to B
|
||||
for i := int32(0); i < errCnt; i++ {
|
||||
if path := <-chanB; path != "/B/" {
|
||||
t.Fatal("Expect one fetch on B")
|
||||
}
|
||||
}
|
||||
// Connect of client triggered a fetch by Server B
|
||||
<-chanImpB
|
||||
<-chanExpB
|
||||
assertChanLen(0, chanImpA, chanImpB, chanExpA, chanExpB)
|
||||
checkClusterFormed(t, sA, sB)
|
||||
// the route subscription was lost due to the failed fetch
|
||||
// Now we test if some recover mechanism is in play
|
||||
checkSubInterest(t, sB, apub, subj, 10*time.Second) // Will fail as a result of this issue
|
||||
checkSubInterest(t, sB, imppub, subj, 10*time.Second) // Will fail as a result of this issue
|
||||
checkSubInterest(t, sB, exppub, crossAccSubj, 10*time.Second) // Will fail as a result of this issue
|
||||
if err := ncB.Publish(subj, []byte("msg")); err != nil {
|
||||
t.Fatalf("Expected to publish %v", err)
|
||||
}
|
||||
// expect the message from B to flow to A
|
||||
if _, err := subA.NextMsg(10 * time.Second); err != nil {
|
||||
if m, err := subA.NextMsg(10 * time.Second); err != nil {
|
||||
t.Fatalf("Expected to receive a message %v", err)
|
||||
} else if string(m.Data) != "msg" {
|
||||
t.Fatalf("Expected to receive 'msg', got: %s", string(m.Data))
|
||||
}
|
||||
assertChanLen(0, chanImpA, chanImpB, chanExpA, chanExpB)
|
||||
}
|
||||
|
||||
func TestAccountURLResolverReturnDifferentOperator(t *testing.T) {
|
||||
|
||||
@@ -992,13 +992,47 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
|
||||
// Lookup the account
|
||||
// FIXME(dlc) - This may start having lots of contention?
|
||||
accountName := string(args[0+off])
|
||||
acc, _ := srv.LookupAccount(accountName)
|
||||
if acc == nil {
|
||||
if !srv.NewAccountsAllowed() {
|
||||
c.Debugf("Unknown account %q for subject %q", accountName, sub.subject)
|
||||
return nil
|
||||
// Lookup account while avoiding fetch.
|
||||
// A slow fetch delays subsequent remote messages. It also avoids the expired check (see below).
|
||||
// With all but memory resolver lookup can be delayed or fail.
|
||||
// It is also possible that the account can't be resolved yet.
|
||||
// This does not apply to the memory resolver.
|
||||
// When used, perform the fetch.
|
||||
staticResolver := true
|
||||
if res := srv.AccountResolver(); res != nil {
|
||||
if _, ok := res.(*MemAccResolver); ok {
|
||||
staticResolver = true
|
||||
} else {
|
||||
staticResolver = false
|
||||
}
|
||||
}
|
||||
acc := (*Account)(nil)
|
||||
if staticResolver {
|
||||
acc, _ = srv.LookupAccount(accountName)
|
||||
} else if v, ok := srv.accounts.Load(accountName); ok {
|
||||
acc = v.(*Account)
|
||||
} else if v, ok := srv.tmpAccounts.Load(accountName); ok {
|
||||
acc = v.(*Account)
|
||||
}
|
||||
if acc == nil {
|
||||
expire := false
|
||||
isNew := false
|
||||
if !srv.NewAccountsAllowed() {
|
||||
// if the option of retrieving accounts later exists, create an expired one.
|
||||
// When a client comes along, expiration will prevent it from being used,
|
||||
// cause a fetch and update the account to what is should be.
|
||||
if staticResolver {
|
||||
c.Errorf("Unknown account %q for remote subject %q", accountName, sub.subject)
|
||||
return
|
||||
}
|
||||
c.Debugf("Unknown account %q for remote subject %q", accountName, sub.subject)
|
||||
expire = true
|
||||
}
|
||||
if acc, isNew = srv.LookupOrRegisterAccount(accountName); isNew && expire {
|
||||
acc.mu.Lock()
|
||||
acc.expired = true
|
||||
acc.mu.Unlock()
|
||||
}
|
||||
acc, _ = srv.LookupOrRegisterAccount(accountName)
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
|
||||
@@ -1184,7 +1184,15 @@ func (s *Server) updateAccountWithClaimJWT(acc *Account, claimJWT string) error
|
||||
}
|
||||
accClaims, _, err := s.verifyAccountClaims(claimJWT)
|
||||
if err == nil && accClaims != nil {
|
||||
acc.mu.Lock()
|
||||
if acc.Issuer == "" {
|
||||
acc.Issuer = accClaims.Issuer
|
||||
} else if acc.Issuer != accClaims.Issuer {
|
||||
acc.mu.Unlock()
|
||||
return ErrAccountValidation
|
||||
}
|
||||
acc.claimJWT = claimJWT
|
||||
acc.mu.Unlock()
|
||||
s.UpdateAccountClaims(acc, accClaims)
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user