mirror of
https://github.com/gogrlx/nats-server.git
synced 2026-04-14 10:10:42 -07:00
[fixed] hanging leaf node connection when account can't be found (#2267)
* [fixed] hanging leaf node connection when account can't be found as a result of the issue, the leaf node connection never got created, even after the account can be found. Also tracing account id and name (when available) Signed-off-by: Matthias Hanel <mh@synadia.com>
This commit is contained in:
@@ -2822,6 +2822,16 @@ func (s *Server) UpdateAccountClaims(a *Account, ac *jwt.AccountClaims) {
|
||||
s.updateAccountClaimsWithRefresh(a, ac, true)
|
||||
}
|
||||
|
||||
func (a *Account) traceLabel() string {
|
||||
if a == nil {
|
||||
return _EMPTY_
|
||||
}
|
||||
if a.nameTag != _EMPTY_ {
|
||||
return fmt.Sprintf("%s/%s", a.Name, a.nameTag)
|
||||
}
|
||||
return a.Name
|
||||
}
|
||||
|
||||
// updateAccountClaimsWithRefresh will update an existing account with new claims.
|
||||
// If refreshImportingAccounts is true it will also update incomplete dependent accounts
|
||||
// This will replace any exports or imports previously defined.
|
||||
@@ -2927,7 +2937,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
jsEnabled := s.JetStreamEnabled()
|
||||
if jsEnabled && a == s.SystemAccount() {
|
||||
for _, export := range allJsExports {
|
||||
s.Debugf("Adding jetstream service export %q for %s", export, a.Name)
|
||||
s.Debugf("Adding jetstream service export %q for %s", export, a.traceLabel())
|
||||
if err := a.AddServiceExport(export, nil); err != nil {
|
||||
s.Errorf("Error setting up jetstream service exports: %v", err)
|
||||
}
|
||||
@@ -2938,13 +2948,13 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
for _, e := range ac.Exports {
|
||||
switch e.Type {
|
||||
case jwt.Stream:
|
||||
s.Debugf("Adding stream export %q for %s", e.Subject, a.Name)
|
||||
s.Debugf("Adding stream export %q for %s", e.Subject, a.traceLabel())
|
||||
if err := a.addStreamExportWithAccountPos(
|
||||
string(e.Subject), authAccounts(e.TokenReq), e.AccountTokenPosition); err != nil {
|
||||
s.Debugf("Error adding stream export to account [%s]: %v", a.Name, err.Error())
|
||||
s.Debugf("Error adding stream export to account [%s]: %v", a.traceLabel(), err.Error())
|
||||
}
|
||||
case jwt.Service:
|
||||
s.Debugf("Adding service export %q for %s", e.Subject, a.Name)
|
||||
s.Debugf("Adding service export %q for %s", e.Subject, a.traceLabel())
|
||||
rt := Singleton
|
||||
switch e.ResponseType {
|
||||
case jwt.ResponseTypeStream:
|
||||
@@ -2954,7 +2964,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
}
|
||||
if err := a.addServiceExportWithResponseAndAccountPos(
|
||||
string(e.Subject), rt, authAccounts(e.TokenReq), e.AccountTokenPosition); err != nil {
|
||||
s.Debugf("Error adding service export to account [%s]: %v", a.Name, err)
|
||||
s.Debugf("Error adding service export to account [%s]: %v", a.traceLabel(), err)
|
||||
continue
|
||||
}
|
||||
sub := string(e.Subject)
|
||||
@@ -2964,13 +2974,13 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
if e.Latency.Sampling == jwt.Headers {
|
||||
hdrNote = " (using headers)"
|
||||
}
|
||||
s.Debugf("Error adding latency tracking%s for service export to account [%s]: %v", hdrNote, a.Name, err)
|
||||
s.Debugf("Error adding latency tracking%s for service export to account [%s]: %v", hdrNote, a.traceLabel(), err)
|
||||
}
|
||||
}
|
||||
if e.ResponseThreshold != 0 {
|
||||
// Response threshold was set in options.
|
||||
if err := a.SetServiceExportResponseThreshold(sub, e.ResponseThreshold); err != nil {
|
||||
s.Debugf("Error adding service export response threshold for [%s]: %v", a.Name, err)
|
||||
s.Debugf("Error adding service export response threshold for [%s]: %v", a.traceLabel(), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3008,14 +3018,14 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
if i.LocalSubject != _EMPTY_ {
|
||||
// set local subject implies to is empty
|
||||
to = string(i.LocalSubject)
|
||||
s.Debugf("Adding stream import %s:%q for %s:%q", acc.Name, from, a.Name, to)
|
||||
s.Debugf("Adding stream import %s:%q for %s:%q", acc.traceLabel(), from, a.traceLabel(), to)
|
||||
err = a.AddMappedStreamImportWithClaim(acc, from, to, i)
|
||||
} else {
|
||||
s.Debugf("Adding stream import %s:%q for %s:%q", acc.Name, from, a.Name, to)
|
||||
s.Debugf("Adding stream import %s:%q for %s:%q", acc.traceLabel(), from, a.traceLabel(), to)
|
||||
err = a.AddStreamImportWithClaim(acc, from, to, i)
|
||||
}
|
||||
if err != nil {
|
||||
s.Debugf("Error adding stream import to account [%s]: %v", a.Name, err.Error())
|
||||
s.Debugf("Error adding stream import to account [%s]: %v", a.traceLabel(), err.Error())
|
||||
incompleteImports = append(incompleteImports, i)
|
||||
}
|
||||
case jwt.Service:
|
||||
@@ -3023,9 +3033,9 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
from = string(i.LocalSubject)
|
||||
to = string(i.Subject)
|
||||
}
|
||||
s.Debugf("Adding service import %s:%q for %s:%q", acc.Name, from, a.Name, to)
|
||||
s.Debugf("Adding service import %s:%q for %s:%q", acc.traceLabel(), from, a.traceLabel(), to)
|
||||
if err := a.AddServiceImportWithClaim(acc, from, to, i); err != nil {
|
||||
s.Debugf("Error adding service import to account [%s]: %v", a.Name, err.Error())
|
||||
s.Debugf("Error adding service import to account [%s]: %v", a.traceLabel(), err.Error())
|
||||
incompleteImports = append(incompleteImports, i)
|
||||
}
|
||||
}
|
||||
@@ -3168,7 +3178,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
// regardless of enabled or disabled. It handles both cases.
|
||||
if jsEnabled {
|
||||
if err := s.configJetStream(a); err != nil {
|
||||
s.Errorf("Error configuring jetstream for account [%s]: %v", a.Name, err.Error())
|
||||
s.Errorf("Error configuring jetstream for account [%s]: %v", a.traceLabel(), err.Error())
|
||||
a.mu.Lock()
|
||||
// Absent reload of js server cfg, this is going to be broken until js is disabled
|
||||
a.incomplete = true
|
||||
@@ -3235,6 +3245,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
acc.mu.RLock()
|
||||
incomplete := acc.incomplete
|
||||
name := acc.Name
|
||||
label := acc.traceLabel()
|
||||
// Must use jwt in account or risk failing on fetch
|
||||
// This jwt may not be the same that caused exportingAcc to be in incompleteAccExporterMap
|
||||
claimJWT := acc.claimJWT
|
||||
@@ -3250,7 +3261,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim
|
||||
// Since this account just got updated, the import itself may be in error. So trace that.
|
||||
if _, ok := s.incompleteAccExporterMap.Load(old.Name); ok {
|
||||
s.incompleteAccExporterMap.Delete(old.Name)
|
||||
s.Errorf("Account %s has issues importing account %s", name, old.Name)
|
||||
s.Errorf("Account %s has issues importing account %s", label, old.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1492,7 +1492,9 @@ func (c *client) markConnAsClosed(reason ClosedState) {
|
||||
// Be consistent with the creation: for routes, gateways and leaf,
|
||||
// we use Noticef on create, so use that too for delete.
|
||||
if c.srv != nil {
|
||||
if c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF {
|
||||
if c.kind == LEAF {
|
||||
c.Noticef("%s connection closed: %s account: %s", c.typeString(), reason, c.acc.traceLabel())
|
||||
} else if c.kind == ROUTER || c.kind == GATEWAY {
|
||||
c.Noticef("%s connection closed: %s", c.typeString(), reason)
|
||||
} else { // Client, System, Jetstream, and Account connections.
|
||||
c.Debugf("%s connection closed: %s", c.typeString(), reason)
|
||||
|
||||
@@ -795,7 +795,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
|
||||
// Determines if we are soliciting the connection or not.
|
||||
var solicited bool
|
||||
var acc *Account
|
||||
|
||||
var remoteSuffix string
|
||||
if remote != nil {
|
||||
// TODO: Decide what should be the optimal behavior here.
|
||||
// For now, if lookup fails, we will constantly try
|
||||
@@ -814,13 +814,15 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf
|
||||
acc, err = s.LookupAccount(lacc)
|
||||
if err != nil {
|
||||
s.Errorf("No local account %q for leafnode: %v", lacc, err)
|
||||
c.closeConnection(MissingAccount)
|
||||
return nil
|
||||
}
|
||||
remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
c.initClient()
|
||||
c.Noticef("Leafnode connection created")
|
||||
c.Noticef("Leafnode connection created%s", remoteSuffix)
|
||||
|
||||
if remote != nil {
|
||||
solicited = true
|
||||
|
||||
Reference in New Issue
Block a user